This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1cb09cf59d2b4dd9b6ff79fdbe48775ae532ea60 Author: Neha Pawar <[email protected]> AuthorDate: Thu Dec 31 15:39:44 2020 -0800 More controller side changes --- .../helix/core/PinotHelixResourceManager.java | 4 +- .../helix/core/PinotTableIdealStateBuilder.java | 16 +++--- .../realtime/PinotLLCRealtimeSegmentManager.java | 57 ++++++++-------------- .../PinotLLCRealtimeSegmentManagerTest.java | 4 +- .../realtime/LLRealtimeSegmentDataManager.java | 2 +- .../impl/fakestream/FakeStreamConsumerFactory.java | 2 +- .../fakestream/FakeStreamMetadataProvider.java | 12 +++-- .../kafka09/KafkaStreamMetadataProvider.java | 36 ++++++++++---- .../kafka09/KafkaPartitionLevelConsumerTest.java | 4 +- .../kafka20/KafkaStreamMetadataProvider.java | 3 +- ...Fetcher.java => PartitionGroupInfoFetcher.java} | 31 +++++------- .../pinot/spi/stream/StreamMetadataProvider.java | 2 +- 12 files changed, 86 insertions(+), 87 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ebfbfa1..e80c06b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1337,7 +1337,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName); } else { @@ -1366,7 +1366,7 @@ public class PinotHelixResourceManager { idealState = PinotTableIdealStateBuilder .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode); - _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState); + _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); } else { LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index a7b3c9e..8b200bb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -30,10 +30,10 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.HelixHelper; -import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; +import org.apache.pinot.spi.stream.PartitionGroupInfoFetcher; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -115,15 +115,15 @@ public class PinotTableIdealStateBuilder { return idealState; } - public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { - PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, currentPartitionGroupMetadataList); + PartitionGroupInfoFetcher partitionGroupInfoFetcher = + new PartitionGroupInfoFetcher(streamConfig, currentPartitionGroupMetadataList); try { - RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupMetadataFetcher); - return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); + RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupInfoFetcher); + return partitionGroupInfoFetcher.getPartitionGroupInfoList(); } catch (Exception e) { - Exception fetcherException = partitionGroupMetadataFetcher.getException(); + Exception fetcherException = partitionGroupInfoFetcher.getException(); LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException); throw new RuntimeException(fetcherException); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 423a0b2..d899b4c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -48,7 +48,6 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; @@ -208,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager { /** * Sets up the realtime table ideal state for a table of consumer type SHARDED */ - public void setupNewShardedTable(TableConfig tableConfig, IdealState idealState) { + public void setupNewTable(TableConfig tableConfig, IdealState idealState) { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); @@ -220,18 +219,8 @@ public class PinotLLCRealtimeSegmentManager { new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); // get new partition groups and their metadata - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - StreamMetadataProvider streamMetadataProvider = streamConsumerFactory - .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis()); - - List<PartitionGroupInfo> newPartitionGroupMetadataList; - try { - newPartitionGroupMetadataList = - streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000); - } catch (TimeoutException e) { - throw new IllegalStateException(e); - } - int numPartitionGroups = newPartitionGroupMetadataList.size(); + List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList()); + int numPartitionGroups = newPartitionGroupInfoList.size(); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); int numReplicas = getNumReplicas(tableConfig, instancePartitions); @@ -242,7 +231,7 @@ public class PinotLLCRealtimeSegmentManager { long currentTimeMs = getCurrentTimeMs(); Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); - for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) { + for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) { String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, @@ -507,18 +496,10 @@ public class PinotLLCRealtimeSegmentManager { List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); - StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); - StreamMetadataProvider streamMetadataProvider = streamConsumerFactory - .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis()); // find new partition groups [A],[B],[C],[D] - List<PartitionGroupInfo> newPartitionGroupMetadataList; - try { - newPartitionGroupMetadataList = - streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000); - } catch (TimeoutException e) { - throw new IllegalStateException(e); - } + List<PartitionGroupInfo> newPartitionGroupMetadataList = + getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); // create new segment metadata, only if it is not IN_PROGRESS in the current state Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect( @@ -721,9 +702,9 @@ public class PinotLLCRealtimeSegmentManager { } @VisibleForTesting - List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { - return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList); + return PinotTableIdealStateBuilder.getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); } @VisibleForTesting @@ -843,7 +824,7 @@ public class PinotLLCRealtimeSegmentManager { if (idealState.isEnabled()) { List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); - int numPartitions = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size(); + int numPartitions = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList).size(); return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions); } else { LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName); @@ -1005,7 +986,7 @@ public class PinotLLCRealtimeSegmentManager { // and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does not have the same offset). // In latter case, report data loss. for (Map.Entry<Integer, LLCRealtimeSegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) { - int partitionId = entry.getKey(); + int partitionGroupId = entry.getKey(); LLCRealtimeSegmentZKMetadata latestSegmentZKMetadata = entry.getValue(); String latestSegmentName = latestSegmentZKMetadata.getSegmentName(); LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName); @@ -1049,10 +1030,10 @@ public class PinotLLCRealtimeSegmentManager { StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); // Start offset must be higher than the start offset of the stream StreamPartitionMsgOffset partitionStartOffset = - getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionId); + getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId); if (partitionStartOffset.compareTo(startOffset) > 0) { LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, - partitionStartOffset, partitionId, realtimeTableName); + partitionStartOffset, partitionGroupId, realtimeTableName); _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); startOffset = partitionStartOffset; } @@ -1089,7 +1070,7 @@ public class PinotLLCRealtimeSegmentManager { String previousConsumingSegment = null; for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey()); - if (llcSegmentName.getPartitionGroupId() == partitionId && segmentEntry.getValue() + if (llcSegmentName.getPartitionGroupId() == partitionGroupId && segmentEntry.getValue() .containsValue(SegmentStateModel.CONSUMING)) { previousConsumingSegment = llcSegmentName.getSegmentName(); break; @@ -1098,7 +1079,7 @@ public class PinotLLCRealtimeSegmentManager { if (previousConsumingSegment == null) { LOGGER .error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss", - partitionId, realtimeTableName); + partitionGroupId, realtimeTableName); _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); } updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName, @@ -1111,10 +1092,14 @@ public class PinotLLCRealtimeSegmentManager { } // Set up new partitions if not exist - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { - if (!latestSegmentZKMetadataMap.containsKey(partitionId)) { + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); + List<PartitionGroupInfo> partitionGroupInfoList = + getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); + for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) { + int partitionGroupId = partitionGroupInfo.getPartitionGroupId(); + if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) { String newSegmentName = - setupNewPartitionGroup(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions, + setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, currentTimeMs, instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 743e719..42bdedc 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -50,7 +50,6 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.util.SegmentCompletionUtils; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; -import org.apache.pinot.core.realtime.impl.fakestream.FakePartitionGroupMetadata; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; @@ -60,6 +59,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.StreamConfig; @@ -914,7 +914,7 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Override - List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index f00facc..0cd1fba 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1239,7 +1239,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { int numPartitions = columnPartitionConfig.getNumPartitions(); try { // fixme: get this from ideal state - int numStreamPartitions = _streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000).size(); + int numStreamPartitions = _streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 5000).size(); if (numStreamPartitions != numPartitions) { segmentLogger.warn( "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index 54be1b6..6121eef 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -88,7 +88,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { // stream metadata provider StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId); - int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10_000).size(); + int partitionCount = streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10_000).size(); System.out.println(partitionCount); // Partition metadata provider diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java index c96d06a..0de0ce2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; @@ -35,9 +36,11 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; */ public class FakeStreamMetadataProvider implements StreamMetadataProvider { private final int _numPartitions; + private StreamConfig _streamConfig; public FakeStreamMetadataProvider(StreamConfig streamConfig) { _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig); + _streamConfig = streamConfig; } @Override @@ -46,11 +49,12 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider { } @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) { - List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); + public List<PartitionGroupInfo> getPartitionGroupInfoList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) + throws TimeoutException { + List<PartitionGroupInfo> partitionGroupMetadataList = new ArrayList<>(); for (int i = 0; i < _numPartitions; i++) { - partitionGroupMetadataList.add(new FakePartitionGroupMetadata(i)); + partitionGroupMetadataList.add(new PartitionGroupInfo(i, fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000).toString())); } return partitionGroupMetadataList; } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java index 865ae96..2d0ad31 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; @@ -54,13 +55,14 @@ import org.slf4j.LoggerFactory; public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class); + private StreamConfig _streamConfig; + /** * Create a partition specific metadata provider - * @param streamConfig - * @param partition */ public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl()); + _streamConfig = streamConfig; } /** @@ -69,18 +71,21 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen */ public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) { super(clientId, streamConfig, new KafkaSimpleConsumerFactoryImpl()); + _streamConfig = streamConfig; } @VisibleForTesting public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition, KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) { super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory); + _streamConfig = streamConfig; } @VisibleForTesting public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) { super(clientId, streamConfig, kafkaSimpleConsumerFactory); + _streamConfig = streamConfig; } /** @@ -156,19 +161,30 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen } /** - * Fetch the partition group metadata list + * Fetch the partitionGroupMetadata list. * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. - * Hence current partition groups are not needed to compute the new partition groups */ @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) { + public List<PartitionGroupInfo> getPartitionGroupInfoList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) + throws java.util.concurrent.TimeoutException { int partitionCount = fetchPartitionCountInternal(timeoutMillis); - List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount); - for (int i = 0; i < partitionCount; i++) { - partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i)); + List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount); + + // add a PartitionGroupInfo into the list foreach partition already present in current. + // the end checkpoint is set as checkpoint + for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { + newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), + currentPartitionGroupMetadata.getEndCheckpoint())); + } + // add PartitiongroupInfo for new partitions + // use offset criteria from stream config + for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { + StreamPartitionMsgOffset streamPartitionMsgOffset = + fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000); + newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); } - return partitionGroupMetadataList; + return newPartitionGroupInfoList; } public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java index 43b72a8..9d3091e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java @@ -267,7 +267,7 @@ public class KafkaPartitionLevelConsumerTest { } @Test - public void testGetPartitionCount() { + public void testGetPartitionCount() throws Exception { String streamType = "kafka"; String streamKafkaTopicName = "theTopic"; String streamKafkaBrokerList = "abcd:1234,bcde:2345"; @@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest { KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory); - Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10000L), 2); + Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10000L), 2); } @Test diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index eb606f2..ef22b6a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -58,10 +58,9 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa /** * Fetch the partitionGroupMetadata list. * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. - * Hence current partition groups are not needed to compute the new partition groups */ @Override - public List<PartitionGroupInfo> getPartitionGroupMetadataList( + public List<PartitionGroupInfo> getPartitionGroupInfoList( List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) throws TimeoutException { int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java similarity index 64% rename from pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java index e1ce1a6..d13be10 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java @@ -27,27 +27,24 @@ import org.slf4j.LoggerFactory; /** * Fetches the partition count of a stream using the {@link StreamMetadataProvider} */ -public class PartitionGroupMetadataFetcher implements Callable<Boolean> { +public class PartitionGroupInfoFetcher implements Callable<Boolean> { - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupInfoFetcher.class); - private int _partitionCount = -1; - private List<PartitionGroupMetadata> _partitionGroupMetadataList; - private List<PartitionGroupMetadata> _currentPartitionGroupMetadata; - private final StreamConfig _streamConfig; - private StreamConsumerFactory _streamConsumerFactory; + private List<PartitionGroupInfo> _partitionGroupInfoList; + private final List<PartitionGroupMetadata> _currentPartitionGroupMetadata; + private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; private final String _topicName; - public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { - _streamConfig = streamConfig; - _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); + public PartitionGroupInfoFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); _topicName = streamConfig.getTopicName(); _currentPartitionGroupMetadata = currentPartitionGroupMetadataList; } - public List<PartitionGroupMetadata> getPartitionGroupMetadataList() { - return _partitionGroupMetadataList; + public List<PartitionGroupInfo> getPartitionGroupInfoList() { + return _partitionGroupInfoList; } public Exception getException() { @@ -55,21 +52,19 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { } /** - * Callable to fetch the number of partitions of the stream given the stream metadata - * @return - * @throws Exception + * Callable to fetch the partition group info for the stream */ @Override public Boolean call() throws Exception { - String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName; + String clientId = PartitionGroupInfoFetcher.class.getSimpleName() + "-" + _topicName; try ( StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) { - _partitionGroupMetadataList = streamMetadataProvider.getPartitionGroupMetadataList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L); + _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L); if (_exception != null) { // We had at least one failure, but succeeded now. Log an info - LOGGER.info("Successfully retrieved partition count as {} for topic {}", _partitionCount, _topicName); + LOGGER.info("Successfully retrieved partition group info for topic {}", _topicName); } return Boolean.TRUE; } catch (TransientConsumerException e) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index a9cd2d6..f595ea3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -40,7 +40,7 @@ public interface StreamMetadataProvider extends Closeable { int fetchPartitionCount(long timeoutMillis); // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state - List<PartitionGroupInfo> getPartitionGroupMetadataList( + List<PartitionGroupInfo> getPartitionGroupInfoList( List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) throws TimeoutException; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
