This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch hotfix-unicode
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/hotfix-unicode by this push:
new ed640e6acdb Cherry picks changes for improvement of Schema Update
Refresh message (#16756)
ed640e6acdb is described below
commit ed640e6acdb1cff507008a2780be0b1423eb1d8d
Author: Praveen <[email protected]>
AuthorDate: Thu Sep 4 13:37:53 2025 -0700
Cherry picks changes for improvement of Schema Update Refresh message
(#16756)
---
.../helix/core/PinotHelixResourceManager.java | 238 ++++++---------------
.../realtime/PinotLLCRealtimeSegmentManager.java | 29 +--
.../helix/core/relocation/SegmentRelocator.java | 19 +-
.../helix/core/util/MessagingServiceUtils.java | 74 +++++++
.../core/relocation/SegmentRelocatorTest.java | 4 +-
.../core/data/manager/BaseTableDataManager.java | 5 +-
.../core/operator/filter/MapFilterOperator.java | 14 +-
.../apache/pinot/core/startree/StarTreeUtils.java | 24 ++-
.../tests/OfflineClusterIntegrationTest.java | 25 +++
.../segment/readers/PinotSegmentColumnReader.java | 1 -
10 files changed, 211 insertions(+), 222 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b097ecd05bf..c26db20c093 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -154,6 +154,7 @@ import
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
@@ -195,10 +196,8 @@ public class PinotHelixResourceManager {
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5;
public static final String APPEND = "APPEND";
- private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
private static final String API_REQUEST_ID_PREFIX = "api-";
- private static final int INFINITE_TIMEOUT = -1;
private enum LineageUpdateType {
START, END, REVERT
@@ -207,8 +206,6 @@ public class PinotHelixResourceManager {
// TODO: make this configurable
public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 *
60_000L; // 10 minutes
public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1
second
- public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20
minutes
- public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1
second
private static final DateTimeFormatter SIMPLE_DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
@@ -1558,11 +1555,9 @@ public class PinotHelixResourceManager {
reloadAllSegments(tableNameWithType, false, null);
}
} else {
- // Send schema refresh message to all tables that use this schema
+ LOGGER.info("Refreshing schema for tables with name: {}", schemaName);
for (String tableNameWithType : tableNamesWithType) {
- LOGGER.info("Sending updated schema message for table: {}",
tableNameWithType);
- sendTableConfigSchemaRefreshMessage(tableNameWithType,
getServerInstancesForTable(tableNameWithType,
- TableNameBuilder.getTableTypeFromTableName(tableNameWithType)));
+ sendTableConfigSchemaRefreshMessage(tableNameWithType);
}
}
} catch (TableNotFoundException e) {
@@ -2747,24 +2742,10 @@ public class PinotHelixResourceManager {
* Delete the table on servers by sending table deletion messages.
*/
private void deleteTableOnServers(String tableNameWithType) {
- // External view can be null for newly created table, skip sending messages
- if
(_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) ==
null) {
- LOGGER.warn("No delete table message sent for newly created table: {}
without external view", tableNameWithType);
- return;
- }
-
LOGGER.info("Sending delete table messages for table: {}",
tableNameWithType);
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- TableDeletionMessage tableDeletionMessage = new
TableDeletionMessage(tableNameWithType);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
-
- // Infinite timeout on the recipient
- int timeoutMs = -1;
- int numMessagesSent = messagingService.send(recipientCriteria,
tableDeletionMessage, null, timeoutMs);
+ TableDeletionMessage message = new TableDeletionMessage(tableNameWithType);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} delete table messages for table: {}",
numMessagesSent, tableNameWithType);
} else {
@@ -2809,27 +2790,21 @@ public class PinotHelixResourceManager {
Preconditions.checkArgument(tt == TableType.OFFLINE,
"Table: %s is not an OFFLINE table, which is required to force to
download segments", tableNameWithType);
}
- // Infinite timeout on the recipient
- int timeoutMs = -1;
+
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry :
instanceToSegmentsMap.entrySet()) {
String targetInstance = entry.getKey();
- List<String> segments = entry.getValue();
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(targetInstance);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage = new
SegmentReloadMessage(tableNameWithType, segments, forceDownload);
- ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ SegmentReloadMessage message = new
SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
tableNameWithType, null, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages to instance: {} for table: {}",
numMessagesSent, targetInstance,
tableNameWithType);
} else {
LOGGER.warn("No reload message sent to instance: {} for table: {}",
targetInstance, tableNameWithType);
}
- instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent,
segmentReloadMessage.getMsgId()));
+ instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent,
message.getMsgId()));
}
return instanceMsgInfoMap;
}
@@ -2846,24 +2821,17 @@ public class PinotHelixResourceManager {
"Table: %s is not an OFFLINE table, which is required to force to
download segments", tableNameWithType);
}
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(targetInstance == null ? "%" :
targetInstance);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage = new
SegmentReloadMessage(tableNameWithType, forceDownload);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
-
- // Infinite timeout on the recipient
- int timeoutMs = -1;
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType,
forceDownload);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
tableNameWithType, null, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent,
tableNameWithType);
} else {
LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
}
- return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+ return Pair.of(numMessagesSent, message.getMsgId());
}
public Pair<Integer, String> reloadSegment(String tableNameWithType, String
segmentName, boolean forceDownload,
@@ -2879,26 +2847,18 @@ public class PinotHelixResourceManager {
segmentName);
}
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(targetInstance == null ? "%" :
targetInstance);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setPartition(segmentName);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage =
- new SegmentReloadMessage(tableNameWithType,
Collections.singletonList(segmentName), forceDownload);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
-
- // Infinite timeout on the recipient
- int timeoutMs = -1;
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType,
List.of(segmentName), forceDownload);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
tableNameWithType, segmentName, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages for segment: {} in table: {}",
numMessagesSent, segmentName,
tableNameWithType);
} else {
LOGGER.warn("No reload message sent for segment: {} in table: {}",
segmentName, tableNameWithType);
}
- return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+
+ return Pair.of(numMessagesSent, message.getMsgId());
}
/**
@@ -2984,9 +2944,10 @@ public class PinotHelixResourceManager {
* This util is similar to {@link HelixAdmin#resetPartition(String, String,
String, List)}.
* However instead of resetting only the ERROR state to its initial state.
we reset all state regardless.
*/
- private void resetPartitionAllState(String instanceName, String
resourceName, Set<String> resetPartitionNames) {
- LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster
{}.",
- resetPartitionNames == null ? "NULL" : resetPartitionNames,
resourceName, instanceName, _helixClusterName);
+ @VisibleForTesting
+ void resetPartitionAllState(String instanceName, String resourceName,
Set<String> resetPartitionNames) {
+ LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}",
resetPartitionNames, resourceName,
+ instanceName);
HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -3022,7 +2983,7 @@ public class PinotHelixResourceManager {
+ message.getResourceName());
}
- String adminName = null;
+ String adminName;
try {
adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
} catch (UnknownHostException e) {
@@ -3074,21 +3035,12 @@ public class PinotHelixResourceManager {
*/
public void sendSegmentRefreshMessage(String tableNameWithType, String
segmentName, boolean refreshServerSegment,
boolean refreshBrokerRouting) {
- SegmentRefreshMessage segmentRefreshMessage = new
SegmentRefreshMessage(tableNameWithType, segmentName);
-
- // Send segment refresh message to servers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setSessionSpecific(true);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ SegmentRefreshMessage message = new
SegmentRefreshMessage(tableNameWithType, segmentName);
+ // Send segment refresh message to servers
if (refreshServerSegment) {
- // Send segment refresh message to servers
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setPartition(segmentName);
- // Send message with no callback and infinite timeout on the recipient
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentRefreshMessage, null, -1);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType, segmentName, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to
which messages were sent
LOGGER.info("Sent {} segment refresh messages to servers for segment:
{} of table: {}", numMessagesSent,
@@ -3099,11 +3051,11 @@ public class PinotHelixResourceManager {
}
}
+ // Send segment refresh message to brokers
if (refreshBrokerRouting) {
- // Send segment refresh message to brokers
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setPartition(tableNameWithType);
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentRefreshMessage, null, -1);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType,
+ null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to
which messages were sent
LOGGER.info("Sent {} segment refresh messages to brokers for segment:
{} of table: {}", numMessagesSent,
@@ -3115,19 +3067,12 @@ public class PinotHelixResourceManager {
}
}
+ /// Sends table config refresh message to brokers.
private void sendTableConfigRefreshMessage(String tableNameWithType) {
- TableConfigRefreshMessage tableConfigRefreshMessage = new
TableConfigRefreshMessage(tableNameWithType);
-
- // Send table config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- recipientCriteria.setPartition(tableNameWithType);
- // Send message with no callback and infinite timeout on the recipient
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ TableConfigRefreshMessage message = new
TableConfigRefreshMessage(tableNameWithType);
int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
tableConfigRefreshMessage, null, -1);
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which
messages were sent
LOGGER.info("Sent {} table config refresh messages to brokers for table:
{}", numMessagesSent, tableNameWithType);
@@ -3136,19 +3081,23 @@ public class PinotHelixResourceManager {
}
}
+ /// Sends table config and schema refresh message to servers.
+ private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) {
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ TableConfigSchemaRefreshMessage message = new
TableConfigSchemaRefreshMessage(tableNameWithType);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} table config and schema refresh messages for table:
{}", numMessagesSent, tableNameWithType);
+ } else {
+ LOGGER.warn("No table config and schema refresh message sent for table:
{}", tableNameWithType);
+ }
+ }
+
private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
- LogicalTableConfigRefreshMessage refreshMessage = new
LogicalTableConfigRefreshMessage(logicalTableName);
-
- // Send logical table config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- recipientCriteria.setPartition(logicalTableName);
- // Send message with no callback and infinite timeout on the recipient
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ LogicalTableConfigRefreshMessage message = new
LogicalTableConfigRefreshMessage(logicalTableName);
int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
refreshMessage, null, -1);
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, logicalTableName, null);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} logical table config refresh messages to brokers
for table: {}", numMessagesSent,
logicalTableName);
@@ -3158,18 +3107,11 @@ public class PinotHelixResourceManager {
}
private void sendApplicationQpsQuotaRefreshMessage(String appName) {
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
ApplicationQpsQuotaRefreshMessage message = new
ApplicationQpsQuotaRefreshMessage(appName);
-
- // Send database config refresh message to brokers
- Criteria criteria = new Criteria();
- criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- criteria.setInstanceName("%");
- criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- criteria.setSessionSpecific(true);
-
- int numMessagesSent = _helixZkManager.getMessagingService().send(criteria,
message, null, INFINITE_TIMEOUT);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, Helix.BROKER_RESOURCE_INSTANCE);
if (numMessagesSent > 0) {
- LOGGER.info("Sent {} applcation qps quota refresh messages to brokers
for application: {}", numMessagesSent,
+ LOGGER.info("Sent {} application qps quota refresh messages to brokers
for application: {}", numMessagesSent,
appName);
} else {
LOGGER.warn("No application qps quota refresh message sent to brokers
for application: {}", appName);
@@ -3177,17 +3119,9 @@ public class PinotHelixResourceManager {
}
private void sendDatabaseConfigRefreshMessage(String databaseName) {
- DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new
DatabaseConfigRefreshMessage(databaseName);
-
- // Send database config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- // Send message with no callback and infinite timeout on the recipient
- int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
databaseConfigRefreshMessage, null, -1);
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ DatabaseConfigRefreshMessage message = new
DatabaseConfigRefreshMessage(databaseName);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, Helix.BROKER_RESOURCE_INSTANCE);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} database config refresh messages to brokers for
database: {}", numMessagesSent,
databaseName);
@@ -3197,18 +3131,10 @@ public class PinotHelixResourceManager {
}
private void sendRoutingTableRebuildMessage(String tableNameWithType) {
- RoutingTableRebuildMessage routingTableRebuildMessage = new
RoutingTableRebuildMessage(tableNameWithType);
-
- // Send table config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- recipientCriteria.setPartition(tableNameWithType);
- // Send message with no callback and infinite timeout on the recipient
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ RoutingTableRebuildMessage message = new
RoutingTableRebuildMessage(tableNameWithType);
int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
routingTableRebuildMessage, null, -1);
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which
messages were sent
LOGGER.info("Sent {} routing table rebuild messages to brokers for
table: {}", numMessagesSent,
@@ -3218,27 +3144,6 @@ public class PinotHelixResourceManager {
}
}
- private void sendTableConfigSchemaRefreshMessage(String tableNameWithType,
List<String> instances) {
- TableConfigSchemaRefreshMessage refreshMessage = new
TableConfigSchemaRefreshMessage(tableNameWithType);
- for (String instance : instances) {
- // Send refresh message to servers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(instance);
- recipientCriteria.setSessionSpecific(true);
- ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
- // Send message with no callback and infinite timeout on the recipient
- int numMessagesSent = messagingService.send(recipientCriteria,
refreshMessage, null, -1);
- if (numMessagesSent > 0) {
- LOGGER.info("Sent {} schema refresh messages to servers for table: {}
for instance: {}", numMessagesSent,
- tableNameWithType, instance);
- } else {
- LOGGER.warn("No schema refresh message sent to servers for table: {}
for instance: {}", tableNameWithType,
- instance);
- }
- }
- }
-
/**
* Update the instance config given the broker instance id
*/
@@ -4714,27 +4619,17 @@ public class PinotHelixResourceManager {
public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String
tableName, String periodicTaskName,
Map<String, String> taskProperties) {
String periodicTaskRequestId = API_REQUEST_ID_PREFIX +
UUID.randomUUID().toString().substring(0, 8);
-
LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all
controllers for running task {} against {},"
+ " with properties {}.\"", periodicTaskRequestId,
periodicTaskName,
tableName != null ? " table '" + tableName + "'" : "all tables",
taskProperties);
-
- // Create and send message to send to all controllers (including this one)
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setSessionSpecific(true);
-
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
- recipientCriteria.setSelfExcluded(false);
- RunPeriodicTaskMessage runPeriodicTaskMessage =
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ RunPeriodicTaskMessage message =
new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName,
tableName, taskProperties);
-
- ClusterMessagingService clusterMessagingService =
getHelixZkManager().getMessagingService();
- int messageCount = clusterMessagingService.send(recipientCriteria,
runPeriodicTaskMessage, null, -1);
-
+ int numMessagesSent =
+ MessagingServiceUtils.sendIncludingSelf(messagingService, message,
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to
{} controllers.", periodicTaskRequestId,
- messageCount);
- return new PeriodicTaskInvocationResponse(periodicTaskRequestId,
messageCount > 0);
+ numMessagesSent);
+ return new PeriodicTaskInvocationResponse(periodicTaskRequestId,
numMessagesSent > 0);
}
/**
@@ -4761,7 +4656,6 @@ public class PinotHelixResourceManager {
}
return tagMinInstanceMap;
}
-
/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 012ef8d2d5b..f7bcd221d0a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -55,10 +55,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -99,6 +97,7 @@ import
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
import
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -1410,16 +1409,10 @@ public class PinotLLCRealtimeSegmentManager {
newConsumingSegment, newInstances, realtimeTableName,
instancesNoLongerServe);
ClusterMessagingService messagingService =
_helixManager.getMessagingService();
+ IngestionMetricsRemoveMessage message = new
IngestionMetricsRemoveMessage();
List<String> instancesSent = new
ArrayList<>(instancesNoLongerServe.size());
for (String instance : instancesNoLongerServe) {
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setInstanceName(instance);
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setResource(realtimeTableName);
- recipientCriteria.setPartition(committedSegment);
- recipientCriteria.setSessionSpecific(true);
- IngestionMetricsRemoveMessage message = new
IngestionMetricsRemoveMessage();
- if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+ if (MessagingServiceUtils.send(messagingService, message,
realtimeTableName, committedSegment, instance) > 0) {
instancesSent.add(instance);
} else {
LOGGER.warn("Failed to send ingestion metrics remove message for
table: {} segment: {} to instance: {}",
@@ -2358,20 +2351,14 @@ public class PinotLLCRealtimeSegmentManager {
private void sendForceCommitMessageToServers(String tableNameWithType,
Set<String> consumingSegments) {
if (!consumingSegments.isEmpty()) {
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
+ LOGGER.info("Sending force commit messages for segments: {} of table:
{}", consumingSegments, tableNameWithType);
+ ClusterMessagingService messagingService =
_helixManager.getMessagingService();
ForceCommitMessage message = new ForceCommitMessage(tableNameWithType,
consumingSegments);
- int numMessagesSent =
_helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType);
if (numMessagesSent > 0) {
- LOGGER.info("Sent {} force commit messages for table: {} segments:
{}", numMessagesSent, tableNameWithType,
- consumingSegments);
+ LOGGER.info("Sent {} force commit messages for table: {}",
numMessagesSent, tableNameWithType);
} else {
- throw new RuntimeException(
- String.format("No force commit message was sent for table: %s
segments: %s", tableNameWithType,
- consumingSegments));
+ throw new IllegalStateException("No force commit message sent for
table: " + tableNameWithType);
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index fd10148ce3e..729b0c49c72 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -33,8 +34,6 @@ import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -47,6 +46,7 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -319,18 +319,11 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
Map<String, Set<String>> serverToSegmentsToMigrate,
ClusterMessagingService messagingService) {
for (Map.Entry<String, Set<String>> entry :
serverToSegmentsToMigrate.entrySet()) {
String serverName = entry.getKey();
- Set<String> segmentNames = entry.getValue();
- // One SegmentReloadMessage per server but takes all segment names.
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(serverName);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage =
- new SegmentReloadMessage(tableNameWithType, new
ArrayList<>(segmentNames), false);
+ List<String> segments = new ArrayList<>(entry.getValue());
LOGGER.info("Sending SegmentReloadMessage to server: {} to reload
segments: {} of table: {}", serverName,
- segmentNames, tableNameWithType);
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, -1);
+ segments, tableNameWithType);
+ SegmentReloadMessage message = new
SegmentReloadMessage(tableNameWithType, segments, false);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType, null, serverName);
if (numMessagesSent > 0) {
LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}",
serverName, tableNameWithType);
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
new file mode 100644
index 00000000000..88bf6f43433
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import javax.annotation.Nullable;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+
+public class MessagingServiceUtils {
+ private MessagingServiceUtils() {
+ }
+
+ /// Sends a message to the recipients specified by the criteria, returns the number of messages being sent.
+ public static int send(ClusterMessagingService messagingService, Message message, Criteria criteria) {
+ try {
+ return messagingService.send(criteria, message);
+ } catch (Exception e) {
+ // NOTE:
+ // It can throw exception when the target resource doesn't exist (e.g. ExternalView has not been created yet). It
+ // is normal case, and we count it as no message being sent.
+ return 0;
+ }
+ }
+
+ public static int send(ClusterMessagingService messagingService, Message message, String resource,
+ @Nullable String partition, @Nullable String instanceName, boolean includingSelf) {
+ Criteria criteria = new Criteria();
+ criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ criteria.setSessionSpecific(true);
+ criteria.setResource(resource);
+ if (partition != null) {
+ criteria.setPartition(partition);
+ }
+ if (instanceName != null) {
+ criteria.setInstanceName(instanceName);
+ } else {
+ criteria.setInstanceName("%");
+ }
+ criteria.setSelfExcluded(!includingSelf);
+ return send(messagingService, message, criteria);
+ }
+
+ public static int send(ClusterMessagingService messagingService, Message message, String resource,
+ @Nullable String partition, @Nullable String instanceName) {
+ return send(messagingService, message, resource, partition, instanceName, false);
+ }
+
+ public static int send(ClusterMessagingService messagingService, Message message, String resource) {
+ return send(messagingService, message, resource, null, null, false);
+ }
+
+ public static int sendIncludingSelf(ClusterMessagingService messagingService, Message message, String resource) {
+ return send(messagingService, message, resource, null, null, true);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index e6bbcdd026a..6d058effdb6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -44,7 +44,6 @@ import org.apache.pinot.util.TestUtils;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -94,8 +93,7 @@ public class SegmentRelocatorTest {
ArgumentCaptor<Criteria> criteriaCapture =
ArgumentCaptor.forClass(Criteria.class);
ArgumentCaptor<SegmentReloadMessage> reloadMessageCapture =
ArgumentCaptor.forClass(SegmentReloadMessage.class);
- verify(messagingService, times(2)).send(criteriaCapture.capture(),
reloadMessageCapture.capture(), eq(null),
- eq(-1));
+ verify(messagingService, times(2)).send(criteriaCapture.capture(),
reloadMessageCapture.capture());
List<Criteria> criteriaList = criteriaCapture.getAllValues();
List<SegmentReloadMessage> msgList = reloadMessageCapture.getAllValues();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1e283ca29b9..a2f2b844d11 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1372,10 +1372,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
for (String columnName : segmentPhysicalColumns) {
ColumnMetadata columnMetadata =
segmentMetadata.getColumnMetadataFor(columnName);
- FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
DataSource source = segment.getDataSource(columnName);
- Preconditions.checkNotNull(columnMetadata);
- Preconditions.checkNotNull(source);
+ assert columnMetadata != null && source != null;
+ FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
// Column is deleted
if (fieldSpecInSchema == null) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
index ee413306b3b..bc4b19e52b5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
@@ -65,10 +65,14 @@ public class MapFilterOperator extends BaseFilterOperator {
_columnName = arguments.get(0).getIdentifier();
_keyName = arguments.get(1).getLiteral().getStringValue();
- // Get JSON index and create operator
- DataSource dataSource = indexSegment.getDataSource(_columnName);
- JsonIndexReader jsonIndex = dataSource.getJsonIndex();
- if (jsonIndex != null && useJsonIndex(_predicate.getType())) {
+ JsonIndexReader jsonIndex = null;
+ if (canUseJsonIndex(_predicate.getType())) {
+ DataSource dataSource = indexSegment.getDataSourceNullable(_columnName);
+ if (dataSource != null) {
+ jsonIndex = dataSource.getJsonIndex();
+ }
+ }
+ if (jsonIndex != null) {
FilterContext filterContext = createFilterContext();
_jsonMatchOperator = new JsonMatchFilterOperator(jsonIndex,
filterContext, numDocs);
_expressionFilterOperator = null;
@@ -201,7 +205,7 @@ public class MapFilterOperator extends BaseFilterOperator {
* @param predicateType The type of predicate
* @return true if the predicate type is supported for JSON index, false
otherwise
*/
- private boolean useJsonIndex(Predicate.Type predicateType) {
+ private static boolean canUseJsonIndex(Predicate.Type predicateType) {
switch (predicateType) {
case EQ:
case NOT_EQ:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index d5180285955..c422a64a7ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -324,7 +324,11 @@ public class StarTreeUtils {
return null;
}
String column = lhs.getIdentifier();
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ // Star-tree does not support non-existent column
+ return null;
+ }
Dictionary dictionary = dataSource.getDictionary();
if (dictionary == null) {
// Star-tree does not support non-dictionary encoded dimension
@@ -388,7 +392,11 @@ public class StarTreeUtils {
}
String column = aggregationFunctionColumnPair.getColumn();
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ LOGGER.debug("Cannot use star-tree index because aggregation column:
'{}' does not exist", column);
+ return null;
+ }
if (dataSource.getNullValueVector() != null &&
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
LOGGER.debug("Cannot use star-tree index because aggregation column:
'{}' has null values", column);
return null;
@@ -396,7 +404,11 @@ public class StarTreeUtils {
}
for (String column : predicateEvaluatorsMap.keySet()) {
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ LOGGER.debug("Cannot use star-tree index because filter column: '{}'
does not exist", column);
+ return null;
+ }
if (dataSource.getNullValueVector() != null &&
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
LOGGER.debug("Cannot use star-tree index because filter column: '{}'
has null values", column);
return null;
@@ -410,7 +422,11 @@ public class StarTreeUtils {
}
}
for (String column : groupByColumns) {
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ LOGGER.debug("Cannot use star-tree index because group-by column:
'{}' does not exist", column);
+ return null;
+ }
if (dataSource.getNullValueVector() != null &&
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
LOGGER.debug("Cannot use star-tree index because group-by column:
'{}' has null values", column);
return null;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 1c87c1eee3d..89153e55ba3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1697,6 +1697,19 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
updateTableConfig(tableConfig);
+
+ // Query the new added columns without reload
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode response = postQuery(TEST_STAR_TREE_QUERY_3);
+ return response.get("resultTable").get("rows").get(0).get(0).asInt()
== 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to query new added columns without reload");
+ // Table size shouldn't change without reload
+ assertEquals(getTableSize(getTableName()), _tableSize);
+
reloadAllSegments(TEST_STAR_TREE_QUERY_3, false, numTotalDocs);
int thirdQueryResult =
postQuery(TEST_STAR_TREE_QUERY_3_REFERENCE).get("resultTable").get("rows").get(0).get(0).asInt();
@@ -1865,6 +1878,18 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
CompressionCodec.MV_ENTRY_DICT, null));
updateTableConfig(tableConfig);
+ // Query the new added columns without reload
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ return
response.get("resultTable").get("dataSchema").get("columnNames").size() == 104;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to query new added columns without reload");
+ // Table size shouldn't change without reload
+ assertEquals(getTableSize(getTableName()), _tableSize);
+
// Trigger reload and verify column count
reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 01257777777..4a691e4fc42 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -42,7 +42,6 @@ public class PinotSegmentColumnReader implements Closeable {
public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
DataSource dataSource = indexSegment.getDataSource(column);
- Preconditions.checkArgument(dataSource != null, "Failed to find data
source for column: %s", column);
_forwardIndexReader = dataSource.getForwardIndex();
Preconditions.checkArgument(_forwardIndexReader != null, "Forward index
disabled for column: %s", column);
_forwardIndexReaderContext = _forwardIndexReader.createContext();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]