Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1394997095


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -518,40 +523,56 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
      */
 
     // Step-1
+    long startTimeNs1 = System.nanoTime();
     SegmentZKMetadata committingSegmentZKMetadata =
         updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupConsumptionStatus}
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
-    // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}.
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-    Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+    // Step-2
+    long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
-      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-      // segment metadata
-      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-      long newSegmentCreationTimeMs = getCurrentTimeMs();
-      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
-          newPartitionGroupMetadataList);
-      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    if (!isTablePaused(idealState)) {
+      int numPartitionGroups;
+      boolean shouldCreateNewConsumingSegment;
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      try {
+        numPartitionGroups = getNumPartitionGroups(streamConfig);
+        shouldCreateNewConsumingSegment = numPartitionGroups > committingSegmentPartitionGroupId;
+      } catch (Exception e) {
+        LOGGER.info("Failed to read partition count from stream metadata provider for table: {}, exception: {}. "
+                + "Reading all partition group metadata to determine partition count and partition group status.",
+            realtimeTableName, e.toString());
+        // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
+        //       We don't need to read partition group metadata for other partition groups.
+        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+            getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+        numPartitionGroups = newPartitionGroupMetadataList.size();
+        shouldCreateNewConsumingSegment = false;
+        for (PartitionGroupMetadata newPartitionGroupMetadata : newPartitionGroupMetadataList) {
+          if (newPartitionGroupMetadata.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+            shouldCreateNewConsumingSegment = true;
+            break;
+          }
+        }
+      }
+      if (shouldCreateNewConsumingSegment) {

Review Comment:
   @jadami10 Added `Set<Integer> fetchPartitionIds(long timeoutMillis)` to
`StreamMetadataProvider` with a default implementation. Let me know if it suits
your needs.
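
   For illustration only, here is a minimal sketch of what such a default
method could look like and how the commit path might use it. It assumes the
default derives the IDs `0..count-1` from the partition count; the interface
`SketchStreamMetadataProvider`, the class `PartitionIdsSketch`, and the helper
`shouldCreateNewConsumingSegment(...)` are hypothetical stand-ins written for
this example, not the code in the PR.

```java
import java.util.HashSet;
import java.util.Set;

public class PartitionIdsSketch {

  /** Hypothetical, trimmed-down stand-in for StreamMetadataProvider. */
  interface SketchStreamMetadataProvider {
    int fetchPartitionCount(long timeoutMillis);

    /**
     * Assumed default: partition IDs are contiguous, 0 .. count-1.
     * A stream with non-contiguous IDs would override this method.
     */
    default Set<Integer> fetchPartitionIds(long timeoutMillis) {
      int partitionCount = fetchPartitionCount(timeoutMillis);
      Set<Integer> partitionIds = new HashSet<>(partitionCount);
      for (int i = 0; i < partitionCount; i++) {
        partitionIds.add(i);
      }
      return partitionIds;
    }
  }

  /** Hypothetical helper: create a new consuming segment only if the partition still exists. */
  static boolean shouldCreateNewConsumingSegment(SketchStreamMetadataProvider provider,
      int committingSegmentPartitionGroupId, long timeoutMillis) {
    return provider.fetchPartitionIds(timeoutMillis).contains(committingSegmentPartitionGroupId);
  }

  public static void main(String[] args) {
    // A provider that reports 3 partitions, so the default yields the IDs {0, 1, 2}.
    SketchStreamMetadataProvider threePartitions = timeoutMillis -> 3;
    System.out.println(shouldCreateNewConsumingSegment(threePartitions, 2, 5000L)); // true
    System.out.println(shouldCreateNewConsumingSegment(threePartitions, 3, 5000L)); // false
  }
}
```

   A membership check on the ID set would also cover streams whose partition
IDs are not contiguous, which the `numPartitionGroups >
committingSegmentPartitionGroupId` comparison cannot express.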



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

