npawar commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r608049809



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +493,35 @@ private void commitSegmentMetadataInternal(String 
realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK 
metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, 
committingSegmentName, false, true);
 
-    // Step-2
+    // Using the latest segment of each partition group, creates a list of 
{@link PartitionGroupStatus}
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupStatus> currentPartitionGroupStatusList =
+        getPartitionGroupStatusList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current list of {@link 
PartitionGroupStatus}.
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupStatusList);
+    Set<Integer> newPartitionGroupSet =
+        
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitionGroups = newPartitionGroupMetadataList.size();
+
+    // Only if committingSegment's partitionGroup is present in the 
newPartitionGroupMetadataList, we create new segment metadata
+    String newConsumingSegmentName = null;
+    String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    LLCSegmentName newLLCSegmentName =
-        getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), 
newSegmentCreationTimeMs);
-    createNewSegmentZKMetadata(tableConfig,
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig)),
-        newLLCSegmentName, newSegmentCreationTimeMs, 
committingSegmentDescriptor, committingSegmentZKMetadata,
-        instancePartitions, numPartitionGroups, numReplicas);
+    if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, 
committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, 
newSegmentCreationTimeMs);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, 
newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, 
instancePartitions, numPartitionGroups, numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    }
+
+    // TODO: also create the new partition groups here, instead of waiting 
till the {@link RealtimeSegmentValidationManager} runs

Review comment:
       Moved the TODO to after the ideal state update.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to