Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2892962724


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -122,29 +140,53 @@ private Boolean fetchMultipleStreams()
               .collect(Collectors.toList());
       try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
           StreamConsumerFactory.getUniqueClientId(clientId))) {
-        _newPartitionGroupMetadataList.addAll(
+        List<PartitionGroupMetadata> partitionGroupMetadataList =
             streamMetadataProvider.computePartitionGroupMetadata(clientId,
-                    streamConfig, topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000,
+                    streamConfig, topicPartitionGroupConsumptionStatusList,
+                    /*maxWaitTimeMs=*/METADATA_FETCH_TIMEOUT_MS,
                     _forceGetOffsetFromStream)
                 .stream()
                 .map(metadata -> new PartitionGroupMetadata(
                     
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
-                        index), metadata.getStartOffset()))
-                .collect(Collectors.toList()));
-        if (_exception != null) {
-          // We had at least one failure, but succeeded now. Log an info
-          LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", topicName);
-        }
+                        index), metadata.getStartOffset(), 
metadata.getSequenceNumber()))
+                .collect(Collectors.toList());
+        int partitionCount = getNumPartitions(streamMetadataProvider, 
partitionGroupMetadataList);
+        _streamMetadataList.add(
+            new StreamMetadata(streamConfig, partitionCount, 
partitionGroupMetadataList));
       } catch (TransientConsumerException e) {
-        LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", topicName, e);
+        LOGGER.warn("Transient Exception: Could not get StreamMetadata for 
topic {}", topicName, e);
         _exception = e;
         return Boolean.FALSE;
       } catch (Exception e) {
-        LOGGER.warn("Could not get partition count for topic {}", topicName, 
e);
+        LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
         _exception = e;
         throw e;
       }
     }
     return Boolean.TRUE;
   }
+
+  private int getNumPartitions(StreamMetadataProvider streamMetadataProvider,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
+    if (usesDefaultComputePartitionGroupMetadata(streamMetadataProvider)) {
+      return partitionGroupMetadataList.size();
+    }
+    return 
streamMetadataProvider.fetchPartitionCount(/*timeoutMillis=*/METADATA_FETCH_TIMEOUT_MS);
+  }
+
+  private boolean 
usesDefaultComputePartitionGroupMetadata(StreamMetadataProvider 
streamMetadataProvider) {
+    Class<?> providerClass = streamMetadataProvider.getClass();
+    return isDefaultComputeMethod(providerClass, 
COMPUTE_PARTITION_GROUP_METADATA_ARGUMENT_TYPES)
+        && isDefaultComputeMethod(providerClass, 
COMPUTE_PARTITION_GROUP_METADATA_WITH_FORCE_ARGUMENT_TYPES);

Review Comment:
   `usesDefaultComputePartitionGroupMetadata()` requires *both* overloads of 
`computePartitionGroupMetadata` to be default (`&&`). For providers that 
override only the 4-arg overload but rely on the default 5-arg overload (which 
is the one this fetcher actually calls), this returns false and 
`getNumPartitions()` will always invoke `fetchPartitionCount()`, potentially 
causing an extra remote metadata call per refresh. Consider basing the check on 
the overload that is actually invoked (the 5-arg signature), or otherwise 
handling the “4-arg overridden, 5-arg default” case explicitly to avoid 
redundant partition-count fetches.
   ```suggestion
       return isDefaultComputeMethod(providerClass, 
COMPUTE_PARTITION_GROUP_METADATA_WITH_FORCE_ARGUMENT_TYPES);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to