Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2735887132


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +133,52 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
     }
   }
 
+  @Override
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    Optional<List<Integer>> subsetOpt =
+        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
+    if (subsetOpt.isEmpty()) {
+      return StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    List<Integer> subset = subsetOpt.get();
+    Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+    Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new HashMap<>();
+    for (PartitionGroupConsumptionStatus s : partitionGroupConsumptionStatuses) {
+      consumptionByPartition.put(s.getStreamPartitionGroupId(), s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+    for (Integer partitionId : subset) {
+      if (!topicIds.contains(partitionId)) {

Review Comment:
   Silently skipping non-existent partitions contradicts the earlier 
validation. If a partition ID was validated at lines 90-94, it should always 
exist in topicIds here. This skip suggests partitions could disappear between 
fetchPartitionIds and this method, which would be a race condition. Either 
remove this check (trusting the earlier validation) or log a warning if this 
unexpected state occurs.
   ```suggestion
         if (!topicIds.contains(partitionId)) {
           LOGGER.warn(
               "Configured partition id {} does not exist in topic {} when computing partition group metadata. "
                   + "This indicates that topic partitions may have changed between validation and metadata "
                   + "computation. Skipping this partition. Current topic partitions: {}",
               partitionId, _topic, topicIds);
   ```
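
A minimal, self-contained sketch of the warn-and-skip behavior this comment asks for. It is an illustration only, not the PR's code: `PartitionSubsetFilter` and `retainExisting` are hypothetical names, and `java.util.logging` stands in for the `LOGGER` used in the plugin so that the example runs without extra dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

public class PartitionSubsetFilter {
  // Stand-in for the plugin's logger (assumption: the real class logs through its own LOGGER field).
  private static final Logger LOGGER = Logger.getLogger(PartitionSubsetFilter.class.getName());

  /**
   * Returns the configured partition ids that exist in the topic, in configured order.
   * Each configured id that is missing from the topic is logged at WARNING and skipped.
   */
  public static List<Integer> retainExisting(List<Integer> subset, Set<Integer> topicIds, String topic) {
    List<Integer> existing = new ArrayList<>(subset.size());
    for (Integer partitionId : subset) {
      if (!topicIds.contains(partitionId)) {
        LOGGER.warning(String.format(
            "Configured partition id %d does not exist in topic %s. Topic partitions: %s",
            partitionId, topic, topicIds));
        continue;
      }
      existing.add(partitionId);
    }
    return existing;
  }

  public static void main(String[] args) {
    // Partition 7 is configured but absent from the topic, so it is reported and dropped.
    System.out.println(retainExisting(List.of(0, 2, 7), Set.of(0, 1, 2, 3), "events"));
  }
}
```

Keeping the check while logging makes the unexpected state visible to operators without failing the whole metadata computation; removing the check instead would rely entirely on the earlier validation.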



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +133,52 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
     }
   }
 
+  @Override
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    Optional<List<Integer>> subsetOpt =
+        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
+    if (subsetOpt.isEmpty()) {
+      return StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    List<Integer> subset = subsetOpt.get();
+    Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+    Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new HashMap<>();
+    for (PartitionGroupConsumptionStatus s : partitionGroupConsumptionStatuses) {
+      consumptionByPartition.put(s.getStreamPartitionGroupId(), s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+    for (Integer partitionId : subset) {
+      if (!topicIds.contains(partitionId)) {

Review Comment:
   Silently skipping non-existent partitions contradicts the earlier 
validation. If a partition ID was validated at lines 90-94, it should always 
exist in topicIds here. This skip suggests partitions could disappear between 
fetchPartitionIds and this method, which would be a race condition. Either 
remove this check (trusting the earlier validation) or log a warning if this 
unexpected state occurs.
   ```suggestion
         if (!topicIds.contains(partitionId)) {
           LOGGER.warn("Configured partition id {} does not exist in topic {} when computing partition group metadata. "
                   + "Topic partitions: {}", partitionId, _topic, topicIds);
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java:
##########
@@ -54,22 +56,35 @@ public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig,
       String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement,
       StreamMetadataProvider streamMetadataProvider) {
     super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
-    _numPartitions = getStreamNumPartitions(streamMetadataProvider);
-  }
-
-  private int getStreamNumPartitions(StreamMetadataProvider streamMetadataProvider) {
+    List<Integer> partitionIds = null;
     try (streamMetadataProvider) {
-      return streamMetadataProvider.fetchPartitionCount(10_000L);
+      _numPartitions = streamMetadataProvider.fetchPartitionCount(10_000L);
+      try {
+        Set<Integer> ids = streamMetadataProvider.fetchPartitionIds(10_000L);
+        if (ids != null && !ids.isEmpty()) {
+          partitionIds = new ArrayList<>(ids);
+          Collections.sort(partitionIds);
+        }
+      } catch (UnsupportedOperationException e) {
+        // Stream does not expose partition IDs; instance partitions will use 0..numPartitions-1
+      }

Review Comment:
   Catching and silently ignoring UnsupportedOperationException makes it 
unclear which streams support fetchPartitionIds and which don't. Consider 
logging at DEBUG level that the stream doesn't expose partition IDs, so 
operators can understand why explicit partition IDs weren't used.
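
One way the suggested logging could look, as a rough sketch under stated assumptions rather than the PR's implementation: `PartitionIdLookup`, `PartitionIdSource`, and `sortedPartitionIdsOrNull` are hypothetical names. The interface stands in for the single `fetchPartitionIds(long)` call shown in the diff, and `java.util.logging` at `Level.FINE` stands in for a DEBUG-level log statement.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PartitionIdLookup {
  private static final Logger LOGGER = Logger.getLogger(PartitionIdLookup.class.getName());

  /** Hypothetical stand-in for the one StreamMetadataProvider method used in the diff. */
  public interface PartitionIdSource {
    Set<Integer> fetchPartitionIds(long timeoutMillis);
  }

  /**
   * Returns the stream's partition ids in ascending order, or null when the stream
   * returns none or does not support fetching them. The unsupported case is logged
   * at FINE (the java.util.logging analogue of DEBUG) instead of being swallowed.
   */
  public static List<Integer> sortedPartitionIdsOrNull(PartitionIdSource source, String tableNameWithType) {
    try {
      Set<Integer> ids = source.fetchPartitionIds(10_000L);
      if (ids != null && !ids.isEmpty()) {
        List<Integer> sorted = new ArrayList<>(ids);
        Collections.sort(sorted);
        return sorted;
      }
    } catch (UnsupportedOperationException e) {
      LOGGER.log(Level.FINE,
          "Stream for table {0} does not expose partition IDs; falling back to 0..numPartitions-1",
          tableNameWithType);
    }
    return null;
  }
}
```

With this shape, a caller that receives null keeps the existing `0..numPartitions-1` behavior, and the log line records why explicit partition IDs were not used.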


