mcvsubbu commented on a change in pull request #6667: URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r607990280
########## File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupStatus.java ########## @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +/** + * A PartitionGroup is a group of partitions/shards that the same consumer should consume from. + * This class contains all information which describes the latest state of a partition group. + * It is constructed by looking at the segment zk metadata of the latest segment of each partition group. + * It consists of: + * 1. partitionGroupId - A unique ID for the partitionGroup + * 2. sequenceNumber - The sequenceNumber this partitionGroup is currently at + * 3. startOffset - The start offset that the latest segment started consuming from + * 4. endOffset - The endOffset (if segment consuming from this partition group has finished consuming the segment and recorded the end offset) + * 5. status - IN_PROGRESS/DONE + * + * This information is needed by the stream, when grouping the partitions/shards into new partition groups. 
+ */ +public class PartitionGroupStatus { Review comment: I thought we agreed on `PartitionGroupConsumptionStatus` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java ########## @@ -448,14 +493,35 @@ private void commitSegmentMetadataInternal(String realtimeTableName, // Refresh the Broker routing to reflect the changes in the segment ZK metadata _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true); - // Step-2 + // Using the latest segment of each partition group, creates a list of {@link PartitionGroupStatus} + PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), + IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupStatus> currentPartitionGroupStatusList = + getPartitionGroupStatusList(idealState, streamConfig); + + // Fetches new partition groups, given current list of {@link PartitionGroupStatus}. 
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupStatusList); + Set<Integer> newPartitionGroupSet = + newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet()); + int numPartitionGroups = newPartitionGroupMetadataList.size(); + + // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new segment metadata + String newConsumingSegmentName = null; + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); long newSegmentCreationTimeMs = getCurrentTimeMs(); - LLCSegmentName newLLCSegmentName = - getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), newSegmentCreationTimeMs); - createNewSegmentZKMetadata(tableConfig, - new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)), - newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, - instancePartitions, numPartitionGroups, numReplicas); + if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) { + LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId, + committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs); + createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, + committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas); + newConsumingSegmentName = newLLCSegment.getSegmentName(); + } + + // TODO: also create the new partition groups here, instead of waiting till the {@link RealtimeSegmentValidationManager} runs Review comment: I would do this _after_ the ideal state update is completed for the committing partition. 
That way, if there are any errors in creating the new partitions for whatever reason, you can still complete the current segment and let consumption on the committing partition continue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
