This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch full-auto-oss-abstraction in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 168a5c5a50d9d5d2d8aaadf64bd415def5818793 Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Fri Mar 22 16:00:43 2024 -0700 Extract methods for Pinot table ideal state --- .../pinot/controller/BaseControllerStarter.java | 4 + .../apache/pinot/controller/ControllerConf.java | 11 ++ .../helix/core/PinotHelixResourceManager.java | 46 ++---- .../helix/core/PinotTableIdealStateHelper.java | 145 ------------------ .../DefaultPinotTableIdealStateHelper.java | 151 +++++++++++++++++++ .../FullAutoPinotTableIdealStateHelper.java | 165 +++++++++++++++++++++ .../PinotTableIdealStateHelper.java | 87 +++++++++++ .../PinotTableIdealStateHelperFactory.java | 50 +++++++ .../realtime/MissingConsumingSegmentFinder.java | 4 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 93 +++--------- .../PinotLLCRealtimeSegmentManagerTest.java | 6 +- .../helix/core/retention/RetentionManagerTest.java | 10 +- .../tools/admin/command/MoveReplicaGroup.java | 20 +-- 13 files changed, 513 insertions(+), 279 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 281c397401..c509738fca 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -88,6 +88,7 @@ import org.apache.pinot.controller.helix.RealtimeConsumerMonitor; import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import 
org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter; @@ -245,6 +246,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } + // Initialize the ideal state helper for Pinot tables. + PinotTableIdealStateHelperFactory.init(_config); + // Initialize the table config tuner registry. TableConfigTunerRegistry.init(_config.getTableConfigTunerPackages()); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 4598b48eeb..8454aeb4ff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -275,6 +275,7 @@ public class ControllerConf extends PinotConfiguration { public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username"; public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password"; public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class"; + public static final String PINOT_TABLE_IDEALSTATE_HELPER_CLASS = "controller.pinot.table.idealstate.class"; // Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push // protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one. 
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis"; @@ -298,6 +299,8 @@ public class ControllerConf extends PinotConfiguration { private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin"; private static final String DEFAULT_LINEAGE_MANAGER = "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager"; + private static final String DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS = + "org.apache.pinot.controller.helix.core.idealstatehelper"; private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1; private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64; @@ -872,6 +875,14 @@ public class ControllerConf extends PinotConfiguration { setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass); } + public String getPinotTableIdealstateHelperClass() { + return getProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS); + } + + public void setPinotTableIdealstateHelperClass(String pinotTableIdealstateHelperClass) { + setProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, pinotTableIdealstateHelperClass); + } + public long getSegmentUploadTimeoutInMillis() { return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 57c75d7618..b74613f67a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -142,6 +142,8 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; @@ -232,11 +234,14 @@ public class PinotHelixResourceManager { private SegmentDeletionManager _segmentDeletionManager; private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; + + private final PinotTableIdealStateHelper _pinotTableIdealStateHelper; private final LineageManager _lineageManager; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, - boolean enableTieredSegmentAssignment, LineageManager lineageManager) { + boolean enableTieredSegmentAssignment, PinotTableIdealStateHelper pinotTableIdealStateHelper, + LineageManager lineageManager) { _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL); _helixClusterName = helixClusterName; _dataDir = dataDir; @@ -258,6 +263,7 @@ public class PinotHelixResourceManager { for (int i = 0; i < _tableUpdaterLocks.length; i++) { _tableUpdaterLocks[i] = new Object(); } + _pinotTableIdealStateHelper = pinotTableIdealStateHelper; _lineageManager = lineageManager; } @@ -265,7 +271,7 @@ public class PinotHelixResourceManager { this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf)); + PinotTableIdealStateHelperFactory.create(), LineageManagerFactory.create(controllerConf)); } /** @@ -1583,9 +1589,7 @@ public class PinotHelixResourceManager { Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, "Invalid table type: %s", tableType); - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); + IdealState idealState = _pinotTableIdealStateHelper.buildEmptyIdealStateFor(tableConfig, _enableBatchMessageMode); // if (tableType == TableType.REALTIME) { // idealState = // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), @@ -2256,36 +2260,8 @@ public class PinotHelixResourceManager { SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); synchronized (getTableUpdaterLock(tableNameWithType)) { - Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { - assert idealState != null; - Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); - Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields(); - if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) { - LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, - tableNameWithType); - } else { - List<String> assignedInstances = - segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); - LOGGER.info("Assigning segment: {} to 
instances: {} for table: {}", segmentName, assignedInstances, - tableNameWithType); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - if (tableType == TableType.REALTIME) { - // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated - currentAssignmentList.put(segmentName, Collections.emptyList() - /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); -// currentAssignment.put(segmentName, -// SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); - } else { - // TODO: Assess whether to pass in an empty instance list or to set the preferred list - currentAssignmentList.put(segmentName, Collections.emptyList() - /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); - } - // currentAssignment.put(segmentName, - // SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); - } - return idealState; - }); + PinotTableIdealStateHelperFactory.create() + .assignSegment(_helixZkManager, tableNameWithType, segmentName, segmentAssignment, instancePartitionsMap); LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType); } } catch (Exception e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java deleted file mode 100644 index 37c4b7555b..0000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core; - -import java.util.List; -import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.builder.CustomModeISBuilder; -import org.apache.helix.model.builder.FullAutoModeISBuilder; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.spi.utils.retry.RetryPolicies; -import org.apache.pinot.spi.utils.retry.RetryPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class PinotTableIdealStateHelper { - private PinotTableIdealStateHelper() { - } - - private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelper.class); - private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY = - RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L); - - public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas, - boolean enableBatchMessageMode) { - LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, 
numReplicas); - CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); - customModeIdealStateBuilder - .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) - .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1); - IdealState idealState = customModeIdealStateBuilder.build(); - idealState.setInstanceGroupTag(tableNameWithType); - idealState.setBatchMessageMode(enableBatchMessageMode); - return idealState; - } - - public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas, - boolean enableBatchMessageMode) { - LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - String stateModel; - if (tableType == null) { - throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); - } else if (TableType.OFFLINE.equals(tableType)) { - stateModel = - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - } else { - stateModel = - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - } - - // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default - // TODO: The state model used only works for OFFLINE tables today. 
Add support for REALTIME state model too - FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); - idealStateBuilder - .setStateModel(stateModel) - .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) - // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) - .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); - // The below config guarantees if active number of replicas is no less than minimum active replica, there will - // not be partition movements happened. - // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix - // controller waits at most 5 minutes and then re-calculate the participant assignment. - // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added - idealStateBuilder.setMinActiveReplica(numReplicas - 1); - idealStateBuilder.setRebalanceDelay(300_000); - idealStateBuilder.enableDelayRebalance(); - // Set instance group tag - IdealState idealState = idealStateBuilder.build(); - idealState.setInstanceGroupTag(tableNameWithType); - idealState.setBatchMessageMode(enableBatchMessageMode); - return idealState; - } - - /** - * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, - * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. - * - * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed: - * - * 1) - * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for - * a partition group. - * An example of where the offsets would be used: - * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities: - * 1) the stream indicates that shardId's last offset is 200. 
- * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in - * the response. - * 2) the stream indicates that shardId's last offset is 150, - * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the - * response. - * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition - * group have been consumed. - * - * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions, - * and accordingly make the new partition groups. - * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2 - * and partition group 2 has status DONE and contains shards 3,4. - * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that - * the collection of shards in partition group 1, should remain unchanged in the response, - * whereas shards 3,4 can be added to new partition groups if needed. - * - * @param streamConfig the streamConfig from the tableConfig - * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current - * partition groups. - * The size of this list is equal to the number of partition groups, - * and is created using the latest segment zk metadata. 
- */ - public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { - PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); - try { - DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); - return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); - } catch (Exception e) { - Exception fetcherException = partitionGroupMetadataFetcher.getException(); - LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), - streamConfig.getTableNameWithType(), fetcherException); - throw new RuntimeException(fetcherException); - } - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java new file mode 100644 index 0000000000..e6cef02d99 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.helix.HelixManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultPinotTableIdealStateHelper implements PinotTableIdealStateHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPinotTableIdealStateHelper.class); + private 
static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY = + RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L); + + @Override + public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) { + String tableNameWithType = tableConfig.getTableName(); + int numReplicas = tableConfig.getReplication(); + LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); + customModeIdealStateBuilder + .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1); + IdealState idealState = customModeIdealStateBuilder.build(); + idealState.setInstanceGroupTag(tableNameWithType); + idealState.setBatchMessageMode(enableBatchMessageMode); + return idealState; + } + + @Override + public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName, + SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> { + assert idealState != null; + Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)); + } 
+ return idealState; + }); + } + + @Override + public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName, + @Nullable String newSegmentName, SegmentAssignment segmentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + if (committingSegmentName != null) { + // Change committing segment state to ONLINE + Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); + instanceStatesMap.put(committingSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instances, + CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)); + LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); + } + + // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit) + // that ended up creating multiple consuming segments for the same stream partition, named somewhat like + // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the + // Zookeeper Stat object before updating the segment metadata. + // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263 + // The following check prevents the table from going into such a state (but does not prevent the root cause + // of attempting such a zk update). 
+ if (newSegmentName != null) { + LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName); + int partitionId = newLLCSegmentName.getPartitionGroupId(); + int seqNum = newLLCSegmentName.getSequenceNumber(); + for (String segmentNameStr : instanceStatesMap.keySet()) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr); + if (llcSegmentName == null) { + // skip the segment name if the name is not in low-level consumer format + // such segment name can appear for uploaded segment + LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr); + continue; + } + if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) { + String errorMsg = + String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr); + LOGGER.error(errorMsg); + throw new HelixHelper.PermanentUpdaterException(errorMsg); + } + } + // Assign instances to the new segment and add instances as state CONSUMING + List<String> instancesAssigned = + segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); + instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, + CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)); + LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); + } + } + + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = + new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); + try { + DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); + return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); + } catch (Exception e) { + Exception fetcherException = 
partitionGroupMetadataFetcher.getException(); + LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), + streamConfig.getTableNameWithType(), fetcherException); + throw new RuntimeException(fetcherException); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java new file mode 100644 index 0000000000..7b1889fdd7 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link PinotTableIdealStateHelper} that builds and maintains table IdealStates in Helix FULL-AUTO mode.
+ * <p>
+ * Segments are registered as IdealState list fields with an empty preferred-instance list. Instances computed through
+ * {@link SegmentAssignment} are currently only logged, and placement is left to the Helix rebalancer configured in
+ * {@link #buildEmptyIdealStateFor(TableConfig, boolean)}.
+ */
+public class FullAutoPinotTableIdealStateHelper extends DefaultPinotTableIdealStateHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullAutoPinotTableIdealStateHelper.class);
+
+  // How long (in ms) the Helix controller waits before recomputing the assignment once too few replicas are active
+  private static final int REBALANCE_DELAY_MS = 300_000;
+
+  /**
+   * Builds an empty FULL-AUTO IdealState for the given table.
+   *
+   * @param tableConfig table config; the table type must be derivable from its table name
+   * @param enableBatchMessageMode whether to enable Helix batch message mode on the IdealState
+   * @return an IdealState with no partitions (segments)
+   * @throws RuntimeException if the table type cannot be derived from the table name
+   */
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+
+    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == null) {
+      throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+    }
+    // TODO: The state model used only works for OFFLINE tables today, so REALTIME tables use it too until a
+    //  REALTIME state model with FULL-AUTO support is added.
+    String stateModel =
+        PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+
+    // FULL-AUTO Segment Online-Offline state model with the CRUSH-ed rebalance strategy
+    // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+    // NOTE(review): setMaxPartitionsPerNode(1) caps every instance at a single partition (segment) if the rebalancer
+    //  honors this limit; confirm the CRUSH-ed strategy ignores it, otherwise tables with more segments than
+    //  instances cannot be fully placed.
+    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+    idealStateBuilder
+        .setStateModel(stateModel)
+        .setNumPartitions(0)
+        .setNumReplica(numReplicas)
+        .setMaxPartitionsPerNode(1)
+        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    // Delayed rebalance: as long as at least (numReplicas - 1) replicas of a segment are active, i.e. at most one
+    // replica is offline, segments are not moved. Otherwise the Helix controller waits up to REBALANCE_DELAY_MS
+    // (5 minutes) before re-calculating the participant assignment.
+    // TODO: Assess which of these values need to be tweaked, removed, and what additional values need to be added
+    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
+    idealStateBuilder.setRebalanceDelay(REBALANCE_DELAY_MS);
+    idealStateBuilder.enableDelayRebalance();
+    IdealState idealState = idealStateBuilder.build();
+    // NOTE(review): the instance group tag is the table name itself, so only instances carrying that exact tag are
+    //  eligible for placement; confirm servers are tagged with it.
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
+  /**
+   * Registers {@code segmentName} in the table's IdealState by adding a list-field entry with an empty
+   * preferred-instance list. If the segment is already registered in the list fields, logs a warning and leaves
+   * the IdealState unchanged.
+   */
+  @Override
+  public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> {
+      assert idealState != null;
+      Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+      Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields();
+      // This helper only ever writes list fields, so they decide whether the segment already exists. Requiring the
+      // segment to also be present in the map fields would make this guard ineffective.
+      if (currentAssignmentList.containsKey(segmentName)) {
+        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+            tableNameWithType);
+      } else {
+        List<String> assignedInstances =
+            segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
+        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+            tableNameWithType);
+        // The computed instances are only logged; the segment is registered with an empty preferred-instance list
+        currentAssignmentList.put(segmentName, Collections.emptyList());
+      }
+      return idealState;
+    });
+  }
+
+  /**
+   * Updates {@code idealState} in place for a committing segment and/or a new consuming segment.
+   *
+   * @param idealState IdealState to modify; persisting it is left to the caller
+   * @param committingSegmentName segment being committed, or null if none
+   * @param newSegmentName new consuming segment to register, or null if none should be added
+   * @param segmentAssignment used to compute (and log) the instances for the new segment
+   * @param instancePartitionsMap instance partitions of the table, keyed by instance partitions type
+   * @throws HelixHelper.PermanentUpdaterException if a segment with the same partition group id and sequence number
+   *     as {@code newSegmentName} already exists in the map or list fields
+   */
+  @Override
+  public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    Map<String, List<String>> segmentList = idealState.getRecord().getListFields();
+    if (committingSegmentName != null) {
+      // TODO: Need to figure out the best way to handle committed segments' state change. The committing segment's
+      //  entry is not modified here yet.
+      LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
+    }
+
+    // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit)
+    // that ended up creating multiple consuming segments for the same stream partition, named somewhat like
+    // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the
+    // Zookeeper Stat object before updating the segment metadata.
+    // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263
+    // The following check prevents the table from going into such a state (but does not prevent the root cause
+    // of attempting such a zk update).
+    if (newSegmentName != null) {
+      LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
+      int partitionId = newLLCSegmentName.getPartitionGroupId();
+      int seqNum = newLLCSegmentName.getSequenceNumber();
+      // New consuming segments are added only to the list fields (below), so both the map and the list fields must
+      // be checked; scanning only the map fields would miss duplicates registered by this helper.
+      Set<String> existingSegments = new HashSet<>(instanceStatesMap.keySet());
+      existingSegments.addAll(segmentList.keySet());
+      for (String segmentNameStr : existingSegments) {
+        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
+        if (llcSegmentName == null) {
+          // skip the segment name if the name is not in low-level consumer format
+          // such segment name can appear for uploaded segment
+          LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr);
+          continue;
+        }
+        if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) {
+          String errorMsg =
+              String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
+          LOGGER.error(errorMsg);
+          throw new HelixHelper.PermanentUpdaterException(errorMsg);
+        }
+      }
+      // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
+      // The assigned instances are only logged: the segment is registered with an empty preferred-instance list.
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      // TODO: Assess whether we should set an empty preferred instance list for the segment or not, i.e. do we
+      //  support this preferred list concept, and does Helix FULL-AUTO even allow it (from code reading it looks
+      //  like it does)
+      segmentList.put(newSegmentName, Collections.emptyList());
+      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..a30872df83
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+/**
+ * Builds and updates the Helix IdealState of Pinot tables.
+ * <p>
+ * The implementation in use is obtained from {@link PinotTableIdealStateHelperFactory#create()}, which instantiates
+ * the class named in the controller config.
+ */
+public interface PinotTableIdealStateHelper {
+
+  /**
+   * Builds an empty ideal state for the Pinot table.
+   * @param tableConfig table config.
+   * @param enableBatchMessageMode whether to enable batch message mode when building the ideal state.
+   * @return the newly built IdealState, not yet containing any segment
+   */
+  IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode);
+
+  /**
+   * Adds a new segment to the IdealState of the given table.
+   *
+   * @param helixManager Helix manager used to access the table's IdealState (presumably through
+   *     {@code HelixHelper.updateIdealState}; confirm per implementation)
+   * @param tableNameWithType table name with type suffix
+   * @param segmentName name of the segment to add
+   * @param segmentAssignment segment assignment strategy used to select instances for the segment
+   * @param instancePartitionsMap instance partitions of the table, keyed by instance partitions type
+   */
+  void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
+
+  /**
+   * Updates {@code idealState} in place when a consuming segment commits and/or a new consuming segment is added.
+   * Implementations only mutate the given object; writing it back to Helix is left to the caller.
+   * NOTE(review): implementations may reject a duplicate new segment by throwing
+   * {@code HelixHelper.PermanentUpdaterException}; confirm per implementation.
+   *
+   * @param idealState IdealState to modify
+   * @param committingSegmentName name of the segment being committed, or null if there is none
+   * @param newSegmentName name of the new consuming segment to add, or null if none should be added
+   * @param segmentAssignment segment assignment strategy used to select instances for the new segment
+   * @param instancePartitionsMap instance partitions of the table, keyed by instance partitions type
+   */
+  void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
+
+  /**
+   * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
+   * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
+   *
+   * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
+   *
+   * 1)
+   * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for
+   * a partition group.
+   * An example of where the offsets would be used:
+   * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There are 2 possibilities:
+   * 1) the stream indicates that shardId's last offset is 200.
+   * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in
+   * the response.
+   * 2) the stream indicates that shardId's last offset is 150,
+   * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the
+   * response.
+   * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition
+   * group have been consumed.
+   *
+   * 2)
+   * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions,
+   * and accordingly make the new partition groups.
+   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2
+   * and partition group 2 has status DONE and contains shards 3,4.
+   * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that
+   * the collection of shards in partition group 1, should remain unchanged in the response,
+   * whereas shards 3,4 can be added to new partition groups if needed.
+   *
+   * @param streamConfig the streamConfig from the tableConfig
+   * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current
+   *                                            partition groups.
+   *                                            The size of this list is equal to the number of partition groups,
+   *                                            and is created using the latest segment zk metadata.
+   * @return the {@link PartitionGroupMetadata} of the partition groups to consume from
+   */
+  List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList);
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
new file mode 100644
index 0000000000..9327ecba04
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides the process-wide {@link PinotTableIdealStateHelper} whose implementation class is named in
+ * {@link ControllerConf}.
+ * <p>
+ * {@link #init(ControllerConf)} must be called before {@link #create()}. The helper is instantiated lazily on the
+ * first {@link #create()} call and cached; later calls return the same instance until {@link #init} is called again.
+ * Both methods are safe to call from multiple threads.
+ */
+public class PinotTableIdealStateHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelperFactory.class);
+  // volatile so that the unsynchronized fast-path read in create() only ever sees a fully constructed helper
+  private static volatile PinotTableIdealStateHelper _instance = null;
+  private static volatile ControllerConf _controllerConf;
+
+  private PinotTableIdealStateHelperFactory() {
+  }
+
+  /**
+   * Sets the controller config used to pick the helper implementation, and drops any previously cached helper so the
+   * next {@link #create()} call honors the new config.
+   *
+   * @param controllerConf controller config naming the {@link PinotTableIdealStateHelper} class to instantiate
+   */
+  public static synchronized void init(ControllerConf controllerConf) {
+    _controllerConf = controllerConf;
+    _instance = null;
+  }
+
+  /**
+   * Returns the configured helper, instantiating it on first use (double-checked locking on the class monitor).
+   *
+   * @return the cached {@link PinotTableIdealStateHelper}
+   * @throws IllegalStateException if {@link #init(ControllerConf)} has not been called
+   * @throws RuntimeException if the configured class cannot be loaded or instantiated, or does not implement
+   *     {@link PinotTableIdealStateHelper}
+   */
+  public static PinotTableIdealStateHelper create() {
+    PinotTableIdealStateHelper instance = _instance;
+    if (instance == null) {
+      synchronized (PinotTableIdealStateHelperFactory.class) {
+        instance = _instance;
+        if (instance == null) {
+          instance = newHelper();
+          _instance = instance;
+        }
+      }
+    }
+    return instance;
+  }
+
+  // Instantiates the helper class named in the controller config through its no-arg constructor.
+  private static PinotTableIdealStateHelper newHelper() {
+    ControllerConf controllerConf = _controllerConf;
+    if (controllerConf == null) {
+      throw new IllegalStateException("PinotTableIdealStateHelperFactory.init() must be called before create()");
+    }
+    String pinotTableIdealstateHelperClassPath = controllerConf.getPinotTableIdealstateHelperClass();
+    try {
+      // asSubclass() fails fast with a ClassCastException if the class does not implement the interface
+      return Class.forName(pinotTableIdealstateHelperClassPath)
+          .asSubclass(PinotTableIdealStateHelper.class)
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (Exception e) {
+      LOGGER.error("Failed to instantiate PinotTableIdealStateHelper: {}", pinotTableIdealstateHelperClassPath, e);
+      throw new RuntimeException(
+          "Failed to instantiate PinotTableIdealStateHelper: " + pinotTableIdealstateHelperClassPath, e);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index c9850856cd..94d158220c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -36,7 +36,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
+import
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; @@ -79,7 +79,7 @@ public class MissingConsumingSegmentFinder { _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>(); streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); try { - PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) + PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 40215c43a4..0e560bd9ee 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -68,9 +68,9 @@ import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.api.resources.Constants; import org.apache.pinot.controller.api.resources.PauseStatus; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; @@ -336,7 +336,7 @@ public class PinotLLCRealtimeSegmentManager { String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName, + updateInstanceStatesForNewConsumingSegment(idealState, null, segmentName, segmentAssignment, instancePartitionsMap); } @@ -817,7 +817,7 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { - return PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, + return PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); } @@ -1000,8 +1000,7 @@ public class PinotLLCRealtimeSegmentManager { throw new HelixHelper.PermanentUpdaterException( "Exceeded max segment completion time for segment " + committingSegmentName); } - updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), - idealState.getRecord().getListFields(), committingSegmentName, + updateInstanceStatesForNewConsumingSegment(idealState, committingSegmentName, isTablePaused(idealState) ? 
null : newSegmentName, segmentAssignment, instancePartitionsMap); return idealState; }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); @@ -1012,61 +1011,12 @@ public class PinotLLCRealtimeSegmentManager { } @VisibleForTesting - void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap, - Map<String, List<String>> instanceStatesList, - @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment, + void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName, + @Nullable String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - // TODO: Need to figure out the best way to handle committed segments' state change - if (committingSegmentName != null) { - // Change committing segment state to ONLINE -// Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); -// instanceStatesMap.put(committingSegmentName, -// SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); -// instanceStatesList.put(newSegmentName, Collections.emptyList() -// /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); - LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); - } - - // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit) - // that ended up creating multiple consuming segments for the same stream partition, named somewhat like - // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the - // Zookeeper Stat object before updating the segment metadata. 
- // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263 - // The following check prevents the table from going into such a state (but does not prevent the root cause - // of attempting such a zk update). - if (newSegmentName != null) { - LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName); - int partitionId = newLLCSegmentName.getPartitionGroupId(); - int seqNum = newLLCSegmentName.getSequenceNumber(); - for (String segmentNameStr : instanceStatesMap.keySet()) { - LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr); - if (llcSegmentName == null) { - // skip the segment name if the name is not in low-level consumer format - // such segment name can appear for uploaded segment - LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr); - continue; - } - if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) { - String errorMsg = - String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr); - LOGGER.error(errorMsg); - throw new HelixHelper.PermanentUpdaterException(errorMsg); - } - } - // Assign instances to the new segment and add instances as state CONSUMING - List<String> instancesAssigned = - segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); - // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME -// instanceStatesMap.put(newSegmentName, -// SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); - // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list. - // Assess whether we should set am empty InstanceStateList for the segment or not. i.e. 
do we support - // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it - // looks like it does) - instanceStatesList.put(newSegmentName, Collections.emptyList() - /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); - LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); - } + PinotTableIdealStateHelperFactory.create() + .updateInstanceStatesForNewConsumingSegment(idealState, committingSegmentName, newSegmentName, + segmentAssignment, instancePartitionsMap); } /* @@ -1222,14 +1172,14 @@ public class PinotLLCRealtimeSegmentManager { (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName, - newSegmentName, segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, latestSegmentName, newSegmentName, + segmentAssignment, instancePartitionsMap); } else { // partition group reached end of life LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. " + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId, latestSegmentName); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName, - null, segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, latestSegmentName, null, segmentAssignment, + instancePartitionsMap); } } // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment. 
@@ -1253,7 +1203,7 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, + newPartitionGroupMetadataList, instancePartitions, idealState, segmentAssignment, instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { @@ -1272,7 +1222,7 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset()); createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, + newPartitionGroupMetadataList, instancePartitions, idealState, segmentAssignment, instancePartitionsMap, startOffset); } else { LOGGER.error( @@ -1320,8 +1270,8 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupId, realtimeTableName); _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); } - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment, - latestSegmentName, segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, previousConsumingSegment, latestSegmentName, + segmentAssignment, instancePartitionsMap); } else { LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", latestSegmentZKMetadata.getStatus(), latestSegmentName); @@ -1336,8 +1286,8 @@ public class PinotLLCRealtimeSegmentManager { String newSegmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, 
instancePartitions, numPartitions, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, null, newSegmentName, - segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, null, newSegmentName, segmentAssignment, + instancePartitionsMap); } } @@ -1347,8 +1297,7 @@ public class PinotLLCRealtimeSegmentManager { private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig, SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, - Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList, - SegmentAssignment segmentAssignment, + IdealState idealState, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) { int numReplicas = getNumReplicas(tableConfig, instancePartitions); int numPartitions = newPartitionGroupMetadataList.size(); @@ -1359,7 +1308,7 @@ public class PinotLLCRealtimeSegmentManager { createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); String newSegmentName = newLLCSegmentName.getSegmentName(); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName, + updateInstanceStatesForNewConsumingSegment(idealState, null, newSegmentName, segmentAssignment, instancePartitionsMap); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 26641d515c..ee90649080 100644 --- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1220,11 +1220,9 @@ public class PinotLLCRealtimeSegmentManagerTest { void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), - _idealState.getRecord().getListFields(), committingSegmentName, null, + updateInstanceStatesForNewConsumingSegment(_idealState, committingSegmentName, null, segmentAssignment, instancePartitionsMap); - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), - _idealState.getRecord().getListFields(), null, newSegmentName, + updateInstanceStatesForNewConsumingSegment(_idealState, null, newSegmentName, segmentAssignment, instancePartitionsMap); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index dc988ad669..017b0b2a5a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; import org.apache.pinot.controller.helix.core.SegmentDeletionManager; 
+import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -270,9 +270,7 @@ public class RetentionManagerTest { final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); - - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true); final int kafkaPartition = 5; final long millisInDays = TimeUnit.DAYS.toMillis(1); @@ -334,9 +332,7 @@ public class RetentionManagerTest { final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); - - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true); final int kafkaPartition = 5; final long millisInDays = TimeUnit.DAYS.toMillis(1); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java index b14cb9e240..08de258d62 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java @@ -46,7 +46,6 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import 
org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -211,23 +210,16 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman @Nullable @Override public IdealState apply(@Nullable IdealState input) { - Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields(); Map<String, List<String>> existingListField = input.getRecord().getListFields(); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName); for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) { // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); - if (tableType == TableType.REALTIME) { - // TODO: Update listField only once REALTIME uses FULL-AUTO - existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); - } else { - String segmentName = segmentEntry.getKey(); - Map<String, String> segmentMapping = segmentEntry.getValue(); - List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet()); - Collections.sort(listOfHosts); - // TODO: Assess if we want to add the preferred list of hosts or not - existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */); - } + String segmentName = segmentEntry.getKey(); + Map<String, String> segmentMapping = segmentEntry.getValue(); + List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet()); + Collections.sort(listOfHosts); + // TODO: Assess if we want to add the preferred list of hosts or not + existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */); } return input; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org