This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-oss-abstraction
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 168a5c5a50d9d5d2d8aaadf64bd415def5818793
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Fri Mar 22 16:00:43 2024 -0700

    Extract methods for Pinot table ideal state
---
 .../pinot/controller/BaseControllerStarter.java    |   4 +
 .../apache/pinot/controller/ControllerConf.java    |  11 ++
 .../helix/core/PinotHelixResourceManager.java      |  46 ++----
 .../helix/core/PinotTableIdealStateHelper.java     | 145 ------------------
 .../DefaultPinotTableIdealStateHelper.java         | 151 +++++++++++++++++++
 .../FullAutoPinotTableIdealStateHelper.java        | 165 +++++++++++++++++++++
 .../PinotTableIdealStateHelper.java                |  87 +++++++++++
 .../PinotTableIdealStateHelperFactory.java         |  50 +++++++
 .../realtime/MissingConsumingSegmentFinder.java    |   4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  93 +++---------
 .../PinotLLCRealtimeSegmentManagerTest.java        |   6 +-
 .../helix/core/retention/RetentionManagerTest.java |  10 +-
 .../tools/admin/command/MoveReplicaGroup.java      |  20 +--
 13 files changed, 513 insertions(+), 279 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 281c397401..c509738fca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -88,6 +88,7 @@ import 
org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -245,6 +246,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, 
_tenantRebalanceExecutorService);
     }
 
+    // Initialize the ideal state helper for Pinot tables.
+    PinotTableIdealStateHelperFactory.init(_config);
+
     // Initialize the table config tuner registry.
     TableConfigTunerRegistry.init(_config.getTableConfigTunerPackages());
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 4598b48eeb..8454aeb4ff 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -275,6 +275,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final String ACCESS_CONTROL_USERNAME = 
"access.control.init.username";
   public static final String ACCESS_CONTROL_PASSWORD = 
"access.control.init.password";
   public static final String LINEAGE_MANAGER_CLASS = 
"controller.lineage.manager.class";
+  public static final String PINOT_TABLE_IDEALSTATE_HELPER_CLASS = 
"controller.pinot.table.idealstate.class";
   // Amount of the time the segment can take from the beginning of upload to 
the end of upload. Used when parallel push
   // protection is enabled. If the upload does not finish within the timeout, 
next upload can override the previous one.
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 
"controller.segment.upload.timeoutInMillis";
@@ -298,6 +299,8 @@ public class ControllerConf extends PinotConfiguration {
   private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
   private static final String DEFAULT_LINEAGE_MANAGER =
       "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
+  private static final String DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS =
+      "org.apache.pinot.controller.helix.core.idealstatehelper.DefaultPinotTableIdealStateHelper";
   private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 
600_000L; // 10 minutes
   private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION 
= -1;
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
64;
@@ -872,6 +875,14 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
   }
 
+  public String getPinotTableIdealstateHelperClass() {
+    return getProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, 
DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS);
+  }
+
+  public void setPinotTableIdealstateHelperClass(String 
pinotTableIdealstateHelperClass) {
+    setProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, 
pinotTableIdealstateHelperClass);
+  }
+
   public long getSegmentUploadTimeoutInMillis() {
     return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, 
DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 57c75d7618..b74613f67a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -142,6 +142,8 @@ import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -232,11 +234,14 @@ public class PinotHelixResourceManager {
   private SegmentDeletionManager _segmentDeletionManager;
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
+
+  private final PinotTableIdealStateHelper _pinotTableIdealStateHelper;
   private final LineageManager _lineageManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
-      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+      boolean enableTieredSegmentAssignment, PinotTableIdealStateHelper 
pinotTableIdealStateHelper,
+      LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -258,6 +263,7 @@ public class PinotHelixResourceManager {
     for (int i = 0; i < _tableUpdaterLocks.length; i++) {
       _tableUpdaterLocks[i] = new Object();
     }
+    _pinotTableIdealStateHelper = pinotTableIdealStateHelper;
     _lineageManager = lineageManager;
   }
 
@@ -265,7 +271,7 @@ public class PinotHelixResourceManager {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
         controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf));
+        PinotTableIdealStateHelperFactory.create(), 
LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -1583,9 +1589,7 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == 
TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState =
-        
PinotTableIdealStateHelper.buildEmptyFullAutoIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
-            _enableBatchMessageMode);
+    IdealState idealState = 
_pinotTableIdealStateHelper.buildEmptyIdealStateFor(tableConfig, 
_enableBatchMessageMode);
 //    if (tableType == TableType.REALTIME) {
 //      idealState =
 //           
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
@@ -2256,36 +2260,8 @@ public class PinotHelixResourceManager {
       SegmentAssignment segmentAssignment =
           SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig, _controllerMetrics);
       synchronized (getTableUpdaterLock(tableNameWithType)) {
-        Map<InstancePartitionsType, InstancePartitions> 
finalInstancePartitionsMap = instancePartitionsMap;
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
-          assert idealState != null;
-          Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
-          Map<String, List<String>> currentAssignmentList = 
idealState.getRecord().getListFields();
-          if (currentAssignment.containsKey(segmentName) && 
currentAssignmentList.containsKey(segmentName)) {
-            LOGGER.warn("Segment: {} already exists in the IdealState for 
table: {}, do not update", segmentName,
-                tableNameWithType);
-          } else {
-            List<String> assignedInstances =
-                segmentAssignment.assignSegment(segmentName, 
currentAssignment, finalInstancePartitionsMap);
-            LOGGER.info("Assigning segment: {} to instances: {} for table: 
{}", segmentName, assignedInstances,
-                tableNameWithType);
-            TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-            if (tableType == TableType.REALTIME) {
-              // TODO: Once REALTIME uses FULL-AUTO only the listFields should 
be updated
-              currentAssignmentList.put(segmentName, Collections.emptyList()
-                  /* 
SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
-//              currentAssignment.put(segmentName,
-//                  
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
-            } else {
-              // TODO: Assess whether to pass in an empty instance list or to 
set the preferred list
-              currentAssignmentList.put(segmentName, Collections.emptyList()
-                  /* 
SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
-            }
-            // currentAssignment.put(segmentName,
-            //     
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
-          }
-          return idealState;
-        });
+        PinotTableIdealStateHelperFactory.create()
+            .assignSegment(_helixZkManager, tableNameWithType, segmentName, 
segmentAssignment, instancePartitionsMap);
         LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, tableNameWithType);
       }
     } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
deleted file mode 100644
index 37c4b7555b..0000000000
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core;
-
-import java.util.List;
-import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.FullAutoModeISBuilder;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.spi.utils.retry.RetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PinotTableIdealStateHelper {
-  private PinotTableIdealStateHelper() {
-  }
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableIdealStateHelper.class);
-  private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
-      RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
-
-  public static IdealState buildEmptyIdealStateFor(String tableNameWithType, 
int numReplicas,
-      boolean enableBatchMessageMode) {
-    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", 
tableNameWithType, numReplicas);
-    CustomModeISBuilder customModeIdealStateBuilder = new 
CustomModeISBuilder(tableNameWithType);
-    customModeIdealStateBuilder
-        
.setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
-        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1);
-    IdealState idealState = customModeIdealStateBuilder.build();
-    idealState.setInstanceGroupTag(tableNameWithType);
-    idealState.setBatchMessageMode(enableBatchMessageMode);
-    return idealState;
-  }
-
-  public static IdealState buildEmptyFullAutoIdealStateFor(String 
tableNameWithType, int numReplicas,
-      boolean enableBatchMessageMode) {
-    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: 
{}", tableNameWithType, numReplicas);
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    String stateModel;
-    if (tableType == null) {
-      throw new RuntimeException("Failed to get table type from table name: " 
+ tableNameWithType);
-    } else if (TableType.OFFLINE.equals(tableType)) {
-      stateModel =
-          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    } else {
-      stateModel =
-          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    }
-
-    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, 
crushed auto-rebalance by default
-    // TODO: The state model used only works for OFFLINE tables today. Add 
support for REALTIME state model too
-    FullAutoModeISBuilder idealStateBuilder = new 
FullAutoModeISBuilder(tableNameWithType);
-    idealStateBuilder
-        .setStateModel(stateModel)
-        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
-        // TODO: Revisit the rebalance strategy to use (maybe we add a custom 
one)
-        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
-    // The below config guarantees if active number of replicas is no less 
than minimum active replica, there will
-    // not be partition movements happened.
-    // Set min active replicas to 0 and rebalance delay to 5 minutes so that 
if any master goes offline, Helix
-    // controller waits at most 5 minutes and then re-calculate the 
participant assignment.
-    // TODO: Assess which of these values need to be tweaked, removed, and 
what additional values that need to be added
-    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
-    idealStateBuilder.setRebalanceDelay(300_000);
-    idealStateBuilder.enableDelayRebalance();
-    // Set instance group tag
-    IdealState idealState = idealStateBuilder.build();
-    idealState.setInstanceGroupTag(tableNameWithType);
-    idealState.setBatchMessageMode(enableBatchMessageMode);
-    return idealState;
-  }
-
-  /**
-   * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
-   * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
-   *
-   * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
-   *
-   * 1)
-   * The current {@link PartitionGroupConsumptionStatus} is used to determine 
the offsets that have been consumed for
-   * a partition group.
-   * An example of where the offsets would be used:
-   * e.g. If partition group 1 contains shardId 1, with status DONE and 
endOffset 150. There's 2 possibilities:
-   * 1) the stream indicates that shardId's last offset is 200.
-   * This tells Pinot that partition group 1 still has messages which haven't 
been consumed, and must be included in
-   * the response.
-   * 2) the stream indicates that shardId's last offset is 150,
-   * This tells Pinot that all messages of partition group 1 have been 
consumed, and it need not be included in the
-   * response.
-   * Thus, this call will skip a partition group when it has reached end of 
life and all messages from that partition
-   * group have been consumed.
-   *
-   * The current {@link PartitionGroupConsumptionStatus} is also used to know 
about existing groupings of partitions,
-   * and accordingly make the new partition groups.
-   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains 
shards 0,1,2
-   * and partition group 2 has status DONE and contains shards 3,4.
-   * In the above example, the 
<code>partitionGroupConsumptionStatusList</code> indicates that
-   * the collection of shards in partition group 1, should remain unchanged in 
the response,
-   * whereas shards 3,4 can be added to new partition groups if needed.
-   *
-   * @param streamConfig the streamConfig from the tableConfig
-   * @param partitionGroupConsumptionStatusList List of {@link 
PartitionGroupConsumptionStatus} for the current
-   *                                            partition groups.
-   *                                          The size of this list is equal 
to the number of partition groups,
-   *                                          and is created using the latest 
segment zk metadata.
-   */
-  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
-    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
-    try {
-      
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
-      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
-    } catch (Exception e) {
-      Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}", streamConfig.getTopicName(),
-          streamConfig.getTableNameWithType(), fetcherException);
-      throw new RuntimeException(fetcherException);
-    }
-  }
-}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..e6cef02d99
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultPinotTableIdealStateHelper implements 
PinotTableIdealStateHelper {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultPinotTableIdealStateHelper.class);
+  private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
+      RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
+
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean 
enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", 
tableNameWithType, numReplicas);
+    CustomModeISBuilder customModeIdealStateBuilder = new 
CustomModeISBuilder(tableNameWithType);
+    customModeIdealStateBuilder
+        
.setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1);
+    IdealState idealState = customModeIdealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
+  @Override
+  public void assignSegment(HelixManager helixManager, String 
tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, 
InstancePartitions> instancePartitionsMap) {
+    HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState 
-> {
+      assert idealState != null;
+      Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
+      if (currentAssignment.containsKey(segmentName)) {
+        LOGGER.warn("Segment: {} already exists in the IdealState for table: 
{}, do not update", segmentName,
+            tableNameWithType);
+      } else {
+        List<String> assignedInstances =
+            segmentAssignment.assignSegment(segmentName, currentAssignment, 
instancePartitionsMap);
+        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", 
segmentName, assignedInstances,
+            tableNameWithType);
+        currentAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+            CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE));
+      }
+      return idealState;
+    });
+  }
+
+  @Override
+  public void updateInstanceStatesForNewConsumingSegment(IdealState 
idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
+    if (committingSegmentName != null) {
+      // Change committing segment state to ONLINE
+      Set<String> instances = 
instanceStatesMap.get(committingSegmentName).keySet();
+      instanceStatesMap.put(committingSegmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instances,
+          CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE));
+      LOGGER.info("Updating segment: {} to ONLINE state", 
committingSegmentName);
+    }
+
+    // There used to be a race condition in pinot (caused by heavy GC on the 
controller during segment commit)
+    // that ended up creating multiple consuming segments for the same stream 
partition, named somewhat like
+    // tableName__1__25__20210920T190005Z and 
tableName__1__25__20210920T190007Z. It was fixed by checking the
+    // Zookeeper Stat object before updating the segment metadata.
+    // These conditions can happen again due to manual operations considered 
as fixes in Issues #5559 and #5263
+    // The following check prevents the table from going into such a state 
(but does not prevent the root cause
+    // of attempting such a zk update).
+    if (newSegmentName != null) {
+      LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
+      int partitionId = newLLCSegmentName.getPartitionGroupId();
+      int seqNum = newLLCSegmentName.getSequenceNumber();
+      for (String segmentNameStr : instanceStatesMap.keySet()) {
+        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
+        if (llcSegmentName == null) {
+          // skip the segment name if the name is not in low-level consumer 
format
+          // such segment name can appear for uploaded segment
+          LOGGER.debug("Skip segment name {} not in low-level consumer 
format", segmentNameStr);
+          continue;
+        }
+        if (llcSegmentName.getPartitionGroupId() == partitionId && 
llcSegmentName.getSequenceNumber() == seqNum) {
+          String errorMsg =
+              String.format("Segment %s is a duplicate of existing segment 
%s", newSegmentName, segmentNameStr);
+          LOGGER.error(errorMsg);
+          throw new HelixHelper.PermanentUpdaterException(errorMsg);
+        }
+      }
+      // Assign instances to the new segment and add instances as state 
CONSUMING
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, 
instancePartitionsMap);
+      instanceStatesMap.put(newSegmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
+          CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING));
+      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", 
newSegmentName, instancesAssigned);
+    }
+  }
+
+  @Override
+  public List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
+    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
+        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
+    try {
+      
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
+      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+    } catch (Exception e) {
+      Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
+      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}", streamConfig.getTopicName(),
+          streamConfig.getTableNameWithType(), fetcherException);
+      throw new RuntimeException(fetcherException);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..7b1889fdd7
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import 
org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
+import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FullAutoPinotTableIdealStateHelper extends 
DefaultPinotTableIdealStateHelper {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FullAutoPinotTableIdealStateHelper.class);
+
+  /**
+   * Builds an empty FULL-AUTO ideal state for the given table.
+   *
+   * <p>The ideal state uses the OFFLINE segment ONLINE-OFFLINE state model for every table type, the
+   * {@link CrushEdRebalanceStrategy}, delayed rebalance, and the table name with type as the instance group tag.
+   *
+   * @param tableConfig table config; its table name is expected to carry the table type suffix
+   * @param enableBatchMessageMode whether to enable batch message mode on the built ideal state
+   * @return the newly built ideal state
+   * @throws RuntimeException if the table type cannot be derived from the table name
+   */
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+
+    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == null) {
+      throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType);
+    }
+    // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too.
+    //       Until then every table type is given the OFFLINE segment ONLINE-OFFLINE state model.
+    String stateModel =
+        PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+
+    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default
+    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+    idealStateBuilder
+        .setStateModel(stateModel)
+        .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
+        // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    // The below config guarantees if active number of replicas is no less than minimum active replica, there will
+    // not be partition movements happened.
+    // Set min active replicas to (numReplicas - 1) and rebalance delay to 5 minutes so that if any instance goes
+    // offline, Helix controller waits at most 5 minutes and then re-calculates the participant assignment.
+    // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added
+    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
+    idealStateBuilder.setRebalanceDelay(300_000);
+    idealStateBuilder.enableDelayRebalance();
+    // Set instance group tag
+    IdealState idealState = idealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
+  /**
+   * Registers a segment in the table's ideal state via {@link HelixHelper#updateIdealState}.
+   *
+   * <p>If the segment is already present in both the map fields and the list fields of the ideal state record, the
+   * ideal state is left untouched. Otherwise the segment is added to the list fields with an empty instance list;
+   * the instances computed by the segment assignment are only logged.
+   *
+   * @param helixManager Helix manager used to update the ideal state
+   * @param tableNameWithType name of the table (with type suffix) whose ideal state is updated
+   * @param segmentName name of the segment to add
+   * @param segmentAssignment segment assignment used to compute the instances that are logged
+   * @param instancePartitionsMap instance partitions handed to the segment assignment
+   */
+  @Override
+  public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> {
+      assert idealState != null;
+      Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
+      Map<String, List<String>> listFields = idealState.getRecord().getListFields();
+
+      boolean alreadyPresent = mapFields.containsKey(segmentName) && listFields.containsKey(segmentName);
+      if (alreadyPresent) {
+        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+            tableNameWithType);
+        return idealState;
+      }
+
+      List<String> targetInstances = segmentAssignment.assignSegment(segmentName, mapFields, instancePartitionsMap);
+      LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, targetInstances,
+          tableNameWithType);
+      // Only the segment name is recorded; the instance list is left empty.
+      listFields.put(segmentName, Collections.emptyList());
+      return idealState;
+    });
+  }
+
+  /**
+   * Updates the ideal state record for a committing segment and/or a newly created CONSUMING segment.
+   *
+   * <p>For the committing segment nothing is modified yet; only a log line is emitted. For the new segment, the
+   * method rejects a name whose partition group id and sequence number match an LLC segment already present in the
+   * map fields, asks the segment assignment for instances (used for logging only), and registers the segment in the
+   * list fields with an empty instance list.
+   *
+   * @param idealState ideal state whose record is modified in place
+   * @param committingSegmentName name of the committing segment, or {@code null} if there is none
+   * @param newSegmentName name of the new consuming segment, or {@code null} if no segment should be added
+   * @param segmentAssignment segment assignment used to compute instances for the new segment
+   * @param instancePartitionsMap instance partitions handed to the segment assignment
+   * @throws HelixHelper.PermanentUpdaterException if the new segment duplicates an existing LLC segment
+   */
+  @Override
+  public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
+    Map<String, List<String>> listFields = idealState.getRecord().getListFields();
+
+    // TODO: Need to figure out the best way to handle committed segments' state change
+    if (committingSegmentName != null) {
+      // Change committing segment state to ONLINE (not wired up yet; kept for reference):
+//      Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet();
+//      instanceStatesMap.put(committingSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE));
+//      instanceStatesList.put(newSegmentName, Collections.emptyList()
+//          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+      LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName);
+    }
+
+    if (newSegmentName == null) {
+      return;
+    }
+
+    // A race condition in pinot (caused by heavy GC on the controller during segment commit) used to create multiple
+    // consuming segments for the same stream partition, named somewhat like tableName__1__25__20210920T190005Z and
+    // tableName__1__25__20210920T190007Z. It was fixed by checking the Zookeeper Stat object before updating the
+    // segment metadata, but the same state can be reached again through the manual operations considered as fixes in
+    // Issues #5559 and #5263. The check below keeps the table out of such a state (it does not prevent the root
+    // cause of attempting such a zk update).
+    LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
+    int newPartitionGroupId = newLLCSegmentName.getPartitionGroupId();
+    int newSequenceNumber = newLLCSegmentName.getSequenceNumber();
+    for (String existingSegmentName : mapFields.keySet()) {
+      LLCSegmentName existingLLCSegmentName = LLCSegmentName.of(existingSegmentName);
+      if (existingLLCSegmentName == null) {
+        // Names that are not in low-level consumer format (e.g. uploaded segments) cannot clash; skip them.
+        LOGGER.debug("Skip segment name {} not in low-level consumer format", existingSegmentName);
+        continue;
+      }
+      boolean samePartitionGroup = existingLLCSegmentName.getPartitionGroupId() == newPartitionGroupId;
+      if (samePartitionGroup && existingLLCSegmentName.getSequenceNumber() == newSequenceNumber) {
+        String errorMsg =
+            String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, existingSegmentName);
+        LOGGER.error(errorMsg);
+        throw new HelixHelper.PermanentUpdaterException(errorMsg);
+      }
+    }
+
+    // Compute the instances for the new segment; they are logged but not written to the ideal state.
+    List<String> instancesAssigned =
+        segmentAssignment.assignSegment(newSegmentName, mapFields, instancePartitionsMap);
+    // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME
+//      instanceStatesMap.put(newSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+    // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Switch to the instance-list variant
+    //       noted in the block comment below. Assess whether we should set an empty InstanceStateList for the
+    //       segment or not, i.e. do we support this preferred list concept, and does Helix-Auto even allow the
+    //       preferred list concept (from code reading it looks like it does).
+    listFields.put(newSegmentName, Collections.emptyList()
+        /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+    LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..a30872df83
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+/**
+ * Abstraction over how the Helix ideal state of a Pinot table is built and updated, so that the behavior can be
+ * swapped by plugging in a different implementation.
+ */
+public interface PinotTableIdealStateHelper {
+
+  /**
+   * Builds an empty ideal state for the Pinot table.
+   * @param tableConfig table config.
+   * @param enableBatchMessageMode whether to enable batch message mode when building the ideal state.
+   * @return the newly built ideal state.
+   */
+  IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean 
enableBatchMessageMode);
+
+  /**
+   * Assigns the given segment for the table.
+   * NOTE(review): how the assignment is recorded (ideal state map fields vs. list fields) is presumably up to the
+   * implementation; confirm against the concrete helpers.
+   * @param helixManager Helix manager.
+   * @param tableNameWithType table name with type suffix.
+   * @param segmentName name of the segment to assign.
+   * @param segmentAssignment segment assignment to use for the segment.
+   * @param instancePartitionsMap instance partitions keyed by instance partitions type.
+   */
+  void assignSegment(HelixManager helixManager, String tableNameWithType, 
String segmentName,
+      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, 
InstancePartitions> instancePartitionsMap);
+
+  /**
+   * Updates the given ideal state for a committing segment and/or a new consuming segment.
+   * @param idealState ideal state to update.
+   * @param committingSegmentName name of the committing segment; may be null.
+   * @param newSegmentName name of the new consuming segment; may be null.
+   * @param segmentAssignment segment assignment to use for the new segment.
+   * @param instancePartitionsMap instance partitions keyed by instance partitions type.
+   */
+  void updateInstanceStatesForNewConsumingSegment(IdealState idealState, 
@Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap);
+
+  /**
+   * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream,
+   * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups.
+   *
+   * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
+   *
+   * 1)
+   * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for
+   * a partition group.
+   * An example of where the offsets would be used:
+   * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities:
+   * a) the stream indicates that shardId's last offset is 200.
+   * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in
+   * the response.
+   * b) the stream indicates that shardId's last offset is 150,
+   * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the
+   * response.
+   * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition
+   * group have been consumed.
+   *
+   * 2)
+   * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions,
+   * and accordingly make the new partition groups.
+   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2
+   * and partition group 2 has status DONE and contains shards 3,4.
+   * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that
+   * the collection of shards in partition group 1, should remain unchanged in the response,
+   * whereas shards 3,4 can be added to new partition groups if needed.
+   *
+   * @param streamConfig the streamConfig from the tableConfig
+   * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current
+   *                                            partition groups.
+   *                                            The size of this list is equal to the number of partition groups,
+   *                                            and is created using the latest segment zk metadata.
+   * @return the list of {@link PartitionGroupMetadata} for the new partition groups
+   */
+  List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig 
streamConfig,
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList);
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
new file mode 100644
index 0000000000..9327ecba04
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory that lazily instantiates and caches a single {@link PinotTableIdealStateHelper}, using the class name
+ * configured in the {@link ControllerConf} passed to {@link #init(ControllerConf)}.
+ *
+ * <p>{@link #init(ControllerConf)} must be called before the first {@link #create()}. Both methods are
+ * synchronized so that concurrent callers observe one fully constructed helper instance.
+ */
+public class PinotTableIdealStateHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelperFactory.class);
+  private static PinotTableIdealStateHelper _instance = null;
+  private static ControllerConf _controllerConf;
+
+  private PinotTableIdealStateHelperFactory() {
+  }
+
+  /**
+   * Registers the controller config from which the helper class name is read.
+   *
+   * @param controllerConf controller config
+   */
+  public static synchronized void init(ControllerConf controllerConf) {
+    _controllerConf = controllerConf;
+  }
+
+  /**
+   * Returns the cached helper, instantiating it on first use through its no-arg constructor.
+   *
+   * @return the shared {@link PinotTableIdealStateHelper} instance
+   * @throws IllegalStateException if {@link #init(ControllerConf)} has not been called
+   * @throws RuntimeException if the configured class cannot be loaded or instantiated
+   */
+  public static synchronized PinotTableIdealStateHelper create() {
+    if (_instance == null) {
+      // Fail with an actionable message instead of a bare NullPointerException when init() was skipped.
+      if (_controllerConf == null) {
+        throw new IllegalStateException("PinotTableIdealStateHelperFactory.init() must be called before create()");
+      }
+      String pinotTableIdealstateHelperClassPath = _controllerConf.getPinotTableIdealstateHelperClass();
+      try {
+        // Class#newInstance() is deprecated; go through the no-arg constructor explicitly.
+        _instance = (PinotTableIdealStateHelper) Class.forName(pinotTableIdealstateHelperClassPath)
+            .getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        // Pass the exception to the logger so the stack trace is not lost.
+        LOGGER.error("PinotTableIdealStateHelper not found: {}", pinotTableIdealstateHelperClassPath, e);
+        throw new RuntimeException(e);
+      }
+    }
+    return _instance;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index c9850856cd..94d158220c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -36,7 +36,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -79,7 +79,7 @@ public class MissingConsumingSegmentFinder {
     _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
     streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
     try {
-      PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, 
Collections.emptyList())
+      
PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig,
 Collections.emptyList())
           .forEach(metadata -> {
             
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
           });
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 40215c43a4..0e560bd9ee 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -68,9 +68,9 @@ import 
org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.resources.Constants;
 import org.apache.pinot.controller.api.resources.PauseStatus;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -336,7 +336,7 @@ public class PinotLLCRealtimeSegmentManager {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupMetadata, currentTimeMs, instancePartitions,
               numPartitionGroups, numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instancesStateList, null, segmentName,
+      updateInstanceStatesForNewConsumingSegment(idealState, null, segmentName,
           segmentAssignment, instancePartitionsMap);
     }
 
@@ -817,7 +817,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
-    return 
PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig,
+    return 
PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig,
         currentPartitionGroupConsumptionStatusList);
   }
 
@@ -1000,8 +1000,7 @@ public class PinotLLCRealtimeSegmentManager {
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + 
committingSegmentName);
       }
-      
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
-          idealState.getRecord().getListFields(), committingSegmentName,
+      updateInstanceStatesForNewConsumingSegment(idealState, 
committingSegmentName,
           isTablePaused(idealState) ? null : newSegmentName, 
segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -1012,61 +1011,12 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @VisibleForTesting
-  void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, 
String>> instanceStatesMap,
-      Map<String, List<String>> instanceStatesList,
-      @Nullable String committingSegmentName, @Nullable String newSegmentName, 
SegmentAssignment segmentAssignment,
+  void updateInstanceStatesForNewConsumingSegment(IdealState idealState, 
@Nullable String committingSegmentName,
+      @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    // TODO: Need to figure out the best way to handle committed segments' 
state change
-    if (committingSegmentName != null) {
-      // Change committing segment state to ONLINE
-//      Set<String> instances = 
instanceStatesMap.get(committingSegmentName).keySet();
-//      instanceStatesMap.put(committingSegmentName,
-//          SegmentAssignmentUtils.getInstanceStateMap(instances, 
SegmentStateModel.ONLINE));
-//      instanceStatesList.put(newSegmentName, Collections.emptyList()
-//          
/*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
-      LOGGER.info("Updating segment: {} to ONLINE state", 
committingSegmentName);
-    }
-
-    // There used to be a race condition in pinot (caused by heavy GC on the 
controller during segment commit)
-    // that ended up creating multiple consuming segments for the same stream 
partition, named somewhat like
-    // tableName__1__25__20210920T190005Z and 
tableName__1__25__20210920T190007Z. It was fixed by checking the
-    // Zookeeper Stat object before updating the segment metadata.
-    // These conditions can happen again due to manual operations considered 
as fixes in Issues #5559 and #5263
-    // The following check prevents the table from going into such a state 
(but does not prevent the root cause
-    // of attempting such a zk update).
-    if (newSegmentName != null) {
-      LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
-      int partitionId = newLLCSegmentName.getPartitionGroupId();
-      int seqNum = newLLCSegmentName.getSequenceNumber();
-      for (String segmentNameStr : instanceStatesMap.keySet()) {
-        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
-        if (llcSegmentName == null) {
-          // skip the segment name if the name is not in low-level consumer 
format
-          // such segment name can appear for uploaded segment
-          LOGGER.debug("Skip segment name {} not in low-level consumer 
format", segmentNameStr);
-          continue;
-        }
-        if (llcSegmentName.getPartitionGroupId() == partitionId && 
llcSegmentName.getSequenceNumber() == seqNum) {
-          String errorMsg =
-              String.format("Segment %s is a duplicate of existing segment 
%s", newSegmentName, segmentNameStr);
-          LOGGER.error(errorMsg);
-          throw new HelixHelper.PermanentUpdaterException(errorMsg);
-        }
-      }
-      // Assign instances to the new segment and add instances as state 
CONSUMING
-      List<String> instancesAssigned =
-          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, 
instancePartitionsMap);
-      // No need to check for tableType as offline tables can never go to 
CONSUMING state. All callers are for REALTIME
-//      instanceStatesMap.put(newSegmentName,
-//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
-      // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the 
map. Uncomment below lines to update list.
-      //       Assess whether we should set am empty InstanceStateList for the 
segment or not. i.e. do we support
-      //       this preferred list concept, and does Helix-Auto even allow 
preferred list concept (from code reading it
-      //       looks like it does)
-      instanceStatesList.put(newSegmentName, Collections.emptyList()
-          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
-      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", 
newSegmentName, instancesAssigned);
-    }
+    PinotTableIdealStateHelperFactory.create()
+        .updateInstanceStatesForNewConsumingSegment(idealState, 
committingSegmentName, newSegmentName,
+            segmentAssignment, instancePartitionsMap);
   }
 
   /*
@@ -1222,14 +1172,14 @@ public class PinotLLCRealtimeSegmentManager {
                       
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, 
newLLCSegmentName, currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, 
instancePartitions, numPartitions, numReplicas);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instanceStatesList, latestSegmentName,
-                  newSegmentName, segmentAssignment, instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(idealState, 
latestSegmentName, newSegmentName,
+                  segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
               LOGGER.info("PartitionGroup: {} has reached end of life. 
Updating ideal state for segment: {}. "
                       + "Skipping creation of new ZK metadata and new segment 
in ideal state", partitionGroupId,
                   latestSegmentName);
-              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instanceStatesList, latestSegmentName,
-                  null, segmentAssignment, instancePartitionsMap);
+              updateInstanceStatesForNewConsumingSegment(idealState, 
latestSegmentName, null, segmentAssignment,
+                  instancePartitionsMap);
             }
           }
           // else, the metadata should be IN_PROGRESS, which is the right 
state for a consuming segment.
@@ -1253,7 +1203,7 @@ public class PinotLLCRealtimeSegmentManager {
                     partitionGroupIdToSmallestStreamOffset, 
tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are 
OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
-                newPartitionGroupMetadataList, instancePartitions, 
instanceStatesMap, instanceStatesList,
+                newPartitionGroupMetadataList, instancePartitions, idealState,
                 segmentAssignment, instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
@@ -1272,7 +1222,7 @@ public class PinotLLCRealtimeSegmentManager {
                         partitionGroupIdToSmallestStreamOffset, 
tableConfig.getTableName(), offsetFactory,
                         latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
-                    newPartitionGroupMetadataList, instancePartitions, 
instanceStatesMap, instanceStatesList,
+                    newPartitionGroupMetadataList, instancePartitions, 
idealState,
                     segmentAssignment, instancePartitionsMap, startOffset);
               } else {
                 LOGGER.error(
@@ -1320,8 +1270,8 @@ public class PinotLLCRealtimeSegmentManager {
                 partitionGroupId, realtimeTableName);
             _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
           }
-          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instanceStatesList, previousConsumingSegment,
-              latestSegmentName, segmentAssignment, instancePartitionsMap);
+          updateInstanceStatesForNewConsumingSegment(idealState, 
previousConsumingSegment, latestSegmentName,
+              segmentAssignment, instancePartitionsMap);
         } else {
           LOGGER.error("Got unexpected status: {} in segment ZK metadata for 
segment: {}",
               latestSegmentZKMetadata.getStatus(), latestSegmentName);
@@ -1336,8 +1286,8 @@ public class PinotLLCRealtimeSegmentManager {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupMetadata, currentTimeMs, instancePartitions,
                 numPartitions, numReplicas);
-        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instanceStatesList, null, newSegmentName,
-            segmentAssignment, instancePartitionsMap);
+        updateInstanceStatesForNewConsumingSegment(idealState, null, 
newSegmentName, segmentAssignment,
+            instancePartitionsMap);
       }
     }
 
@@ -1347,8 +1297,7 @@ public class PinotLLCRealtimeSegmentManager {
   private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig 
streamConfig,
       SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
       List<PartitionGroupMetadata> newPartitionGroupMetadataList, 
InstancePartitions instancePartitions,
-      Map<String, Map<String, String>> instanceStatesMap, Map<String, 
List<String>> instancesStateList,
-      SegmentAssignment segmentAssignment,
+      IdealState idealState, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
StreamPartitionMsgOffset startOffset) {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
     int numPartitions = newPartitionGroupMetadataList.size();
@@ -1359,7 +1308,7 @@ public class PinotLLCRealtimeSegmentManager {
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, 
currentTimeMs, committingSegmentDescriptor,
         latestSegmentZKMetadata, instancePartitions, numPartitions, 
numReplicas);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
instancesStateList, null, newSegmentName,
+    updateInstanceStatesForNewConsumingSegment(idealState, null, 
newSegmentName,
         segmentAssignment, instancePartitionsMap);
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 26641d515c..ee90649080 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1220,11 +1220,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
     void updateIdealStateOnSegmentCompletion(String realtimeTableName, String 
committingSegmentName,
         String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) 
{
-      
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
-          _idealState.getRecord().getListFields(), committingSegmentName, null,
+      updateInstanceStatesForNewConsumingSegment(_idealState, 
committingSegmentName, null,
           segmentAssignment, instancePartitionsMap);
-      
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
-          _idealState.getRecord().getListFields(), null, newSegmentName,
+      updateInstanceStatesForNewConsumingSegment(_idealState, null, 
newSegmentName,
           segmentAssignment, instancePartitionsMap);
     }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index dc988ad669..017b0b2a5a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -270,9 +270,7 @@ public class RetentionManagerTest {
     final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
-
-    IdealState idealState =
-        
PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, 
replicaCount, true);
+    IdealState idealState = 
PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, 
true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);
@@ -334,9 +332,7 @@ public class RetentionManagerTest {
     final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
-
-    IdealState idealState =
-        
PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, 
replicaCount, true);
+    IdealState idealState = 
PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, 
true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
index b14cb9e240..08de258d62 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java
@@ -46,7 +46,6 @@ import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -211,23 +210,16 @@ public class MoveReplicaGroup extends 
AbstractBaseAdminCommand implements Comman
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState input) {
-        Map<String, Map<String, String>> existingMapField = 
input.getRecord().getMapFields();
         Map<String, List<String>> existingListField = 
input.getRecord().getListFields();
 
-        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(_tableName);
         for (Map.Entry<String, Map<String, String>> segmentEntry : 
proposedIdealState.entrySet()) {
           // existingMapField.put(segmentEntry.getKey(), 
segmentEntry.getValue());
-          if (tableType == TableType.REALTIME) {
-            // TODO: Update listField only once REALTIME uses FULL-AUTO
-            existingMapField.put(segmentEntry.getKey(), 
segmentEntry.getValue());
-          } else {
-            String segmentName = segmentEntry.getKey();
-            Map<String, String> segmentMapping = segmentEntry.getValue();
-            List<String> listOfHosts = new 
ArrayList<>(segmentMapping.keySet());
-            Collections.sort(listOfHosts);
-            // TODO: Assess if we want to add the preferred list of hosts or 
not
-            existingListField.put(segmentName, Collections.emptyList() /* 
listOfHosts */);
-          }
+          String segmentName = segmentEntry.getKey();
+          Map<String, String> segmentMapping = segmentEntry.getValue();
+          List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet());
+          Collections.sort(listOfHosts);
+          // TODO: Assess if we want to add the preferred list of hosts or not
+          existingListField.put(segmentName, Collections.emptyList() /* 
listOfHosts */);
         }
         return input;
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to