npawar commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r595611251
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,39 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
// Refresh the Broker routing to reflect the changes in the segment ZK metadata
_helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
- // Step-2
+ // Get current partition groups - this gives current state of latest segments for each partition
+ // E.g. [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+ PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+ getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+ // Fetches new partition groups, given current partition groups metadata.
+ // Assume stream has partitions A, B, C, all still consuming. Result will be A, B, C
+ // Assume A was split into D, E, but messages of A are yet to be consumed, result will be A, B, C
+ // Assume A was split into D, E and all messages of A are consumed, result will be B, C, D, E.
+ List<PartitionGroupInfo> newPartitionGroupInfoList =
+ getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+ Set<Integer> newPartitionGroupSet =
+ newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+ int numPartitions = newPartitionGroupInfoList.size();
+
+ // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
Review comment:
The server is still the only one trying to detect the end of a partition; we
are setting stopReason to "endOfPartitionGroup". The only difference is that we
rely on the controller's "new partition group info" to decide which segments to
create.
The controller does not try to detect the end of partition groups explicitly.
It simply returns the partition groups that it thinks should exist, and we
create new consuming segments for everything in the new partition groups list
that is not already present. It probably looks a little odd right now, because
we only create new metadata for the committing segment, but the eventual goal
is to create new metadata for all newly detected partitions.
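The controller-side flow described above can be sketched in a few lines of Java. This is a minimal, hypothetical model, not Pinot code: partition groups are plain integer ids (A through E mapped to 0 through 4, matching the example in the diff comments), a split is a map from parent to children, and "all messages of the parent consumed" is modeled as a set of ids. `newPartitionGroups`, `groupsNeedingNewSegments` and `createSegmentForCommittingGroup` are illustrative names only; they stand in for `getPartitionGroupInfoList` and the segment-creation check without reproducing their real signatures or behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the flow described in the review comment. Partition groups are
 * plain integer ids; none of these names are Pinot APIs.
 */
class PartitionGroupSketch {

  static final int A = 0, B = 1, C = 2, D = 3, E = 4;

  /**
   * Models the "fetch new partition groups" step (assumption): a group that was split is
   * replaced by its children only once all of its messages have been consumed.
   */
  static List<Integer> newPartitionGroups(List<Integer> currentGroups,
      Map<Integer, List<Integer>> splits, Set<Integer> fullyConsumed) {
    List<Integer> result = new ArrayList<>();
    for (int group : currentGroups) {
      List<Integer> children = splits.get(group);
      if (children != null && fullyConsumed.contains(group)) {
        result.addAll(children); // parent drained: its children take over
      } else {
        result.add(group); // never split, or split but not drained yet: keep the group
      }
    }
    return result;
  }

  /** Eventual goal: every group in the new list without a consuming segment gets one. */
  static Set<Integer> groupsNeedingNewSegments(List<Integer> newGroups,
      Set<Integer> groupsWithConsumingSegment) {
    Set<Integer> toCreate = new LinkedHashSet<>(newGroups);
    toCreate.removeAll(groupsWithConsumingSegment);
    return toCreate;
  }

  /** Current behavior in the diff: only the committing segment's group is checked. */
  static boolean createSegmentForCommittingGroup(int committingGroup, List<Integer> newGroups) {
    Set<Integer> newGroupSet = new LinkedHashSet<>(newGroups);
    return newGroupSet.contains(committingGroup);
  }

  public static void main(String[] args) {
    List<Integer> current = List.of(A, B, C);
    Map<Integer, List<Integer>> splits = Map.of(A, List.of(D, E)); // A was split into D, E
    Set<Integer> consuming = Set.of(B, C); // A's latest segment is the one committing

    // A split but not yet drained: groups stay A, B, C, so only A needs a new segment.
    List<Integer> notDrained = newPartitionGroups(current, splits, Set.of());
    System.out.println(notDrained + " create=" + groupsNeedingNewSegments(notDrained, consuming));

    // A split and fully drained: groups become D, E, B, C; A ends, D and E need segments.
    List<Integer> drained = newPartitionGroups(current, splits, Set.of(A));
    System.out.println(drained + " create=" + groupsNeedingNewSegments(drained, consuming)
        + " continueA=" + createSegmentForCommittingGroup(A, drained));
  }
}
```

Running `main` walks through the two split cases from the diff comments: while A is not yet drained, the new list stays A, B, C and only A needs a new consuming segment; once A is drained, the list becomes D, E, B, C, so D and E need segments and the committing group A gets none. Using a set difference keeps the eventual "create for every missing group" rule independent of the order in which groups are returned.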
----------------------------------------------------------------
This is an automated message from the Apache Git Service.