This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch hotfix-unicode
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/hotfix-unicode by this push:
     new ed640e6acdb Cherry picks changes for improvement of Schema Update 
Refresh message (#16756)
ed640e6acdb is described below

commit ed640e6acdb1cff507008a2780be0b1423eb1d8d
Author: Praveen <[email protected]>
AuthorDate: Thu Sep 4 13:37:53 2025 -0700

    Cherry picks changes for improvement of Schema Update Refresh message 
(#16756)
---
 .../helix/core/PinotHelixResourceManager.java      | 238 ++++++---------------
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  29 +--
 .../helix/core/relocation/SegmentRelocator.java    |  19 +-
 .../helix/core/util/MessagingServiceUtils.java     |  74 +++++++
 .../core/relocation/SegmentRelocatorTest.java      |   4 +-
 .../core/data/manager/BaseTableDataManager.java    |   5 +-
 .../core/operator/filter/MapFilterOperator.java    |  14 +-
 .../apache/pinot/core/startree/StarTreeUtils.java  |  24 ++-
 .../tests/OfflineClusterIntegrationTest.java       |  25 +++
 .../segment/readers/PinotSegmentColumnReader.java  |   1 -
 10 files changed, 211 insertions(+), 222 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b097ecd05bf..c26db20c093 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -154,6 +154,7 @@ import 
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
@@ -195,10 +196,8 @@ public class PinotHelixResourceManager {
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5;
   public static final String APPEND = "APPEND";
-  private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
   private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
   private static final String API_REQUEST_ID_PREFIX = "api-";
-  private static final int INFINITE_TIMEOUT = -1;
 
   private enum LineageUpdateType {
     START, END, REVERT
@@ -207,8 +206,6 @@ public class PinotHelixResourceManager {
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 
60_000L; // 10 minutes
   public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 
second
-  public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 
minutes
-  public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 
second
 
   private static final DateTimeFormatter SIMPLE_DATE_FORMAT =
       
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
@@ -1558,11 +1555,9 @@ public class PinotHelixResourceManager {
           reloadAllSegments(tableNameWithType, false, null);
         }
       } else {
-        // Send schema refresh message to all tables that use this schema
+        LOGGER.info("Refreshing schema for tables with name: {}", schemaName);
         for (String tableNameWithType : tableNamesWithType) {
-          LOGGER.info("Sending updated schema message for table: {}", 
tableNameWithType);
-          sendTableConfigSchemaRefreshMessage(tableNameWithType, 
getServerInstancesForTable(tableNameWithType,
-              TableNameBuilder.getTableTypeFromTableName(tableNameWithType)));
+          sendTableConfigSchemaRefreshMessage(tableNameWithType);
         }
       }
     } catch (TableNotFoundException e) {
@@ -2747,24 +2742,10 @@ public class PinotHelixResourceManager {
    * Delete the table on servers by sending table deletion messages.
    */
   private void deleteTableOnServers(String tableNameWithType) {
-    // External view can be null for newly created table, skip sending messages
-    if 
(_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) == 
null) {
-      LOGGER.warn("No delete table message sent for newly created table: {} 
without external view", tableNameWithType);
-      return;
-    }
-
     LOGGER.info("Sending delete table messages for table: {}", 
tableNameWithType);
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-    TableDeletionMessage tableDeletionMessage = new 
TableDeletionMessage(tableNameWithType);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
tableDeletionMessage, null, timeoutMs);
+    TableDeletionMessage message = new TableDeletionMessage(tableNameWithType);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} delete table messages for table: {}", 
numMessagesSent, tableNameWithType);
     } else {
@@ -2809,27 +2790,21 @@ public class PinotHelixResourceManager {
       Preconditions.checkArgument(tt == TableType.OFFLINE,
           "Table: %s is not an OFFLINE table, which is required to force to 
download segments", tableNameWithType);
     }
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
+
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
     for (Map.Entry<String, List<String>> entry : 
instanceToSegmentsMap.entrySet()) {
       String targetInstance = entry.getKey();
-      List<String> segments = entry.getValue();
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(targetInstance);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
-      SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, segments, forceDownload);
-      ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+      SegmentReloadMessage message = new 
SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, message, 
tableNameWithType, null, targetInstance);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent {} reload messages to instance: {} for table: {}", 
numMessagesSent, targetInstance,
             tableNameWithType);
       } else {
         LOGGER.warn("No reload message sent to instance: {} for table: {}", 
targetInstance, tableNameWithType);
       }
-      instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, 
segmentReloadMessage.getMsgId()));
+      instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, 
message.getMsgId()));
     }
     return instanceMsgInfoMap;
   }
@@ -2846,24 +2821,17 @@ public class PinotHelixResourceManager {
           "Table: %s is not an OFFLINE table, which is required to force to 
download segments", tableNameWithType);
     }
 
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setSessionSpecific(true);
-    SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, forceDownload);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+    SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, 
forceDownload);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, message, 
tableNameWithType, null, targetInstance);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent, 
tableNameWithType);
     } else {
       LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
     }
 
-    return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+    return Pair.of(numMessagesSent, message.getMsgId());
   }
 
   public Pair<Integer, String> reloadSegment(String tableNameWithType, String 
segmentName, boolean forceDownload,
@@ -2879,26 +2847,18 @@ public class PinotHelixResourceManager {
           segmentName);
     }
 
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
-    recipientCriteria.setResource(tableNameWithType);
-    recipientCriteria.setPartition(segmentName);
-    recipientCriteria.setSessionSpecific(true);
-    SegmentReloadMessage segmentReloadMessage =
-        new SegmentReloadMessage(tableNameWithType, 
Collections.singletonList(segmentName), forceDownload);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-
-    // Infinite timeout on the recipient
-    int timeoutMs = -1;
-    int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, timeoutMs);
+    SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType, 
List.of(segmentName), forceDownload);
+    int numMessagesSent =
+        MessagingServiceUtils.send(messagingService, message, 
tableNameWithType, segmentName, targetInstance);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} reload messages for segment: {} in table: {}", 
numMessagesSent, segmentName,
           tableNameWithType);
     } else {
       LOGGER.warn("No reload message sent for segment: {} in table: {}", 
segmentName, tableNameWithType);
     }
-    return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+
+    return Pair.of(numMessagesSent, message.getMsgId());
   }
 
   /**
@@ -2984,9 +2944,10 @@ public class PinotHelixResourceManager {
    * This util is similar to {@link HelixAdmin#resetPartition(String, String, 
String, List)}.
    * However instead of resetting only the ERROR state to its initial state. 
we reset all state regardless.
    */
-  private void resetPartitionAllState(String instanceName, String 
resourceName, Set<String> resetPartitionNames) {
-    LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster 
{}.",
-        resetPartitionNames == null ? "NULL" : resetPartitionNames, 
resourceName, instanceName, _helixClusterName);
+  @VisibleForTesting
+  void resetPartitionAllState(String instanceName, String resourceName, 
Set<String> resetPartitionNames) {
+    LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}", 
resetPartitionNames, resourceName,
+        instanceName);
     HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -3022,7 +2983,7 @@ public class PinotHelixResourceManager {
           + message.getResourceName());
     }
 
-    String adminName = null;
+    String adminName;
     try {
       adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
     } catch (UnknownHostException e) {
@@ -3074,21 +3035,12 @@ public class PinotHelixResourceManager {
    */
   public void sendSegmentRefreshMessage(String tableNameWithType, String 
segmentName, boolean refreshServerSegment,
       boolean refreshBrokerRouting) {
-    SegmentRefreshMessage segmentRefreshMessage = new 
SegmentRefreshMessage(tableNameWithType, segmentName);
-
-    // Send segment refresh message to servers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
     ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    SegmentRefreshMessage message = new 
SegmentRefreshMessage(tableNameWithType, segmentName);
 
+    // Send segment refresh message to servers
     if (refreshServerSegment) {
-      // Send segment refresh message to servers
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setPartition(segmentName);
-      // Send message with no callback and infinite timeout on the recipient
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentRefreshMessage, null, -1);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType, segmentName, null);
       if (numMessagesSent > 0) {
         // TODO: Would be nice if we can get the name of the instances to 
which messages were sent
         LOGGER.info("Sent {} segment refresh messages to servers for segment: 
{} of table: {}", numMessagesSent,
@@ -3099,11 +3051,11 @@ public class PinotHelixResourceManager {
       }
     }
 
+    // Send segment refresh message to brokers
     if (refreshBrokerRouting) {
-      // Send segment refresh message to brokers
-      recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-      recipientCriteria.setPartition(tableNameWithType);
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentRefreshMessage, null, -1);
+      int numMessagesSent =
+          MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType,
+              null);
       if (numMessagesSent > 0) {
         // TODO: Would be nice if we can get the name of the instances to 
which messages were sent
         LOGGER.info("Sent {} segment refresh messages to brokers for segment: 
{} of table: {}", numMessagesSent,
@@ -3115,19 +3067,12 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /// Sends table config refresh message to brokers.
   private void sendTableConfigRefreshMessage(String tableNameWithType) {
-    TableConfigRefreshMessage tableConfigRefreshMessage = new 
TableConfigRefreshMessage(tableNameWithType);
-
-    // Send table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(tableNameWithType);
-    // Send message with no callback and infinite timeout on the recipient
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    TableConfigRefreshMessage message = new 
TableConfigRefreshMessage(tableNameWithType);
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
tableConfigRefreshMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
       LOGGER.info("Sent {} table config refresh messages to brokers for table: 
{}", numMessagesSent, tableNameWithType);
@@ -3136,19 +3081,23 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /// Sends table config and schema refresh message to servers.
+  private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    TableConfigSchemaRefreshMessage message = new 
TableConfigSchemaRefreshMessage(tableNameWithType);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
+    if (numMessagesSent > 0) {
+      LOGGER.info("Sent {} table config and schema refresh messages for table: 
{}", numMessagesSent, tableNameWithType);
+    } else {
+      LOGGER.warn("No table config and schema refresh message sent for table: 
{}", tableNameWithType);
+    }
+  }
+
   private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
-    LogicalTableConfigRefreshMessage refreshMessage = new 
LogicalTableConfigRefreshMessage(logicalTableName);
-
-    // Send logical table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(logicalTableName);
-    // Send message with no callback and infinite timeout on the recipient
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    LogicalTableConfigRefreshMessage message = new 
LogicalTableConfigRefreshMessage(logicalTableName);
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
refreshMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, logicalTableName, null);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} logical table config refresh messages to brokers 
for table: {}", numMessagesSent,
           logicalTableName);
@@ -3158,18 +3107,11 @@ public class PinotHelixResourceManager {
   }
 
   private void sendApplicationQpsQuotaRefreshMessage(String appName) {
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
     ApplicationQpsQuotaRefreshMessage message = new 
ApplicationQpsQuotaRefreshMessage(appName);
-
-    // Send database config refresh message to brokers
-    Criteria criteria = new Criteria();
-    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    criteria.setInstanceName("%");
-    criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    criteria.setSessionSpecific(true);
-
-    int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, 
message, null, INFINITE_TIMEOUT);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, Helix.BROKER_RESOURCE_INSTANCE);
     if (numMessagesSent > 0) {
-      LOGGER.info("Sent {} applcation qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
+      LOGGER.info("Sent {} application qps quota refresh messages to brokers 
for application: {}", numMessagesSent,
           appName);
     } else {
       LOGGER.warn("No application qps quota refresh message sent to brokers 
for application: {}", appName);
@@ -3177,17 +3119,9 @@ public class PinotHelixResourceManager {
   }
 
   private void sendDatabaseConfigRefreshMessage(String databaseName) {
-    DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new 
DatabaseConfigRefreshMessage(databaseName);
-
-    // Send database config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    // Send message with no callback and infinite timeout on the recipient
-    int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
databaseConfigRefreshMessage, null, -1);
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    DatabaseConfigRefreshMessage message = new 
DatabaseConfigRefreshMessage(databaseName);
+    int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, Helix.BROKER_RESOURCE_INSTANCE);
     if (numMessagesSent > 0) {
       LOGGER.info("Sent {} database config refresh messages to brokers for 
database: {}", numMessagesSent,
           databaseName);
@@ -3197,18 +3131,10 @@ public class PinotHelixResourceManager {
   }
 
   private void sendRoutingTableRebuildMessage(String tableNameWithType) {
-    RoutingTableRebuildMessage routingTableRebuildMessage = new 
RoutingTableRebuildMessage(tableNameWithType);
-
-    // Send table config refresh message to brokers
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
-    recipientCriteria.setSessionSpecific(true);
-    recipientCriteria.setPartition(tableNameWithType);
-    // Send message with no callback and infinite timeout on the recipient
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    RoutingTableRebuildMessage message = new 
RoutingTableRebuildMessage(tableNameWithType);
     int numMessagesSent =
-        _helixZkManager.getMessagingService().send(recipientCriteria, 
routingTableRebuildMessage, null, -1);
+        MessagingServiceUtils.send(messagingService, message, 
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
       LOGGER.info("Sent {} routing table rebuild messages to brokers for 
table: {}", numMessagesSent,
@@ -3218,27 +3144,6 @@ public class PinotHelixResourceManager {
     }
   }
 
-  private void sendTableConfigSchemaRefreshMessage(String tableNameWithType, 
List<String> instances) {
-    TableConfigSchemaRefreshMessage refreshMessage = new 
TableConfigSchemaRefreshMessage(tableNameWithType);
-    for (String instance : instances) {
-      // Send refresh message to servers
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(instance);
-      recipientCriteria.setSessionSpecific(true);
-      ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-      // Send message with no callback and infinite timeout on the recipient
-      int numMessagesSent = messagingService.send(recipientCriteria, 
refreshMessage, null, -1);
-      if (numMessagesSent > 0) {
-        LOGGER.info("Sent {} schema refresh messages to servers for table: {} 
for instance: {}", numMessagesSent,
-            tableNameWithType, instance);
-      } else {
-        LOGGER.warn("No schema refresh message sent to servers for table: {} 
for instance: {}", tableNameWithType,
-            instance);
-      }
-    }
-  }
-
   /**
    * Update the instance config given the broker instance id
    */
@@ -4714,27 +4619,17 @@ public class PinotHelixResourceManager {
   public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String 
tableName, String periodicTaskName,
       Map<String, String> taskProperties) {
     String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0, 8);
-
     LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all 
controllers for running task {} against {},"
             + " with properties {}.\"", periodicTaskRequestId, 
periodicTaskName,
         tableName != null ? " table '" + tableName + "'" : "all tables", 
taskProperties);
-
-    // Create and send message to send to all controllers (including this one)
-    Criteria recipientCriteria = new Criteria();
-    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
-    recipientCriteria.setSessionSpecific(true);
-    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
-    recipientCriteria.setSelfExcluded(false);
-    RunPeriodicTaskMessage runPeriodicTaskMessage =
+    ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
+    RunPeriodicTaskMessage message =
         new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, 
tableName, taskProperties);
-
-    ClusterMessagingService clusterMessagingService = 
getHelixZkManager().getMessagingService();
-    int messageCount = clusterMessagingService.send(recipientCriteria, 
runPeriodicTaskMessage, null, -1);
-
+    int numMessagesSent =
+        MessagingServiceUtils.sendIncludingSelf(messagingService, message, 
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
     LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to 
{} controllers.", periodicTaskRequestId,
-        messageCount);
-    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, 
messageCount > 0);
+        numMessagesSent);
+    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, 
numMessagesSent > 0);
   }
 
   /**
@@ -4761,7 +4656,6 @@ public class PinotHelixResourceManager {
     }
     return tagMinInstanceMap;
   }
-
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 012ef8d2d5b..f7bcd221d0a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -55,10 +55,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -99,6 +97,7 @@ import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -1410,16 +1409,10 @@ public class PinotLLCRealtimeSegmentManager {
         newConsumingSegment, newInstances, realtimeTableName, 
instancesNoLongerServe);
 
     ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
+    IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();
     List<String> instancesSent = new 
ArrayList<>(instancesNoLongerServe.size());
     for (String instance : instancesNoLongerServe) {
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setInstanceName(instance);
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setResource(realtimeTableName);
-      recipientCriteria.setPartition(committedSegment);
-      recipientCriteria.setSessionSpecific(true);
-      IngestionMetricsRemoveMessage message = new 
IngestionMetricsRemoveMessage();
-      if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+      if (MessagingServiceUtils.send(messagingService, message, 
realtimeTableName, committedSegment, instance) > 0) {
         instancesSent.add(instance);
       } else {
         LOGGER.warn("Failed to send ingestion metrics remove message for 
table: {} segment: {} to instance: {}",
@@ -2358,20 +2351,14 @@ public class PinotLLCRealtimeSegmentManager {
 
   private void sendForceCommitMessageToServers(String tableNameWithType, 
Set<String> consumingSegments) {
     if (!consumingSegments.isEmpty()) {
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setInstanceName("%");
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
+      LOGGER.info("Sending force commit messages for segments: {} of table: 
{}", consumingSegments, tableNameWithType);
+      ClusterMessagingService messagingService = 
_helixManager.getMessagingService();
       ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, 
consumingSegments);
-      int numMessagesSent = 
_helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType);
       if (numMessagesSent > 0) {
-        LOGGER.info("Sent {} force commit messages for table: {} segments: 
{}", numMessagesSent, tableNameWithType,
-            consumingSegments);
+        LOGGER.info("Sent {} force commit messages for table: {}", 
numMessagesSent, tableNameWithType);
       } else {
-        throw new RuntimeException(
-            String.format("No force commit message was sent for table: %s 
segments: %s", tableNameWithType,
-                consumingSegments));
+        throw new IllegalStateException("No force commit message sent for 
table: " + tableNameWithType);
       }
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index fd10148ce3e..729b0c49c72 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -33,8 +34,6 @@ import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -47,6 +46,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -319,18 +319,11 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
       Map<String, Set<String>> serverToSegmentsToMigrate, 
ClusterMessagingService messagingService) {
     for (Map.Entry<String, Set<String>> entry : 
serverToSegmentsToMigrate.entrySet()) {
       String serverName = entry.getKey();
-      Set<String> segmentNames = entry.getValue();
-      // One SegmentReloadMessage per server but takes all segment names.
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(serverName);
-      recipientCriteria.setResource(tableNameWithType);
-      recipientCriteria.setSessionSpecific(true);
-      SegmentReloadMessage segmentReloadMessage =
-          new SegmentReloadMessage(tableNameWithType, new 
ArrayList<>(segmentNames), false);
+      List<String> segments = new ArrayList<>(entry.getValue());
       LOGGER.info("Sending SegmentReloadMessage to server: {} to reload 
segments: {} of table: {}", serverName,
-          segmentNames, tableNameWithType);
-      int numMessagesSent = messagingService.send(recipientCriteria, 
segmentReloadMessage, null, -1);
+          segments, tableNameWithType);
+      SegmentReloadMessage message = new 
SegmentReloadMessage(tableNameWithType, segments, false);
+      int numMessagesSent = MessagingServiceUtils.send(messagingService, 
message, tableNameWithType, null, serverName);
       if (numMessagesSent > 0) {
         LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", 
serverName, tableNameWithType);
       } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
new file mode 100644
index 00000000000..88bf6f43433
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import javax.annotation.Nullable;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+
+public class MessagingServiceUtils {
+  private MessagingServiceUtils() {
+  }
+
+  /// Sends a message to the recipients specified by the criteria, returns the 
number of messages being sent.
+  public static int send(ClusterMessagingService messagingService, Message 
message, Criteria criteria) {
+    try {
+      return messagingService.send(criteria, message);
+    } catch (Exception e) {
+      // NOTE:
+      // It can throw exception when the target resource doesn't exist (e.g. 
ExternalView has not been created yet). It
+      // is normal case, and we count it as no message being sent.
+      return 0;
+    }
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message 
message, String resource,
+      @Nullable String partition, @Nullable String instanceName, boolean 
includingSelf) {
+    Criteria criteria = new Criteria();
+    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    criteria.setSessionSpecific(true);
+    criteria.setResource(resource);
+    if (partition != null) {
+      criteria.setPartition(partition);
+    }
+    if (instanceName != null) {
+      criteria.setInstanceName(instanceName);
+    } else {
+      criteria.setInstanceName("%");
+    }
+    criteria.setSelfExcluded(!includingSelf);
+    return send(messagingService, message, criteria);
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message 
message, String resource,
+      @Nullable String partition, @Nullable String instanceName) {
+    return send(messagingService, message, resource, partition, instanceName, 
false);
+  }
+
+  public static int send(ClusterMessagingService messagingService, Message 
message, String resource) {
+    return send(messagingService, message, resource, null, null, false);
+  }
+
+  public static int sendIncludingSelf(ClusterMessagingService 
messagingService, Message message, String resource) {
+    return send(messagingService, message, resource, null, null, true);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index e6bbcdd026a..6d058effdb6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -44,7 +44,6 @@ import org.apache.pinot.util.TestUtils;
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -94,8 +93,7 @@ public class SegmentRelocatorTest {
 
     ArgumentCaptor<Criteria> criteriaCapture = 
ArgumentCaptor.forClass(Criteria.class);
     ArgumentCaptor<SegmentReloadMessage> reloadMessageCapture = 
ArgumentCaptor.forClass(SegmentReloadMessage.class);
-    verify(messagingService, times(2)).send(criteriaCapture.capture(), 
reloadMessageCapture.capture(), eq(null),
-        eq(-1));
+    verify(messagingService, times(2)).send(criteriaCapture.capture(), 
reloadMessageCapture.capture());
 
     List<Criteria> criteriaList = criteriaCapture.getAllValues();
     List<SegmentReloadMessage> msgList = reloadMessageCapture.getAllValues();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1e283ca29b9..a2f2b844d11 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1372,10 +1372,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
     for (String columnName : segmentPhysicalColumns) {
       ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(columnName);
-      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
       DataSource source = segment.getDataSource(columnName);
-      Preconditions.checkNotNull(columnMetadata);
-      Preconditions.checkNotNull(source);
+      assert columnMetadata != null && source != null;
+      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
 
       // Column is deleted
       if (fieldSpecInSchema == null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
index ee413306b3b..bc4b19e52b5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
@@ -65,10 +65,14 @@ public class MapFilterOperator extends BaseFilterOperator {
     _columnName = arguments.get(0).getIdentifier();
     _keyName = arguments.get(1).getLiteral().getStringValue();
 
-    // Get JSON index and create operator
-    DataSource dataSource = indexSegment.getDataSource(_columnName);
-    JsonIndexReader jsonIndex = dataSource.getJsonIndex();
-    if (jsonIndex != null && useJsonIndex(_predicate.getType())) {
+    JsonIndexReader jsonIndex = null;
+    if (canUseJsonIndex(_predicate.getType())) {
+      DataSource dataSource = indexSegment.getDataSourceNullable(_columnName);
+      if (dataSource != null) {
+        jsonIndex = dataSource.getJsonIndex();
+      }
+    }
+    if (jsonIndex != null) {
       FilterContext filterContext = createFilterContext();
       _jsonMatchOperator = new JsonMatchFilterOperator(jsonIndex, 
filterContext, numDocs);
       _expressionFilterOperator = null;
@@ -201,7 +205,7 @@ public class MapFilterOperator extends BaseFilterOperator {
    * @param predicateType The type of predicate
    * @return true if the predicate type is supported for JSON index, false 
otherwise
    */
-  private boolean useJsonIndex(Predicate.Type predicateType) {
+  private static boolean canUseJsonIndex(Predicate.Type predicateType) {
     switch (predicateType) {
       case EQ:
       case NOT_EQ:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index d5180285955..c422a64a7ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -324,7 +324,11 @@ public class StarTreeUtils {
       return null;
     }
     String column = lhs.getIdentifier();
-    DataSource dataSource = indexSegment.getDataSource(column);
+    DataSource dataSource = indexSegment.getDataSourceNullable(column);
+    if (dataSource == null) {
+      // Star-tree does not support non-existent column
+      return null;
+    }
     Dictionary dictionary = dataSource.getDictionary();
     if (dictionary == null) {
       // Star-tree does not support non-dictionary encoded dimension
@@ -388,7 +392,11 @@ public class StarTreeUtils {
         }
 
         String column = aggregationFunctionColumnPair.getColumn();
-        DataSource dataSource = indexSegment.getDataSource(column);
+        DataSource dataSource = indexSegment.getDataSourceNullable(column);
+        if (dataSource == null) {
+          LOGGER.debug("Cannot use star-tree index because aggregation column: 
'{}' does not exist", column);
+          return null;
+        }
         if (dataSource.getNullValueVector() != null && 
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
           LOGGER.debug("Cannot use star-tree index because aggregation column: 
'{}' has null values", column);
           return null;
@@ -396,7 +404,11 @@ public class StarTreeUtils {
       }
 
       for (String column : predicateEvaluatorsMap.keySet()) {
-        DataSource dataSource = indexSegment.getDataSource(column);
+        DataSource dataSource = indexSegment.getDataSourceNullable(column);
+        if (dataSource == null) {
+          LOGGER.debug("Cannot use star-tree index because filter column: '{}' 
does not exist", column);
+          return null;
+        }
         if (dataSource.getNullValueVector() != null && 
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
           LOGGER.debug("Cannot use star-tree index because filter column: '{}' 
has null values", column);
           return null;
@@ -410,7 +422,11 @@ public class StarTreeUtils {
         }
       }
       for (String column : groupByColumns) {
-        DataSource dataSource = indexSegment.getDataSource(column);
+        DataSource dataSource = indexSegment.getDataSourceNullable(column);
+        if (dataSource == null) {
+          LOGGER.debug("Cannot use star-tree index because group-by column: 
'{}' does not exist", column);
+          return null;
+        }
         if (dataSource.getNullValueVector() != null && 
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
           LOGGER.debug("Cannot use star-tree index because group-by column: 
'{}' has null values", column);
           return null;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 1c87c1eee3d..89153e55ba3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1697,6 +1697,19 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     ingestionConfig.setTransformConfigs(transformConfigs);
     tableConfig.setIngestionConfig(ingestionConfig);
     updateTableConfig(tableConfig);
+
+    // Query the new added columns without reload
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode response = postQuery(TEST_STAR_TREE_QUERY_3);
+        return response.get("resultTable").get("rows").get(0).get(0).asInt() 
== 0;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to query new added columns without reload");
+    // Table size shouldn't change without reload
+    assertEquals(getTableSize(getTableName()), _tableSize);
+
     reloadAllSegments(TEST_STAR_TREE_QUERY_3, false, numTotalDocs);
     int thirdQueryResult =
         
postQuery(TEST_STAR_TREE_QUERY_3_REFERENCE).get("resultTable").get("rows").get(0).get(0).asInt();
@@ -1865,6 +1878,18 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
             CompressionCodec.MV_ENTRY_DICT, null));
     updateTableConfig(tableConfig);
 
+    // Query the new added columns without reload
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        return 
response.get("resultTable").get("dataSchema").get("columnNames").size() == 104;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to query new added columns without reload");
+    // Table size shouldn't change without reload
+    assertEquals(getTableSize(getTableName()), _tableSize);
+
     // Trigger reload and verify column count
     reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
     JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 01257777777..4a691e4fc42 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -42,7 +42,6 @@ public class PinotSegmentColumnReader implements Closeable {
 
   public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
     DataSource dataSource = indexSegment.getDataSource(column);
-    Preconditions.checkArgument(dataSource != null, "Failed to find data 
source for column: %s", column);
     _forwardIndexReader = dataSource.getForwardIndex();
     Preconditions.checkArgument(_forwardIndexReader != null, "Forward index 
disabled for column: %s", column);
     _forwardIndexReaderContext = _forwardIndexReader.createContext();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to