npawar commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r595607024



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,39 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Step-2
+    // Get current partition groups - this gives current state of latest segments for each partition
+    // E.g. [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    // Assume stream has partitions A, B, C, all still consuming. Result will be A, B, C
+    // Assume A was split into D, E, but messages of A are yet to be consumed, result will be A, B, C
+    // Assume A was split into D, E and all messages of A are consumed, result will be B, C, D, E.
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();
+
+    // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
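The three cases in the diff comments (no split; split but parent not drained; split and parent drained) can be modeled as a small resolution rule. The sketch below illustrates those cases under stated assumptions. It is not Pinot's `getPartitionGroupInfoList`. `PartitionGroupResolutionSketch`, `resolve`, `shouldCreateNextSegment`, and the `splits`/`fullyConsumed` inputs are all hypothetical, and partition groups are plain string ids here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the partition-group resolution rule described in the diff comments.
// Partition groups are modeled as string ids; Pinot's real types and APIs differ.
final class PartitionGroupResolutionSketch {

  /**
   * @param current       ids of the current partition groups, e.g. [A, B, C]
   * @param splits        assumed input: parent id -> child ids for partitions the operator split
   * @param fullyConsumed assumed input: parent ids whose messages have all been consumed
   */
  static List<String> resolve(List<String> current, Map<String, List<String>> splits,
      Set<String> fullyConsumed) {
    List<String> kept = new ArrayList<>();
    List<String> children = new ArrayList<>();
    for (String group : current) {
      List<String> groupChildren = splits.get(group);
      if (groupChildren != null && fullyConsumed.contains(group)) {
        // Split and drained: the parent is replaced by its children.
        children.addAll(groupChildren);
      } else {
        // Not split, or split but still holding unconsumed messages: keep consuming it.
        kept.add(group);
      }
    }
    kept.addAll(children);
    return kept;
  }

  // Mirrors the diff's final check: new segment metadata is created for the committing
  // segment's partition group only if that group is still present in the new list.
  static boolean shouldCreateNextSegment(String committingGroup, List<String> newGroups) {
    Set<String> newGroupSet = new HashSet<>(newGroups);
    return newGroupSet.contains(committingGroup);
  }

  public static void main(String[] args) {
    List<String> current = List.of("A", "B", "C");
    Map<String, List<String>> splits = Map.of("A", List.of("D", "E"));
    System.out.println(resolve(current, Map.of(), Set.of()));     // [A, B, C]
    System.out.println(resolve(current, splits, Set.of()));       // [A, B, C]
    List<String> afterDrain = resolve(current, splits, Set.of("A"));
    System.out.println(afterDrain);                                // [B, C, D, E]
    System.out.println(shouldCreateNextSegment("A", afterDrain));  // false
  }
}
```

In the last case the committing group A is absent from the new list, so under this model no new consuming segment is created for it.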

Review comment:
   The currentPartitionGroupMetadata is computed from the latest segment ZK metadata of every partition group found in the ideal state.
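Deriving "the latest segment per partition group" from the ideal-state segments amounts to keeping the highest sequence id for each group. The sketch below shows that under stated assumptions. `SegmentSnapshot` and `CurrentPartitionGroupsSketch` are hypothetical stand-ins, not Pinot's segment ZK metadata classes, and they assume the ideal-state segments are already available as a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for one segment's ZK metadata; not Pinot's actual metadata class.
final class SegmentSnapshot {
  final int partitionGroupId;
  final int seqId;
  final long startOffset;
  final Long endOffset; // null while the segment is still consuming
  final String status;  // "IN_PROGRESS" or "DONE"

  SegmentSnapshot(int partitionGroupId, int seqId, long startOffset, Long endOffset, String status) {
    this.partitionGroupId = partitionGroupId;
    this.seqId = seqId;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.status = status;
  }

  @Override
  public String toString() {
    return "(partitionGroupId:" + partitionGroupId + ", seqId:" + seqId + ", startOffset:" + startOffset
        + ", endOffset: " + (endOffset == null ? "-" : endOffset) + ", status:" + status + ")";
  }
}

final class CurrentPartitionGroupsSketch {
  // For every partition group that appears among the ideal-state segments, keep only the
  // segment with the highest sequence id. The result is ordered by partition group id.
  static List<SegmentSnapshot> latestPerPartitionGroup(List<SegmentSnapshot> idealStateSegments) {
    Map<Integer, SegmentSnapshot> latest = new TreeMap<>();
    for (SegmentSnapshot segment : idealStateSegments) {
      latest.merge(segment.partitionGroupId, segment,
          (existing, candidate) -> candidate.seqId > existing.seqId ? candidate : existing);
    }
    return new ArrayList<>(latest.values());
  }

  public static void main(String[] args) {
    List<SegmentSnapshot> segments = List.of(
        new SegmentSnapshot(0, 0, 0, 130L, "DONE"),
        new SegmentSnapshot(0, 1, 130, null, "IN_PROGRESS"));
    // [(partitionGroupId:0, seqId:1, startOffset:130, endOffset: -, status:IN_PROGRESS)]
    System.out.println(latestPerPartitionGroup(segments));
  }
}
```

Because a group stays in this result as long as any of its segments is in the ideal state, a group whose last segment is DONE is still reported.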
   
   So let's say we have only partition group 0. Then
   `currentPartitionGroupMetadata=[(partitionGroupId:0, seqId:0, startOffset:0, endOffset: -, status:IN_PROGRESS)]`.
   
   Now say partition group 0 is committing. The currentPartitionGroupMetadata is computed after the status is updated to DONE, hence
   `currentPartitionGroupMetadata=[(partitionGroupId:0, seqId:0, startOffset:0, endOffset: 130, status:DONE)]`.
   
   Next, say partition group 0 starts consuming for the next segment:
   `currentPartitionGroupMetadata=[(partitionGroupId:0, seqId:1, startOffset:130, endOffset: -, status:IN_PROGRESS)]`.
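The two transitions above follow a simple rule. Committing seals the consuming segment with an end offset and marks it DONE. The next segment of the same partition group gets `seqId + 1` and starts at the previous end offset. The sketch below models only that rule. `SegmentState`, `commit`, and `next` are hypothetical names, not Pinot APIs.

```java
// Hypothetical, immutable view of one segment's ZK metadata; names are illustrative only.
final class SegmentState {
  final int partitionGroupId;
  final int seqId;
  final long startOffset;
  final Long endOffset; // null while the segment is still consuming
  final String status;  // "IN_PROGRESS" or "DONE"

  SegmentState(int partitionGroupId, int seqId, long startOffset, Long endOffset, String status) {
    this.partitionGroupId = partitionGroupId;
    this.seqId = seqId;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.status = status;
  }

  // Committing seals the consuming segment at the offset where consumption stopped.
  SegmentState commit(long stopOffset) {
    if (!"IN_PROGRESS".equals(status)) {
      throw new IllegalStateException("only a consuming segment can commit");
    }
    return new SegmentState(partitionGroupId, seqId, startOffset, stopOffset, "DONE");
  }

  // The next consuming segment of the same partition group gets the next sequence id
  // and resumes from the committed segment's end offset.
  SegmentState next() {
    if (!"DONE".equals(status)) {
      throw new IllegalStateException("the previous segment must be committed first");
    }
    return new SegmentState(partitionGroupId, seqId + 1, endOffset, null, "IN_PROGRESS");
  }

  public static void main(String[] args) {
    SegmentState first = new SegmentState(0, 0, 0, null, "IN_PROGRESS");
    SegmentState committed = first.commit(130);
    SegmentState second = committed.next();
    System.out.println(second.seqId + " " + second.startOffset); // 1 130
  }
}
```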
   
   Now say the operator splits partition 0 into 1 and 2. As a result, partition group 0 detects that the partition has no more messages, and commits:
   `currentPartitionGroupMetadata=[(partitionGroupId:0, seqId:1, startOffset:130, endOffset: 210, status:DONE)]`.
   
   The next time the validation manager runs, the new partition groups 1 and 2 are detected, and consuming segments are created for them.
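The validation-manager step can be thought of as a set difference. Any partition group reported by the stream that has no segment in the ideal state gets a first consuming segment with `seqId` 0. The sketch below assumes exactly that. `NewPartitionGroupDetectionSketch`, `detect`, and `ConsumingSegmentToCreate` are hypothetical. The `initialOffset` parameter is also an assumption: the example above starts new groups at offset 0, but the real starting offset likely depends on the stream and its configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: partition groups reported by the stream that have no segment
// in the ideal state get a first consuming segment (seqId 0).
final class NewPartitionGroupDetectionSketch {

  // Hypothetical description of a consuming segment to create.
  static final class ConsumingSegmentToCreate {
    final int partitionGroupId;
    final int seqId;
    final long startOffset;

    ConsumingSegmentToCreate(int partitionGroupId, int seqId, long startOffset) {
      this.partitionGroupId = partitionGroupId;
      this.seqId = seqId;
      this.startOffset = startOffset;
    }
  }

  // initialOffset is an assumed parameter standing in for the stream's starting offset.
  static List<ConsumingSegmentToCreate> detect(Set<Integer> idealStateGroupIds,
      Set<Integer> streamGroupIds, long initialOffset) {
    List<ConsumingSegmentToCreate> toCreate = new ArrayList<>();
    for (int groupId : new TreeSet<>(streamGroupIds)) { // sorted for a deterministic order
      if (!idealStateGroupIds.contains(groupId)) {
        toCreate.add(new ConsumingSegmentToCreate(groupId, 0, initialOffset));
      }
    }
    return toCreate;
  }

  public static void main(String[] args) {
    // Group 0 is in the ideal state (its last segment is DONE); the stream now reports 1 and 2.
    List<ConsumingSegmentToCreate> created = detect(Set.of(0), Set.of(1, 2), 0L);
    for (ConsumingSegmentToCreate c : created) {
      System.out.println(c.partitionGroupId + " seqId:" + c.seqId + " startOffset:" + c.startOffset);
    }
  }
}
```

Group 0 is untouched by this step. Its DONE segment stays in the ideal state, which is why it keeps appearing in the current list afterwards.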
   
   From then on, the currentPartitionGroupMetadata call will always include partition group 0:
   `currentPartitionGroupMetadata=[(partitionGroupId:0, seqId:1, startOffset:130, endOffset: 210, status:DONE), (partitionGroupId:1, seqId:0, startOffset:0, endOffset: -, status:IN_PROGRESS), (partitionGroupId:2, seqId:0, startOffset:0, endOffset: -, status:IN_PROGRESS)]`.
   
   So the case described by `A partition may have disappeared (and so does not show up in the current list of partitions available for consumption)` will never happen.
   