This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f6f59ebedf Fix issues for realtime table reload (#9885)
f6f59ebedf is described below

commit f6f59ebedf8386fddb3e6895cc7c078acf64b43f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Dec 1 10:36:14 2022 -0800

    Fix issues for realtime table reload (#9885)
---
 .../manager/realtime/RealtimeTableDataManager.java | 10 +++--
 .../query/executor/ServerQueryExecutorV1Impl.java  | 49 +++++++++++-----------
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  7 +++-
 .../segment/index/loader/IndexLoadingConfig.java   |  8 ++--
 4 files changed, 40 insertions(+), 34 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2c616c438c..6d5bfe1de8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -538,10 +538,14 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
    * Replaces a committed LLC REALTIME segment.
    */
   public void replaceLLSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig) {
+    File indexDir = new File(_indexDir, segmentName);
+    // Use the latest table config and schema to load the segment
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to get table config 
for table: %s", _tableNameWithType);
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableConfig);
+    indexLoadingConfig.updateTableConfigAndSchema(tableConfig, schema);
     try {
-      File indexDir = new File(_indexDir, segmentName);
-      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
schema));
+      addSegment(indexDir, indexLoadingConfig);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 83ded8ef3f..803621d52a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -81,7 +81,6 @@ import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -210,40 +209,38 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
 
     // Gather stats for realtime consuming segments
+    // TODO: the freshness time should not be collected at query time because 
there is no guarantee that the consuming
+    //       segment is queried (consuming segment might be pruned, or the 
server only contains relocated committed
+    //       segments)
     int numConsumingSegmentsQueried = 0;
-    int numOnlineSegments = 0;
     long minIndexTimeMs = 0;
     long minIngestionTimeMs = 0;
     long maxEndTimeMs = 0;
     if (tableDataManager instanceof RealtimeTableDataManager) {
-      numConsumingSegmentsQueried = 0;
-      numOnlineSegments = 0;
       minIndexTimeMs = Long.MAX_VALUE;
       minIngestionTimeMs = Long.MAX_VALUE;
       maxEndTimeMs = Long.MIN_VALUE;
       for (IndexSegment indexSegment : indexSegments) {
+        SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
         if (indexSegment instanceof MutableSegment) {
           numConsumingSegmentsQueried += 1;
-          SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
           long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
-          if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
-            minIndexTimeMs = indexTimeMs;
+          if (indexTimeMs > 0) {
+            minIndexTimeMs = Math.min(minIndexTimeMs, indexTimeMs);
           }
           long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
-          if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs < 
minIngestionTimeMs) {
-            minIngestionTimeMs = ingestionTimeMs;
+          if (ingestionTimeMs > 0) {
+            minIngestionTimeMs = Math.min(minIngestionTimeMs, ingestionTimeMs);
           }
         } else if (indexSegment instanceof ImmutableSegment) {
-          SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
           long indexCreationTime = segmentMetadata.getIndexCreationTime();
-          numOnlineSegments++;
-          if (indexCreationTime != Long.MIN_VALUE) {
+          if (indexCreationTime > 0) {
             maxEndTimeMs = Math.max(maxEndTimeMs, indexCreationTime);
           } else {
             // NOTE: the endTime may be totally inaccurate based on the value 
added in the timeColumn
-            Interval timeInterval = segmentMetadata.getTimeInterval();
-            if (timeInterval != null) {
-              maxEndTimeMs = Math.max(maxEndTimeMs, 
timeInterval.getEndMillis());
+            long endTime = segmentMetadata.getEndTime();
+            if (endTime > 0) {
+              maxEndTimeMs = Math.max(maxEndTimeMs, endTime);
             }
           }
         }
@@ -314,22 +311,24 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
 
     if (tableDataManager instanceof RealtimeTableDataManager) {
-      long minConsumingFreshnessTimeMs;
       if (numConsumingSegmentsQueried > 0) {
-        minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ? 
minIngestionTimeMs : minIndexTimeMs;
         
instanceResponse.addMetadata(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
             Integer.toString(numConsumingSegmentsQueried));
-        
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
-            Long.toString(minConsumingFreshnessTimeMs));
-        LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
-            numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
-      } else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs != 
Long.MIN_VALUE) {
+      }
+      long minConsumingFreshnessTimeMs = 0; // stays 0 when no timestamp was collected; metadata is then omitted
+      if (minIngestionTimeMs != Long.MAX_VALUE) {
+        minConsumingFreshnessTimeMs = minIngestionTimeMs;
+      } else if (minIndexTimeMs != Long.MAX_VALUE) {
+        minConsumingFreshnessTimeMs = minIndexTimeMs;
+      } else if (maxEndTimeMs != Long.MIN_VALUE) {
         minConsumingFreshnessTimeMs = maxEndTimeMs;
+      }
+      if (minConsumingFreshnessTimeMs > 0) {
         
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
-            Long.toString(maxEndTimeMs));
-        LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
-            numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
+            Long.toString(minConsumingFreshnessTimeMs));
       }
+      LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
+          numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
     }
 
     LOGGER.debug("Query processing time for request Id - {}: {}", requestId, 
queryProcessingTime);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 5c589bd9cb..69fab5c183 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -60,8 +60,9 @@ import static org.testng.Assert.assertTrue;
  */
 public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
   private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+  // NOTE: The test query should match all the segments so that all the 
segments are guaranteed to be reloaded
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
-      "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+      "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = -9999";
   private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = 
Collections.singletonList("DivActualElapsedTime");
   private static final long RANDOM_SEED = System.currentTimeMillis();
   private static final Random RANDOM = new Random(RANDOM_SEED);
@@ -237,6 +238,7 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
     JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
     assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
     assertTrue(queryResponse.get("numEntriesScannedInFilter").asLong() > 0L);
+    long result = 
queryResponse.get("resultTable").get("rows").get(0).get(0).asLong();
 
     TableConfig tableConfig = getRealtimeTableConfig();
     
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
@@ -246,7 +248,8 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
     TestUtils.waitForCondition(aVoid -> {
       try {
         JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
-        // Total docs should not change during reload
+        // Query result and total docs should not change during reload
+        
assertEquals(queryResponse1.get("resultTable").get("rows").get(0).get(0).asLong(),
 result);
         assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
 
         long numConsumingSegmentsQueried = 
queryResponse1.get("numConsumingSegmentsQueried").asLong();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 704d8f02c7..231e43408c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -105,7 +105,7 @@ public class IndexLoadingConfig {
   public IndexLoadingConfig(InstanceDataManagerConfig 
instanceDataManagerConfig, TableConfig tableConfig,
       @Nullable Schema schema) {
     extractFromInstanceConfig(instanceDataManagerConfig);
-    extractFromTableConfigAndSchema(tableConfig, schema);
+    updateTableConfigAndSchema(tableConfig, schema);
   }
 
   @VisibleForTesting
@@ -117,7 +117,7 @@ public class IndexLoadingConfig {
   public IndexLoadingConfig() {
   }
 
-  private void extractFromTableConfigAndSchema(TableConfig tableConfig, 
@Nullable Schema schema) {
+  public void updateTableConfigAndSchema(TableConfig tableConfig, @Nullable 
Schema schema) {
     if (schema != null) {
       TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     }
@@ -512,8 +512,8 @@ public class IndexLoadingConfig {
    */
   @VisibleForTesting
   public void setForwardIndexDisabledColumns(Set<String> 
forwardIndexDisabledColumns) {
-    _forwardIndexDisabledColumns = forwardIndexDisabledColumns == null ? 
Collections.emptySet()
-        : forwardIndexDisabledColumns;
+    _forwardIndexDisabledColumns =
+        forwardIndexDisabledColumns == null ? Collections.emptySet() : 
forwardIndexDisabledColumns;
   }
 
   public Set<String> getNoDictionaryColumns() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to