Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2735887132
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +133,52 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
     }
   }
+  @Override
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    Optional<List<Integer>> subsetOpt =
+        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
+    if (subsetOpt.isEmpty()) {
+      return StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    List<Integer> subset = subsetOpt.get();
+    Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+    Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new HashMap<>();
+    for (PartitionGroupConsumptionStatus s : partitionGroupConsumptionStatuses) {
+      consumptionByPartition.put(s.getStreamPartitionGroupId(), s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+    for (Integer partitionId : subset) {
+      if (!topicIds.contains(partitionId)) {
Review Comment:
Silently skipping non-existent partitions contradicts the earlier
validation. If a partition ID was validated at lines 90-94, it should always
exist in topicIds here. This skip suggests partitions could disappear between
fetchPartitionIds and this method, which would be a race condition. Either
remove this check (trusting the earlier validation) or log a warning if this
unexpected state occurs.
```suggestion
      if (!topicIds.contains(partitionId)) {
        LOGGER.warn(
            "Configured partition id {} does not exist in topic {} when computing partition group metadata. "
                + "This indicates that topic partitions may have changed between validation and metadata "
                + "computation. Skipping this partition. Current topic partitions: {}",
            partitionId, _topic, topicIds);
```
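A minimal, self-contained sketch of the behavior this suggestion asks for, assuming the configured subset and the fetched topic partition IDs are plain `List<Integer>` / `Set<Integer>` values. `PartitionSubsetFilter` and `filterExisting` are hypothetical names, not Pinot APIs, and `java.util.logging` stands in for the SLF4J-style `LOGGER.warn(...)` call in the suggestion so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

public class PartitionSubsetFilter {
  private static final Logger LOGGER = Logger.getLogger(PartitionSubsetFilter.class.getName());

  // Hypothetical helper: keeps the configured partition IDs that still exist in the topic
  // and logs a warning (instead of skipping silently) for every ID that does not.
  static List<Integer> filterExisting(String topic, List<Integer> configured, Set<Integer> topicIds) {
    List<Integer> kept = new ArrayList<>(configured.size());
    for (Integer partitionId : configured) {
      if (!topicIds.contains(partitionId)) {
        LOGGER.warning(String.format(
            "Configured partition id %d does not exist in topic %s when computing partition group metadata. "
                + "Topic partitions may have changed between validation and metadata computation. "
                + "Skipping this partition. Current topic partitions: %s",
            partitionId, topic, topicIds));
        continue;
      }
      kept.add(partitionId);
    }
    return kept;
  }

  public static void main(String[] args) {
    // Partition 5 is configured but absent from the topic, so it is skipped with a warning.
    List<Integer> kept = filterExisting("events", List.of(0, 2, 5), Set.of(0, 1, 2, 3));
    System.out.println(kept); // [0, 2]
  }
}
```

The design choice is that a skipped partition always leaves a trace in the logs, so a change in topic partitions between validation and metadata computation becomes visible rather than silently shrinking the consumed subset.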
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +133,52 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
     }
   }
+  @Override
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    Optional<List<Integer>> subsetOpt =
+        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
+    if (subsetOpt.isEmpty()) {
+      return StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    List<Integer> subset = subsetOpt.get();
+    Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+    Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new HashMap<>();
+    for (PartitionGroupConsumptionStatus s : partitionGroupConsumptionStatuses) {
+      consumptionByPartition.put(s.getStreamPartitionGroupId(), s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+    for (Integer partitionId : subset) {
+      if (!topicIds.contains(partitionId)) {
Review Comment:
Silently skipping non-existent partitions contradicts the earlier
validation. If a partition ID was validated at lines 90-94, it should always
exist in topicIds here. This skip suggests partitions could disappear between
fetchPartitionIds and this method, which would be a race condition. Either
remove this check (trusting the earlier validation) or log a warning if this
unexpected state occurs.
```suggestion
      if (!topicIds.contains(partitionId)) {
        LOGGER.warn("Configured partition id {} does not exist in topic {} when computing partition group metadata. "
            + "Topic partitions: {}", partitionId, _topic, topicIds);
```
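If the earlier validation (lines 90-94, not shown in this diff) rejects configured IDs that are missing from the topic, the runtime check above can only fire when the topic's partitions change afterwards. The sketch below assumes that validation fails fast with an `IllegalArgumentException` listing every missing ID at once; `PartitionSubsetValidator` and `validate` are hypothetical names, and the actual check in the PR may behave differently.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class PartitionSubsetValidator {
  // Hypothetical up-front validation: rejects the configured subset if any configured ID is
  // not a partition of the topic. Once this passes, a later topicIds.contains(partitionId)
  // failure can only mean the topic's partitions changed in between.
  static void validate(String topic, List<Integer> configured, Set<Integer> topicIds) {
    // Sorted set difference (configured minus topic partitions) gives a stable error message.
    Set<Integer> missing = new TreeSet<>(configured);
    missing.removeAll(topicIds);
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException(
          "Configured partition ids " + missing + " do not exist in topic " + topic
              + "; topic partitions: " + new TreeSet<>(topicIds));
    }
  }

  public static void main(String[] args) {
    validate("events", List.of(0, 2), Set.of(0, 1, 2, 3)); // passes silently
    try {
      validate("events", List.of(0, 7), Set.of(0, 1, 2, 3));
    } catch (IllegalArgumentException e) {
      // Configured partition ids [7] do not exist in topic events; topic partitions: [0, 1, 2, 3]
      System.out.println(e.getMessage());
    }
  }
}
```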
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java:
##########
@@ -54,22 +56,35 @@ public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig,
       String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement,
       StreamMetadataProvider streamMetadataProvider) {
     super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
-    _numPartitions = getStreamNumPartitions(streamMetadataProvider);
-  }
-
-  private int getStreamNumPartitions(StreamMetadataProvider streamMetadataProvider) {
+    List<Integer> partitionIds = null;
     try (streamMetadataProvider) {
-      return streamMetadataProvider.fetchPartitionCount(10_000L);
+      _numPartitions = streamMetadataProvider.fetchPartitionCount(10_000L);
+      try {
+        Set<Integer> ids = streamMetadataProvider.fetchPartitionIds(10_000L);
+        if (ids != null && !ids.isEmpty()) {
+          partitionIds = new ArrayList<>(ids);
+          Collections.sort(partitionIds);
+        }
+      } catch (UnsupportedOperationException e) {
+        // Stream does not expose partition IDs; instance partitions will use 0..numPartitions-1
+      }
Review Comment:
Catching and silently ignoring UnsupportedOperationException makes it
unclear which streams support fetchPartitionIds and which don't. Consider
logging at DEBUG level that the stream doesn't expose partition IDs, so
operators can understand why explicit partition IDs weren't used.
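A sketch of the constructor's partition-ID resolution with the suggested debug log, under stated assumptions: `PartitionIdResolver` and `resolve` are hypothetical, the `Supplier` stands in for `streamMetadataProvider.fetchPartitionIds(10_000L)`, `Logger.fine` plays the role of SLF4J's `LOGGER.debug`, and the fallback materializes `0..numPartitions-1` explicitly where the PR instead leaves `partitionIds` as `null`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Logger;

public class PartitionIdResolver {
  private static final Logger LOGGER = Logger.getLogger(PartitionIdResolver.class.getName());

  // Hypothetical helper mirroring the constructor logic: prefer the stream's explicit
  // partition IDs (sorted); otherwise fall back to 0..numPartitions-1. When the stream does
  // not support fetchPartitionIds, record why at debug level (FINE in java.util.logging).
  static List<Integer> resolve(int numPartitions, Supplier<Set<Integer>> fetchPartitionIds) {
    try {
      Set<Integer> ids = fetchPartitionIds.get();
      if (ids != null && !ids.isEmpty()) {
        List<Integer> sorted = new ArrayList<>(ids);
        Collections.sort(sorted);
        return sorted;
      }
    } catch (UnsupportedOperationException e) {
      LOGGER.fine(() -> "Stream does not expose partition IDs; falling back to 0.." + (numPartitions - 1));
    }
    List<Integer> range = new ArrayList<>(numPartitions);
    for (int i = 0; i < numPartitions; i++) {
      range.add(i);
    }
    return range;
  }

  public static void main(String[] args) {
    // Explicit, non-contiguous IDs are kept and sorted.
    System.out.println(resolve(3, () -> Set.of(4, 1, 2))); // [1, 2, 4]
    // Unsupported operation: debug log plus the contiguous fallback.
    System.out.println(resolve(3, () -> { throw new UnsupportedOperationException(); })); // [0, 1, 2]
  }
}
```

Logging at debug level keeps the common fallback path quiet in production while still letting operators who enable debug logging see why the contiguous range was used instead of explicit IDs.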
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.