npawar commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r604541798
##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
##########
@@ -53,4 +57,34 @@ default StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull OffsetCrite
     long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis);
     return new LongMsgOffset(offset);
   }
+
+  /**
+   * Fetch the list of partition group info for the latest state of the stream.
+   * Default behavior is the one for the Kafka stream, where each partition group contains only one partition
+   * @param currentPartitionGroupsMetadata The list of metadata for the current partition groups
+   */
+  default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
+      throws TimeoutException, IOException {
+    int partitionCount = fetchPartitionCount(timeoutMillis);
+    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+    // Add a PartitionGroupInfo into the list foreach partition already present in current.
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+          currentPartitionGroupMetadata.getEndOffset()));
Review comment:
Done. As far as the current impl is concerned, that offset is never
going to be used.
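
For readers following the thread, here is a minimal sketch of the one-partition-per-group default that the Javadoc describes. It is not the code from this PR: `GroupMetadata`, `GroupInfo`, `computeGroupInfos`, and `offsetForNewPartitions` are hypothetical stand-ins, and the handling of newly discovered partitions is an assumption, since the quoted diff stops before that part.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionGroupSketch {
  // Hypothetical stand-in for PartitionGroupMetadata: a group id plus its last known offset.
  static final class GroupMetadata {
    final int groupId;
    final long endOffset;

    GroupMetadata(int groupId, long endOffset) {
      this.groupId = groupId;
      this.endOffset = endOffset;
    }
  }

  // Hypothetical stand-in for PartitionGroupInfo: a group id plus the offset to resume from.
  static final class GroupInfo {
    final int groupId;
    final long startOffset;

    GroupInfo(int groupId, long startOffset) {
      this.groupId = groupId;
      this.startOffset = startOffset;
    }
  }

  // Assumes group ids are 0..n-1 and map one-to-one onto partition ids (Kafka-style default).
  static List<GroupInfo> computeGroupInfos(List<GroupMetadata> current, int partitionCount,
      long offsetForNewPartitions) {
    List<GroupInfo> result = new ArrayList<>(partitionCount);
    // Carry over every group that already exists, as the loop in the diff does.
    for (GroupMetadata metadata : current) {
      result.add(new GroupInfo(metadata.groupId, metadata.endOffset));
    }
    // Assumption: each partition beyond the known groups becomes its own new group.
    for (int partitionId = current.size(); partitionId < partitionCount; partitionId++) {
      result.add(new GroupInfo(partitionId, offsetForNewPartitions));
    }
    return result;
  }

  public static void main(String[] args) {
    List<GroupMetadata> current = new ArrayList<>();
    current.add(new GroupMetadata(0, 120L));
    current.add(new GroupMetadata(1, 95L));
    // Two known groups, four partitions reported by the stream.
    for (GroupInfo info : computeGroupInfos(current, 4, 0L)) {
      System.out.println(info.groupId + " -> " + info.startOffset);
    }
  }
}
```

In this sketch the carried-over offset is simply copied; whether a consumer ever reads it is a separate question, which is the point of the comment above.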