This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit eb0e4c782829dd3d33ae50ee91def7e09028b090 Author: Neha Pawar <[email protected]> AuthorDate: Thu Dec 31 15:49:33 2020 -0800 Checnges in test to make it complie --- .../controller/helix/core/PinotHelixResourceManager.java | 4 ++-- .../core/realtime/PinotLLCRealtimeSegmentManager.java | 2 +- .../core/realtime/PinotLLCRealtimeSegmentManagerTest.java | 15 +++++++++++---- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index e80c06b..5388eeb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1337,7 +1337,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName); } else { @@ -1366,7 +1366,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); } else { LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index d899b4c..cf3a401 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -207,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager { /** * Sets up the realtime table ideal state for a table of consumer type SHARDED */ - public void setupNewTable(TableConfig tableConfig, IdealState idealState) { + public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 42bdedc..75c8057 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -907,15 +907,22 @@ public class PinotLLCRealtimeSegmentManagerTest { @Override void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, - String newSegmentName, SegmentAssignment segmentAssignment, + List<String> newSegmentNames, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, - newSegmentName, segmentAssignment, instancePartitionsMap); + null, segmentAssignment, instancePartitionsMap); + for (String segmentName : newSegmentNames) { + updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, + segmentName, segmentAssignment, instancePartitionsMap); + } } @Override - List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { - return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList()); + List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, + List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, + getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString())) + .collect(Collectors.toList()); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
