This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ae77667671 Enhance SegmentPartitionMetadataManager to handle new 
segment (#11585)
ae77667671 is described below

commit ae77667671594afa4c8fbd673eb8ae21ae3420d9
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Sep 13 14:17:14 2023 -0700

    Enhance SegmentPartitionMetadataManager to handle new segment (#11585)
---
 .../SegmentPartitionMetadataManager.java           | 110 +++++++++++++++++----
 .../SegmentPartitionMetadataManagerTest.java       |  75 ++++++++------
 .../pinot/core/routing/TablePartitionInfo.java     |   6 +-
 .../pinot/query/QueryEnvironmentTestBase.java      |   2 +-
 4 files changed, 141 insertions(+), 52 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
index 0ed89225f3..0ca3c3d317 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
@@ -29,10 +29,12 @@ import javax.annotation.Nullable;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
 import 
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
 import org.apache.pinot.core.routing.TablePartitionInfo;
 import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPartitionMetadataManager.class);
   private static final int INVALID_PARTITION_ID = -1;
+  private static final long INVALID_PUSH_TIME_MS = -1L;
 
   private final String _tableNameWithType;
 
@@ -77,15 +80,17 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
     int numSegments = onlineSegments.size();
     for (int i = 0; i < numSegments; i++) {
       String segment = onlineSegments.get(i);
-      SegmentPartitionInfo partitionInfo =
-          SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType, 
_partitionColumn, segment, znRecords.get(i));
-      SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(partitionInfo), 
getOnlineServers(externalView, segment));
+      ZNRecord znRecord = znRecords.get(i);
+      SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment, 
znRecord), getPushTimeMs(znRecord),
+          getOnlineServers(externalView, segment));
       _segmentInfoMap.put(segment, segmentInfo);
     }
     computeTablePartitionInfo();
   }
 
-  private int getPartitionId(@Nullable SegmentPartitionInfo 
segmentPartitionInfo) {
+  private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
+    SegmentPartitionInfo segmentPartitionInfo =
+        SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType, 
_partitionColumn, segment, znRecord);
     if (segmentPartitionInfo == null || segmentPartitionInfo == 
SegmentPartitionUtils.INVALID_PARTITION_INFO) {
       return INVALID_PARTITION_ID;
     }
@@ -106,7 +111,20 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
     return partitions.iterator().next();
   }
 
-  private List<String> getOnlineServers(ExternalView externalView, String 
segment) {
+  private static long getPushTimeMs(@Nullable ZNRecord znRecord) {
+    if (znRecord == null) {
+      return INVALID_PUSH_TIME_MS;
+    }
+    String pushTimeString = 
znRecord.getSimpleField(CommonConstants.Segment.PUSH_TIME);
+    // Handle legacy push time key
+    if (pushTimeString == null) {
+      pushTimeString = 
znRecord.getSimpleField(CommonConstants.Segment.Offline.PUSH_TIME);
+    }
+    // Return INVALID_PUSH_TIME_MS if unavailable for backward compatibility
+    return pushTimeString != null ? Long.parseLong(pushTimeString) : 
INVALID_PUSH_TIME_MS;
+  }
+
+  private static List<String> getOnlineServers(ExternalView externalView, 
String segment) {
     Map<String, String> instanceStateMap = externalView.getStateMap(segment);
     if (instanceStateMap == null) {
       return Collections.emptyList();
@@ -123,16 +141,23 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
 
   private void computeTablePartitionInfo() {
     PartitionInfo[] partitionInfoMap = new PartitionInfo[_numPartitions];
-    Set<String> segmentsWithInvalidPartition = new HashSet<>();
+    List<String> segmentsWithInvalidPartition = new ArrayList<>();
+    List<Map.Entry<String, SegmentInfo>> newSegmentInfoEntries = new 
ArrayList<>();
+    long currentTimeMs = System.currentTimeMillis();
     for (Map.Entry<String, SegmentInfo> entry : _segmentInfoMap.entrySet()) {
       String segment = entry.getKey();
       SegmentInfo segmentInfo = entry.getValue();
       int partitionId = segmentInfo._partitionId;
-      List<String> onlineServers = segmentInfo._onlineServers;
       if (partitionId == INVALID_PARTITION_ID) {
         segmentsWithInvalidPartition.add(segment);
         continue;
       }
+      // Process new segments in the end
+      if (InstanceSelector.isNewSegment(segmentInfo._pushTimeMs, 
currentTimeMs)) {
+        newSegmentInfoEntries.add(entry);
+        continue;
+      }
+      List<String> onlineServers = segmentInfo._onlineServers;
       PartitionInfo partitionInfo = partitionInfoMap[partitionId];
       if (partitionInfo == null) {
         Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
@@ -151,8 +176,48 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
         LOGGER.warn("Found {} segments: {} with invalid partition from table: 
{}", numSegmentsWithInvalidPartition,
             segmentsWithInvalidPartition, _tableNameWithType);
       } else {
-        LOGGER.warn("Found {} segments: {} with invalid partition from table: 
{}", numSegmentsWithInvalidPartition,
-            segmentsWithInvalidPartition, _tableNameWithType);
+        LOGGER.warn("Found {} segments: {}... with invalid partition from 
table: {}", numSegmentsWithInvalidPartition,
+            segmentsWithInvalidPartition.subList(0, 10), _tableNameWithType);
+      }
+    }
+    // Process new segments
+    if (!newSegmentInfoEntries.isEmpty()) {
+      List<String> excludedNewSegments = new ArrayList<>();
+      for (Map.Entry<String, SegmentInfo> entry : newSegmentInfoEntries) {
+        String segment = entry.getKey();
+        SegmentInfo segmentInfo = entry.getValue();
+        int partitionId = segmentInfo._partitionId;
+        List<String> onlineServers = segmentInfo._onlineServers;
+        PartitionInfo partitionInfo = partitionInfoMap[partitionId];
+        if (partitionInfo == null) {
+          // If the new segment is the first segment of a partition, treat it 
as regular segment
+          Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
+          List<String> segments = new ArrayList<>();
+          segments.add(segment);
+          partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
+          partitionInfoMap[partitionId] = partitionInfo;
+        } else {
+          // If the new segment is not the first segment of a partition, add 
it only if it won't reduce the fully
+          // replicated servers. It is common that a new segment (newly 
pushed, or a new consuming segment) doesn't have
+          // all the replicas available yet, and we want to exclude it from 
the partition info until all the replicas
+          // are available.
+          //noinspection SlowListContainsAll
+          if 
(onlineServers.containsAll(partitionInfo._fullyReplicatedServers)) {
+            partitionInfo._segments.add(segment);
+          } else {
+            excludedNewSegments.add(segment);
+          }
+        }
+      }
+      if (!excludedNewSegments.isEmpty()) {
+        int numExcludedNewSegments = excludedNewSegments.size();
+        if (numExcludedNewSegments <= 10) {
+          LOGGER.info("Excluded {} new segments: {} without all replicas 
available from table: {}",
+              numExcludedNewSegments, excludedNewSegments, _tableNameWithType);
+        } else {
+          LOGGER.info("Excluded {} new segments: {}... without all replicas 
available from table: {}",
+              numExcludedNewSegments, excludedNewSegments.subList(0, 10), 
_tableNameWithType);
+        }
       }
     }
     _tablePartitionInfo =
@@ -167,16 +232,20 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
     int numSegments = pulledSegments.size();
     for (int i = 0; i < numSegments; i++) {
       String segment = pulledSegments.get(i);
-      SegmentPartitionInfo partitionInfo =
-          SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType, 
_partitionColumn, segment, znRecords.get(i));
-      SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(partitionInfo), 
getOnlineServers(externalView, segment));
+      ZNRecord znRecord = znRecords.get(i);
+      SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment, 
znRecord), getPushTimeMs(znRecord),
+          getOnlineServers(externalView, segment));
       _segmentInfoMap.put(segment, segmentInfo);
     }
     // Update online servers for all online segments
     for (String segment : onlineSegments) {
       SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
       if (segmentInfo == null) {
-        segmentInfo = new SegmentInfo(INVALID_PARTITION_ID, 
getOnlineServers(externalView, segment));
+        // NOTE: This should not happen, but we still handle it gracefully by 
adding an invalid SegmentInfo
+        LOGGER.error("Failed to find segment info for segment: {} in table: {} 
while handling assignment change",
+            segment, _tableNameWithType);
+        segmentInfo =
+            new SegmentInfo(INVALID_PARTITION_ID, INVALID_PUSH_TIME_MS, 
getOnlineServers(externalView, segment));
         _segmentInfoMap.put(segment, segmentInfo);
       } else {
         segmentInfo._onlineServers = getOnlineServers(externalView, segment);
@@ -188,15 +257,18 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
 
   @Override
   public synchronized void refreshSegment(String segment, @Nullable ZNRecord 
znRecord) {
-    SegmentPartitionInfo partitionInfo =
-        SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType, 
_partitionColumn, segment, znRecord);
-    int partitionId = getPartitionId(partitionInfo);
+    int partitionId = getPartitionId(segment, znRecord);
+    long pushTimeMs = getPushTimeMs(znRecord);
     SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
     if (segmentInfo == null) {
-      segmentInfo = new SegmentInfo(partitionId, Collections.emptyList());
+      // NOTE: This should not happen, but we still handle it gracefully by 
adding an invalid SegmentInfo
+      LOGGER.error("Failed to find segment info for segment: {} in table: {} 
while handling segment refresh", segment,
+          _tableNameWithType);
+      segmentInfo = new SegmentInfo(partitionId, pushTimeMs, 
Collections.emptyList());
       _segmentInfoMap.put(segment, segmentInfo);
     } else {
       segmentInfo._partitionId = partitionId;
+      segmentInfo._pushTimeMs = pushTimeMs;
     }
     computeTablePartitionInfo();
   }
@@ -207,10 +279,12 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
 
   private static class SegmentInfo {
     int _partitionId;
+    long _pushTimeMs;
     List<String> _onlineServers;
 
-    SegmentInfo(int partitionId, List<String> onlineServers) {
+    SegmentInfo(int partitionId, long pushTimeMs, List<String> onlineServers) {
       _partitionId = partitionId;
+      _pushTimeMs = pushTimeMs;
       _onlineServers = onlineServers;
     }
   }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
index 6d5999707b..5d70a504fb 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.routing.segmentpartition;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -39,11 +38,6 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.core.routing.TablePartitionInfo;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -85,15 +79,12 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
 
   @Test
   public void 
testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePartitionTable()
 {
-    // NOTE: Ideal state and external view are not used in the current 
implementation
-    TableConfig tableConfig =
-        getTableConfig(new String[]{PARTITION_COLUMN}, new 
String[]{PARTITION_COLUMN_FUNC}, new int[]{NUM_PARTITIONS});
-    ExternalView externalView = new ExternalView(tableConfig.getTableName());
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
     Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
     Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0, 
ONLINE, SERVER_1, ONLINE);
     Set<String> onlineSegments = new HashSet<>();
     // NOTE: Ideal state is not used in the current implementation.
-    IdealState idealState = new IdealState("");
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
 
     SegmentPartitionMetadataManager partitionMetadataManager =
         new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN, PARTITION_COLUMN_FUNC,
@@ -130,11 +121,30 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
     assertEquals(tablePartitionInfo.getPartitionInfoMap(), new 
TablePartitionInfo.PartitionInfo[NUM_PARTITIONS]);
     assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
 
+    // Same logic applies to the new segment
+    onlineSegments.add(segmentWithoutPartitionMetadata);
+    segmentAssignment.put(segmentWithoutPartitionMetadata, 
onlineInstanceStateMap);
+    segmentZKMetadataWithoutPartitionMetadata = new 
SegmentZKMetadata(segmentWithoutPartitionMetadata);
+    
segmentZKMetadataWithoutPartitionMetadata.setPushTime(System.currentTimeMillis());
+    ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
+        segmentZKMetadataWithoutPartitionMetadata);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
+    assertEquals(tablePartitionInfo.getPartitionInfoMap(), new 
TablePartitionInfo.PartitionInfo[NUM_PARTITIONS]);
+    assertEquals(tablePartitionInfo.getSegmentsWithInvalidPartition(),
+        Collections.singletonList(segmentWithoutPartitionMetadata));
+    onlineSegments.remove(segmentWithoutPartitionMetadata);
+    segmentAssignment.remove(segmentWithoutPartitionMetadata);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
+    assertEquals(tablePartitionInfo.getPartitionInfoMap(), new 
TablePartitionInfo.PartitionInfo[NUM_PARTITIONS]);
+    assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
+
     // Adding segments inline with the partition column config should yield 
correct partition results
     String segment0 = "segment0";
     onlineSegments.add(segment0);
     segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0, 
ONLINE));
-    setSegmentZKPartitionMetadata(segment0, PARTITION_COLUMN_FUNC, 
NUM_PARTITIONS, 0);
+    setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0, 
0L);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
     TablePartitionInfo.PartitionInfo[] partitionInfoMap = 
tablePartitionInfo.getPartitionInfoMap();
@@ -147,7 +157,7 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     segmentAssignment.put(segment1, Collections.singletonMap(SERVER_1, 
ONLINE));
-    setSegmentZKPartitionMetadata(segment1, PARTITION_COLUMN_FUNC, 
NUM_PARTITIONS, 1);
+    setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 1, 
0L);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
     partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -158,7 +168,7 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
     assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
 
     // Updating partition metadata without refreshing should have no effect
-    setSegmentZKPartitionMetadata(segment0, PARTITION_COLUMN_FUNC_ALT, 
NUM_PARTITIONS_ALT, 0);
+    setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC_ALT, 
NUM_PARTITIONS_ALT, 0, 0L);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
     partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -179,7 +189,7 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
     assertEquals(tablePartitionInfo.getSegmentsWithInvalidPartition(), 
Collections.singletonList(segment0));
 
     // Refresh the changed segment back to inline, and both segments should 
now be back on the partition list
-    setSegmentZKPartitionMetadata(segment0, PARTITION_COLUMN_FUNC, 
NUM_PARTITIONS, 0);
+    setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0, 
0L);
     segmentZkMetadataFetcher.refreshSegment(segment0);
     tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
     partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -205,7 +215,7 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
     String segment2 = "segment2";
     onlineSegments.add(segment2);
     segmentAssignment.put(segment2, Collections.singletonMap(SERVER_1, 
ONLINE));
-    setSegmentZKPartitionMetadata(segment2, PARTITION_COLUMN_FUNC, 
NUM_PARTITIONS, 1);
+    setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 1, 
0L);
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
     partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -226,35 +236,40 @@ public class SegmentPartitionMetadataManagerTest extends 
ControllerTest {
     assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new 
String[]{segment1, segment2});
     assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
 
-    // Making all of them replicated will show full list
+    // Adding a new segment without available replica should not update the 
partition map
+    String newSegment = "newSegment";
+    onlineSegments.add(newSegment);
+    setSegmentZKMetadata(newSegment, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0, 
System.currentTimeMillis());
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
+    partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
+    assertEquals(partitionInfoMap[0]._fullyReplicatedServers, 
Collections.singleton(SERVER_0));
+    assertEquals(partitionInfoMap[0]._segments, 
Collections.singleton(segment0));
+    assertEquals(partitionInfoMap[1]._fullyReplicatedServers, 
Collections.singleton(SERVER_0));
+    assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new 
String[]{segment1, segment2});
+    assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
+
+    // Making all of them replicated will show full list, even for the new 
segment
     segmentAssignment.put(segment0, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
     segmentAssignment.put(segment1, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
     segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
+    segmentAssignment.put(newSegment, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
     segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
     partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
     assertEquals(partitionInfoMap[0]._fullyReplicatedServers, 
ImmutableSet.of(SERVER_0, SERVER_1));
-    assertEquals(partitionInfoMap[0]._segments, 
Collections.singleton(segment0));
+    assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new 
String[]{segment0, newSegment});
     assertEquals(partitionInfoMap[1]._fullyReplicatedServers, 
ImmutableSet.of(SERVER_0, SERVER_1));
     assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new 
String[]{segment1, segment2});
     assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
   }
 
-  private TableConfig getTableConfig(String[] partitionColumns, String[] 
partitionFunctions, int[] partitionSizes) {
-    Map<String, ColumnPartitionConfig> partitionColumnMetadataMap = new 
HashMap<>();
-    for (int idx = 0; idx < partitionColumns.length; idx++) {
-      partitionColumnMetadataMap.put(partitionColumns[idx],
-          new ColumnPartitionConfig(partitionFunctions[idx], 
partitionSizes[idx]));
-    }
-    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME)
-        .setSegmentPartitionConfig(new 
SegmentPartitionConfig(partitionColumnMetadataMap)).build();
-  }
-
-  private void setSegmentZKPartitionMetadata(String segment, String 
partitionFunction, int numPartitions,
-      int partitionId) {
+  private void setSegmentZKMetadata(String segment, String partitionFunction, 
int numPartitions, int partitionId,
+      long pushTimeMs) {
     SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
     segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
         new ColumnPartitionMetadata(partitionFunction, numPartitions, 
Collections.singleton(partitionId), null))));
+    segmentZKMetadata.setPushTime(pushTimeMs);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
OFFLINE_TABLE_NAME, segmentZKMetadata);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
index 1faef75c77..3ff464bed0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
@@ -28,10 +28,10 @@ public class TablePartitionInfo {
   private final String _partitionFunctionName;
   private final int _numPartitions;
   private final PartitionInfo[] _partitionInfoMap;
-  private final Set<String> _segmentsWithInvalidPartition;
+  private final List<String> _segmentsWithInvalidPartition;
 
   public TablePartitionInfo(String tableNameWithType, String partitionColumn, 
String partitionFunctionName,
-      int numPartitions, PartitionInfo[] partitionInfoMap, Set<String> 
segmentsWithInvalidPartition) {
+      int numPartitions, PartitionInfo[] partitionInfoMap, List<String> 
segmentsWithInvalidPartition) {
     _tableNameWithType = tableNameWithType;
     _partitionColumn = partitionColumn;
     _partitionFunctionName = partitionFunctionName;
@@ -60,7 +60,7 @@ public class TablePartitionInfo {
     return _partitionInfoMap;
   }
 
-  public Set<String> getSegmentsWithInvalidPartition() {
+  public List<String> getSegmentsWithInvalidPartition() {
     return _segmentsWithInvalidPartition;
   }
 
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index dfed800ec1..3dbc6833ef 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -246,7 +246,7 @@ public class QueryEnvironmentTestBase {
         }
         TablePartitionInfo tablePartitionInfo =
             new TablePartitionInfo(tableNameWithType, partitionColumn, 
"hashCode", numPartitions, partitionIdToInfoMap,
-                Collections.emptySet());
+                Collections.emptyList());
         partitionInfoMap.put(tableNameWithType, tablePartitionInfo);
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to