This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ae77667671 Enhance SegmentPartitionMetadataManager to handle new segment (#11585)
ae77667671 is described below
commit ae77667671594afa4c8fbd673eb8ae21ae3420d9
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Sep 13 14:17:14 2023 -0700
Enhance SegmentPartitionMetadataManager to handle new segment (#11585)
---
.../SegmentPartitionMetadataManager.java | 110 +++++++++++++++++----
.../SegmentPartitionMetadataManagerTest.java | 75 ++++++++------
.../pinot/core/routing/TablePartitionInfo.java | 6 +-
.../pinot/query/QueryEnvironmentTestBase.java | 2 +-
4 files changed, 141 insertions(+), 52 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
index 0ed89225f3..0ca3c3d317 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
@@ -29,10 +29,12 @@ import javax.annotation.Nullable;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ import org.slf4j.LoggerFactory;
public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPartitionMetadataManager.class);
private static final int INVALID_PARTITION_ID = -1;
+ private static final long INVALID_PUSH_TIME_MS = -1L;
private final String _tableNameWithType;
@@ -77,15 +80,17 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
int numSegments = onlineSegments.size();
for (int i = 0; i < numSegments; i++) {
String segment = onlineSegments.get(i);
- SegmentPartitionInfo partitionInfo =
- SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType,
_partitionColumn, segment, znRecords.get(i));
- SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(partitionInfo),
getOnlineServers(externalView, segment));
+ ZNRecord znRecord = znRecords.get(i);
+ SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment,
znRecord), getPushTimeMs(znRecord),
+ getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
}
computeTablePartitionInfo();
}
- private int getPartitionId(@Nullable SegmentPartitionInfo
segmentPartitionInfo) {
+ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
+ SegmentPartitionInfo segmentPartitionInfo =
+ SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType,
_partitionColumn, segment, znRecord);
if (segmentPartitionInfo == null || segmentPartitionInfo ==
SegmentPartitionUtils.INVALID_PARTITION_INFO) {
return INVALID_PARTITION_ID;
}
@@ -106,7 +111,20 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
return partitions.iterator().next();
}
- private List<String> getOnlineServers(ExternalView externalView, String
segment) {
+ private static long getPushTimeMs(@Nullable ZNRecord znRecord) {
+ if (znRecord == null) {
+ return INVALID_PUSH_TIME_MS;
+ }
+ String pushTimeString =
znRecord.getSimpleField(CommonConstants.Segment.PUSH_TIME);
+ // Handle legacy push time key
+ if (pushTimeString == null) {
+ pushTimeString =
znRecord.getSimpleField(CommonConstants.Segment.Offline.PUSH_TIME);
+ }
+ // Return INVALID_PUSH_TIME_MS if unavailable for backward compatibility
+ return pushTimeString != null ? Long.parseLong(pushTimeString) :
INVALID_PUSH_TIME_MS;
+ }
+
+ private static List<String> getOnlineServers(ExternalView externalView,
String segment) {
Map<String, String> instanceStateMap = externalView.getStateMap(segment);
if (instanceStateMap == null) {
return Collections.emptyList();
@@ -123,16 +141,23 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
private void computeTablePartitionInfo() {
PartitionInfo[] partitionInfoMap = new PartitionInfo[_numPartitions];
- Set<String> segmentsWithInvalidPartition = new HashSet<>();
+ List<String> segmentsWithInvalidPartition = new ArrayList<>();
+ List<Map.Entry<String, SegmentInfo>> newSegmentInfoEntries = new
ArrayList<>();
+ long currentTimeMs = System.currentTimeMillis();
for (Map.Entry<String, SegmentInfo> entry : _segmentInfoMap.entrySet()) {
String segment = entry.getKey();
SegmentInfo segmentInfo = entry.getValue();
int partitionId = segmentInfo._partitionId;
- List<String> onlineServers = segmentInfo._onlineServers;
if (partitionId == INVALID_PARTITION_ID) {
segmentsWithInvalidPartition.add(segment);
continue;
}
+ // Process new segments in the end
+ if (InstanceSelector.isNewSegment(segmentInfo._pushTimeMs,
currentTimeMs)) {
+ newSegmentInfoEntries.add(entry);
+ continue;
+ }
+ List<String> onlineServers = segmentInfo._onlineServers;
PartitionInfo partitionInfo = partitionInfoMap[partitionId];
if (partitionInfo == null) {
Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
@@ -151,8 +176,48 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
LOGGER.warn("Found {} segments: {} with invalid partition from table:
{}", numSegmentsWithInvalidPartition,
segmentsWithInvalidPartition, _tableNameWithType);
} else {
- LOGGER.warn("Found {} segments: {} with invalid partition from table:
{}", numSegmentsWithInvalidPartition,
- segmentsWithInvalidPartition, _tableNameWithType);
+ LOGGER.warn("Found {} segments: {}... with invalid partition from
table: {}", numSegmentsWithInvalidPartition,
+ segmentsWithInvalidPartition.subList(0, 10), _tableNameWithType);
+ }
+ }
+ // Process new segments
+ if (!newSegmentInfoEntries.isEmpty()) {
+ List<String> excludedNewSegments = new ArrayList<>();
+ for (Map.Entry<String, SegmentInfo> entry : newSegmentInfoEntries) {
+ String segment = entry.getKey();
+ SegmentInfo segmentInfo = entry.getValue();
+ int partitionId = segmentInfo._partitionId;
+ List<String> onlineServers = segmentInfo._onlineServers;
+ PartitionInfo partitionInfo = partitionInfoMap[partitionId];
+ if (partitionInfo == null) {
+ // If the new segment is the first segment of a partition, treat it
as regular segment
+ Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
+ List<String> segments = new ArrayList<>();
+ segments.add(segment);
+ partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
+ partitionInfoMap[partitionId] = partitionInfo;
+ } else {
+ // If the new segment is not the first segment of a partition, add
it only if it won't reduce the fully
+ // replicated servers. It is common that a new segment (newly
pushed, or a new consuming segment) doesn't have
+ // all the replicas available yet, and we want to exclude it from
the partition info until all the replicas
+ // are available.
+ //noinspection SlowListContainsAll
+ if
(onlineServers.containsAll(partitionInfo._fullyReplicatedServers)) {
+ partitionInfo._segments.add(segment);
+ } else {
+ excludedNewSegments.add(segment);
+ }
+ }
+ }
+ if (!excludedNewSegments.isEmpty()) {
+ int numExcludedNewSegments = excludedNewSegments.size();
+ if (numExcludedNewSegments <= 10) {
+ LOGGER.info("Excluded {} new segments: {} without all replicas
available from table: {}",
+ numExcludedNewSegments, excludedNewSegments, _tableNameWithType);
+ } else {
+ LOGGER.info("Excluded {} new segments: {}... without all replicas
available from table: {}",
+ numExcludedNewSegments, excludedNewSegments.subList(0, 10),
_tableNameWithType);
+ }
}
}
_tablePartitionInfo =
@@ -167,16 +232,20 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
int numSegments = pulledSegments.size();
for (int i = 0; i < numSegments; i++) {
String segment = pulledSegments.get(i);
- SegmentPartitionInfo partitionInfo =
- SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType,
_partitionColumn, segment, znRecords.get(i));
- SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(partitionInfo),
getOnlineServers(externalView, segment));
+ ZNRecord znRecord = znRecords.get(i);
+ SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment,
znRecord), getPushTimeMs(znRecord),
+ getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
}
// Update online servers for all online segments
for (String segment : onlineSegments) {
SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
if (segmentInfo == null) {
- segmentInfo = new SegmentInfo(INVALID_PARTITION_ID,
getOnlineServers(externalView, segment));
+ // NOTE: This should not happen, but we still handle it gracefully by
adding an invalid SegmentInfo
+ LOGGER.error("Failed to find segment info for segment: {} in table: {}
while handling assignment change",
+ segment, _tableNameWithType);
+ segmentInfo =
+ new SegmentInfo(INVALID_PARTITION_ID, INVALID_PUSH_TIME_MS,
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
} else {
segmentInfo._onlineServers = getOnlineServers(externalView, segment);
@@ -188,15 +257,18 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
@Override
public synchronized void refreshSegment(String segment, @Nullable ZNRecord
znRecord) {
- SegmentPartitionInfo partitionInfo =
- SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType,
_partitionColumn, segment, znRecord);
- int partitionId = getPartitionId(partitionInfo);
+ int partitionId = getPartitionId(segment, znRecord);
+ long pushTimeMs = getPushTimeMs(znRecord);
SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
if (segmentInfo == null) {
- segmentInfo = new SegmentInfo(partitionId, Collections.emptyList());
+ // NOTE: This should not happen, but we still handle it gracefully by
adding an invalid SegmentInfo
+ LOGGER.error("Failed to find segment info for segment: {} in table: {}
while handling segment refresh", segment,
+ _tableNameWithType);
+ segmentInfo = new SegmentInfo(partitionId, pushTimeMs,
Collections.emptyList());
_segmentInfoMap.put(segment, segmentInfo);
} else {
segmentInfo._partitionId = partitionId;
+ segmentInfo._pushTimeMs = pushTimeMs;
}
computeTablePartitionInfo();
}
@@ -207,10 +279,12 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
private static class SegmentInfo {
int _partitionId;
+ long _pushTimeMs;
List<String> _onlineServers;
- SegmentInfo(int partitionId, List<String> onlineServers) {
+ SegmentInfo(int partitionId, long pushTimeMs, List<String> onlineServers) {
_partitionId = partitionId;
+ _pushTimeMs = pushTimeMs;
_onlineServers = onlineServers;
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
index 6d5999707b..5d70a504fb 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.routing.segmentpartition;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -39,11 +38,6 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -85,15 +79,12 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
@Test
public void
testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePartitionTable()
{
- // NOTE: Ideal state and external view are not used in the current
implementation
- TableConfig tableConfig =
- getTableConfig(new String[]{PARTITION_COLUMN}, new
String[]{PARTITION_COLUMN_FUNC}, new int[]{NUM_PARTITIONS});
- ExternalView externalView = new ExternalView(tableConfig.getTableName());
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0,
ONLINE, SERVER_1, ONLINE);
Set<String> onlineSegments = new HashSet<>();
// NOTE: Ideal state is not used in the current implementation.
- IdealState idealState = new IdealState("");
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
SegmentPartitionMetadataManager partitionMetadataManager =
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME,
PARTITION_COLUMN, PARTITION_COLUMN_FUNC,
@@ -130,11 +121,30 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
assertEquals(tablePartitionInfo.getPartitionInfoMap(), new
TablePartitionInfo.PartitionInfo[NUM_PARTITIONS]);
assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
+ // Same logic applies to the new segment
+ onlineSegments.add(segmentWithoutPartitionMetadata);
+ segmentAssignment.put(segmentWithoutPartitionMetadata,
onlineInstanceStateMap);
+ segmentZKMetadataWithoutPartitionMetadata = new
SegmentZKMetadata(segmentWithoutPartitionMetadata);
+
segmentZKMetadataWithoutPartitionMetadata.setPushTime(System.currentTimeMillis());
+ ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
+ segmentZKMetadataWithoutPartitionMetadata);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
+ assertEquals(tablePartitionInfo.getPartitionInfoMap(), new
TablePartitionInfo.PartitionInfo[NUM_PARTITIONS]);
+ assertEquals(tablePartitionInfo.getSegmentsWithInvalidPartition(),
+ Collections.singletonList(segmentWithoutPartitionMetadata));
+ onlineSegments.remove(segmentWithoutPartitionMetadata);
+ segmentAssignment.remove(segmentWithoutPartitionMetadata);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
+ assertEquals(tablePartitionInfo.getPartitionInfoMap(), new
TablePartitionInfo.PartitionInfo[NUM_PARTITIONS]);
+ assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
+
// Adding segments inline with the partition column config should yield
correct partition results
String segment0 = "segment0";
onlineSegments.add(segment0);
segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0,
ONLINE));
- setSegmentZKPartitionMetadata(segment0, PARTITION_COLUMN_FUNC,
NUM_PARTITIONS, 0);
+ setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0,
0L);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
TablePartitionInfo.PartitionInfo[] partitionInfoMap =
tablePartitionInfo.getPartitionInfoMap();
@@ -147,7 +157,7 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
String segment1 = "segment1";
onlineSegments.add(segment1);
segmentAssignment.put(segment1, Collections.singletonMap(SERVER_1,
ONLINE));
- setSegmentZKPartitionMetadata(segment1, PARTITION_COLUMN_FUNC,
NUM_PARTITIONS, 1);
+ setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 1,
0L);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -158,7 +168,7 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
// Updating partition metadata without refreshing should have no effect
- setSegmentZKPartitionMetadata(segment0, PARTITION_COLUMN_FUNC_ALT,
NUM_PARTITIONS_ALT, 0);
+ setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC_ALT,
NUM_PARTITIONS_ALT, 0, 0L);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -179,7 +189,7 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
assertEquals(tablePartitionInfo.getSegmentsWithInvalidPartition(),
Collections.singletonList(segment0));
// Refresh the changed segment back to inline, and both segments should
now be back on the partition list
- setSegmentZKPartitionMetadata(segment0, PARTITION_COLUMN_FUNC,
NUM_PARTITIONS, 0);
+ setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0,
0L);
segmentZkMetadataFetcher.refreshSegment(segment0);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -205,7 +215,7 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
String segment2 = "segment2";
onlineSegments.add(segment2);
segmentAssignment.put(segment2, Collections.singletonMap(SERVER_1,
ONLINE));
- setSegmentZKPartitionMetadata(segment2, PARTITION_COLUMN_FUNC,
NUM_PARTITIONS, 1);
+ setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 1,
0L);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
@@ -226,35 +236,40 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment2});
assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
- // Making all of them replicated will show full list
+ // Adding a new segment without available replica should not update the
partition map
+ String newSegment = "newSegment";
+ onlineSegments.add(newSegment);
+ setSegmentZKMetadata(newSegment, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0,
System.currentTimeMillis());
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
+ partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
+ assertEquals(partitionInfoMap[0]._fullyReplicatedServers,
Collections.singleton(SERVER_0));
+ assertEquals(partitionInfoMap[0]._segments,
Collections.singleton(segment0));
+ assertEquals(partitionInfoMap[1]._fullyReplicatedServers,
Collections.singleton(SERVER_0));
+ assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment2});
+ assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
+
+ // Making all of them replicated will show full list, even for the new
segment
segmentAssignment.put(segment0, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
segmentAssignment.put(segment1, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
+ segmentAssignment.put(newSegment, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
partitionInfoMap = tablePartitionInfo.getPartitionInfoMap();
assertEquals(partitionInfoMap[0]._fullyReplicatedServers,
ImmutableSet.of(SERVER_0, SERVER_1));
- assertEquals(partitionInfoMap[0]._segments,
Collections.singleton(segment0));
+ assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new
String[]{segment0, newSegment});
assertEquals(partitionInfoMap[1]._fullyReplicatedServers,
ImmutableSet.of(SERVER_0, SERVER_1));
assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment2});
assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
}
- private TableConfig getTableConfig(String[] partitionColumns, String[]
partitionFunctions, int[] partitionSizes) {
- Map<String, ColumnPartitionConfig> partitionColumnMetadataMap = new
HashMap<>();
- for (int idx = 0; idx < partitionColumns.length; idx++) {
- partitionColumnMetadataMap.put(partitionColumns[idx],
- new ColumnPartitionConfig(partitionFunctions[idx],
partitionSizes[idx]));
- }
- return new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME)
- .setSegmentPartitionConfig(new
SegmentPartitionConfig(partitionColumnMetadataMap)).build();
- }
-
- private void setSegmentZKPartitionMetadata(String segment, String
partitionFunction, int numPartitions,
- int partitionId) {
+ private void setSegmentZKMetadata(String segment, String partitionFunction,
int numPartitions, int partitionId,
+ long pushTimeMs) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
new ColumnPartitionMetadata(partitionFunction, numPartitions,
Collections.singleton(partitionId), null))));
+ segmentZKMetadata.setPushTime(pushTimeMs);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
OFFLINE_TABLE_NAME, segmentZKMetadata);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
index 1faef75c77..3ff464bed0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java
@@ -28,10 +28,10 @@ public class TablePartitionInfo {
private final String _partitionFunctionName;
private final int _numPartitions;
private final PartitionInfo[] _partitionInfoMap;
- private final Set<String> _segmentsWithInvalidPartition;
+ private final List<String> _segmentsWithInvalidPartition;
public TablePartitionInfo(String tableNameWithType, String partitionColumn,
String partitionFunctionName,
- int numPartitions, PartitionInfo[] partitionInfoMap, Set<String>
segmentsWithInvalidPartition) {
+ int numPartitions, PartitionInfo[] partitionInfoMap, List<String>
segmentsWithInvalidPartition) {
_tableNameWithType = tableNameWithType;
_partitionColumn = partitionColumn;
_partitionFunctionName = partitionFunctionName;
@@ -60,7 +60,7 @@ public class TablePartitionInfo {
return _partitionInfoMap;
}
- public Set<String> getSegmentsWithInvalidPartition() {
+ public List<String> getSegmentsWithInvalidPartition() {
return _segmentsWithInvalidPartition;
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index dfed800ec1..3dbc6833ef 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -246,7 +246,7 @@ public class QueryEnvironmentTestBase {
}
TablePartitionInfo tablePartitionInfo =
new TablePartitionInfo(tableNameWithType, partitionColumn,
"hashCode", numPartitions, partitionIdToInfoMap,
- Collections.emptySet());
+ Collections.emptyList());
partitionInfoMap.put(tableNameWithType, tablePartitionInfo);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]