Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2881554094


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -81,8 +91,11 @@ private Boolean fetchSingleStream()
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);

Review Comment:
   The failure logs in this method still say "Could not get partition count" / 
"retrieved PartitionGroupMetadata", but the code path is now computing and 
returning StreamMetadata (wrapping the PartitionGroupMetadata list into 
StreamMetadata). Please update these log messages to reflect the new semantics 
so operational debugging/alerts remain accurate.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -81,8 +91,11 @@ private Boolean fetchSingleStream()
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(

Review Comment:
   Log messages in this method still refer to "PartitionGroupMetadata" even 
though the fetcher now returns per-stream StreamMetadata (and the list is 
wrapped into StreamMetadata immediately above). Please update the log message 
to avoid misleading operators when debugging metadata fetches.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -122,15 +135,17 @@ private Boolean fetchMultipleStreams()
               .collect(Collectors.toList());
       try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(

Review Comment:
   When remapping PartitionGroupMetadata for multi-stream tables, this code 
reconstructs a new PartitionGroupMetadata but only copies partitionGroupId and 
startOffset, dropping the new sequenceNumber field. Even if current providers 
always use the default, preserving sequenceNumber here (or explicitly setting 
it) will prevent subtle bugs if any provider/caller starts populating it.
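
A minimal sketch of the remapping this comment asks for, under stated assumptions: `Meta` is a hypothetical stand-in for PartitionGroupMetadata (the real constructor and offset type may differ), offsets are simplified to strings, and `PADDING` is an assumed multiplier used only to show one way of combining a stream index with a partition id. The point is only that the rebuilt object carries `sequenceNumber` through instead of falling back to a default.

```java
import java.util.List;
import java.util.stream.Collectors;

class RemapSketch {
  // Hypothetical stand-in for PartitionGroupMetadata; not the real Pinot class.
  static final class Meta {
    final int partitionGroupId;
    final String startOffset;   // simplified; the real offset type is not a String
    final int sequenceNumber;

    Meta(int partitionGroupId, String startOffset, int sequenceNumber) {
      this.partitionGroupId = partitionGroupId;
      this.startOffset = startOffset;
      this.sequenceNumber = sequenceNumber;
    }
  }

  // Assumed padding used to derive a table-level id from (stream index, partition id).
  static final int PADDING = 10_000;

  // Rebuilds each entry with a remapped id while copying ALL remaining fields,
  // including sequenceNumber, from the source entry.
  static List<Meta> remap(List<Meta> fromStream, int streamIndex) {
    return fromStream.stream()
        .map(m -> new Meta(streamIndex * PADDING + m.partitionGroupId, m.startOffset, m.sequenceNumber))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Meta remapped = remap(List.of(new Meta(3, "100", 7)), 2).get(0);
    System.out.println(remapped.partitionGroupId + " " + remapped.startOffset + " " + remapped.sequenceNumber);
  }
}
```

Copying every field explicitly in one constructor call makes a dropped field visible in review: adding a new field to the class forces this call site to be updated.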



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java:
##########
@@ -90,18 +90,18 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
    * @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps.
    * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
    */
-  public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+  public static List<StreamMetadata> getStreamMetadataList(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
       boolean forceGetOffsetFromStream) {
     PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher(
         streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream);
     try {
       DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
-      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+      return partitionGroupMetadataFetcher.getStreamMetadataList();
     } catch (Exception e) {
       Exception fetcherException = partitionGroupMetadataFetcher.getException();
       String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}",
+      LOGGER.error("Could not get StreamMetadata for topic: {} of table: {}",
           streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
           tableNameWithType, fetcherException);

Review Comment:
   The logger argument for the topic list uses 
`streamConfigs.stream()...reduce(...)`, which returns an `Optional<String>`, so 
it logs as `Optional[...]` (or `Optional.empty` when the list is empty). 
Consider `map(...).collect(Collectors.joining(","))` instead so the log message 
is clean and reliably formatted.
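
To illustrate the difference, here is a small self-contained sketch using plain strings in place of `StreamConfig` topic names. The class and method names are hypothetical; only `Stream.reduce`, `Optional`, and `Collectors.joining` are standard JDK APIs.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class TopicListFormat {
  // Mirrors the pattern in the diff: reduce without an identity yields Optional<String>.
  static Optional<String> reduceTopics(List<String> topics) {
    return topics.stream().reduce((a, b) -> a + "," + b);
  }

  // Suggested alternative: joining always yields a plain String ("" for an empty list).
  static String joinTopics(List<String> topics) {
    return topics.stream().collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    List<String> topics = List.of("topicA", "topicB");
    System.out.println(reduceTopics(topics));       // prints Optional[topicA,topicB]
    System.out.println(joinTopics(topics));         // prints topicA,topicB
    System.out.println(reduceTopics(List.of()));    // prints Optional.empty
  }
}
```

Because the joined value is already a `String`, it can be passed directly as the `{}` argument without unwrapping.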



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

