Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1394997095
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -518,40 +523,56 @@ private void commitSegmentMetadataInternal(String
realtimeTableName,
*/
// Step-1
+ long startTimeNs1 = System.nanoTime();
SegmentZKMetadata committingSegmentZKMetadata =
updateCommittingSegmentZKMetadata(realtimeTableName,
committingSegmentDescriptor);
// Refresh the Broker routing to reflect the changes in the segment ZK
metadata
_helixResourceManager.sendSegmentRefreshMessage(realtimeTableName,
committingSegmentName, false, true);
- // Using the latest segment of each partition group, creates a list of
{@link PartitionGroupConsumptionStatus}
- StreamConfig streamConfig =
- new StreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
- // Fetches new partition groups, given current list of {@link
PartitionGroupConsumptionStatus}.
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
- Set<Integer> newPartitionGroupSet =
-
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
- .collect(Collectors.toSet());
- int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+ // Step-2
+ long startTimeNs2 = System.nanoTime();
String newConsumingSegmentName = null;
- if (!isTablePaused(idealState) &&
newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
- // Only if committingSegment's partitionGroup is present in the
newPartitionGroupMetadataList, we create new
- // segment metadata
- String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
- long newSegmentCreationTimeMs = getCurrentTimeMs();
- LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName,
committingSegmentPartitionGroupId,
- committingLLCSegment.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment,
newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitionGroups, numReplicas,
- newPartitionGroupMetadataList);
- newConsumingSegmentName = newLLCSegment.getSegmentName();
+ if (!isTablePaused(idealState)) {
+ int numPartitionGroups;
+ boolean shouldCreateNewConsumingSegment;
+ StreamConfig streamConfig =
+ new StreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ try {
+ numPartitionGroups = getNumPartitionGroups(streamConfig);
+ shouldCreateNewConsumingSegment = numPartitionGroups >
committingSegmentPartitionGroupId;
+ } catch (Exception e) {
+ LOGGER.info("Failed to read partition count from stream metadata
provider for table: {}, exception: {}. "
+ + "Reading all partition group metadata to determine partition
count and partition group status.",
+ realtimeTableName, e.toString());
+ // TODO: Find a better way to determine partition count and if the
committing partition group is fully consumed.
+ // We don't need to read partition group metadata for other
partition groups.
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ numPartitionGroups = newPartitionGroupMetadataList.size();
+ shouldCreateNewConsumingSegment = false;
+ for (PartitionGroupMetadata newPartitionGroupMetadata :
newPartitionGroupMetadataList) {
+ if (newPartitionGroupMetadata.getPartitionGroupId() ==
committingSegmentPartitionGroupId) {
+ shouldCreateNewConsumingSegment = true;
+ break;
+ }
+ }
+ }
+ if (shouldCreateNewConsumingSegment) {
Review Comment:
@jadami10 I added `Set<Integer> fetchPartitionIds(long timeoutMillis)` to
`StreamMetadataProvider` with a default implementation. Let me know if it suits
your needs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]