This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-oss-abstraction
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit f0e458d1baec2deb7084a2f37afcee3ee586ceac
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Fri Mar 22 16:00:43 2024 -0700

    Extract methods for Pinot table ideal state
---
 .../pinot/controller/BaseControllerStarter.java    |   4 +
 .../helix/core/PinotHelixResourceManager.java      |  14 +-
 .../helix/core/PinotTableIdealStateHelper.java     | 145 ---------------------
 .../DefaultPinotTableIdealStateHelper.java         |  76 +++++++++++
 .../FullAutoPinotTableIdealStateHelper.java        |  57 ++++++++
 .../PinotTableIdealStateHelper.java                |  57 ++++++++
 .../PinotTableIdealStateHelperFactory.java         |  26 ++++
 .../realtime/MissingConsumingSegmentFinder.java    |   4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   4 +-
 .../helix/core/retention/RetentionManagerTest.java |  10 +-
 10 files changed, 236 insertions(+), 161 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 281c397401..c509738fca 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -88,6 +88,7 @@ import 
org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -245,6 +246,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, 
_tenantRebalanceExecutorService);
     }
 
+    // Initialize the ideal state helper for Pinot tables.
+    PinotTableIdealStateHelperFactory.init(_config);
+
     // Initialize the table config tuner registry.
     TableConfigTunerRegistry.init(_config.getTableConfigTunerPackages());
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 57c75d7618..1fa664b062 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -142,6 +142,8 @@ import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -232,11 +234,14 @@ public class PinotHelixResourceManager {
   private SegmentDeletionManager _segmentDeletionManager;
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
+
+  private final PinotTableIdealStateHelper _pinotTableIdealStateHelper;
   private final LineageManager _lineageManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
-      boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+      boolean enableTieredSegmentAssignment, PinotTableIdealStateHelper 
pinotTableIdealStateHelper,
+      LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -258,6 +263,7 @@ public class PinotHelixResourceManager {
     for (int i = 0; i < _tableUpdaterLocks.length; i++) {
       _tableUpdaterLocks[i] = new Object();
     }
+    _pinotTableIdealStateHelper = pinotTableIdealStateHelper;
     _lineageManager = lineageManager;
   }
 
@@ -265,7 +271,7 @@ public class PinotHelixResourceManager {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
         controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
-        LineageManagerFactory.create(controllerConf));
+        PinotTableIdealStateHelperFactory.create(), 
LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -1583,9 +1589,7 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == 
TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState =
-        
PinotTableIdealStateHelper.buildEmptyFullAutoIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
-            _enableBatchMessageMode);
+    IdealState idealState = 
_pinotTableIdealStateHelper.buildEmptyIdealStateFor(tableConfig, 
_enableBatchMessageMode);
 //    if (tableType == TableType.REALTIME) {
 //      idealState =
 //           
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
deleted file mode 100644
index 37c4b7555b..0000000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core;
-
-import java.util.List;
-import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.builder.CustomModeISBuilder;
-import org.apache.helix.model.builder.FullAutoModeISBuilder;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.spi.utils.retry.RetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PinotTableIdealStateHelper {
-  private PinotTableIdealStateHelper() {
-  }
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableIdealStateHelper.class);
-  private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
-      RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
-
-  public static IdealState buildEmptyIdealStateFor(String tableNameWithType, 
int numReplicas,
-      boolean enableBatchMessageMode) {
-    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", 
tableNameWithType, numReplicas);
-    CustomModeISBuilder customModeIdealStateBuilder = new 
CustomModeISBuilder(tableNameWithType);
-    customModeIdealStateBuilder
-        
.setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
-        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1);
-    IdealState idealState = customModeIdealStateBuilder.build();
-    idealState.setInstanceGroupTag(tableNameWithType);
-    idealState.setBatchMessageMode(enableBatchMessageMode);
-    return idealState;
-  }
-
-  public static IdealState buildEmptyFullAutoIdealStateFor(String 
tableNameWithType, int numReplicas,
-      boolean enableBatchMessageMode) {
-    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: 
{}", tableNameWithType, numReplicas);
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    String stateModel;
-    if (tableType == null) {
-      throw new RuntimeException("Failed to get table type from table name: " 
+ tableNameWithType);
-    } else if (TableType.OFFLINE.equals(tableType)) {
-      stateModel =
-          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    } else {
-      stateModel =
-          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    }
-
-    // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, 
crushed auto-rebalance by default
-    // TODO: The state model used only works for OFFLINE tables today. Add 
support for REALTIME state model too
-    FullAutoModeISBuilder idealStateBuilder = new 
FullAutoModeISBuilder(tableNameWithType);
-    idealStateBuilder
-        .setStateModel(stateModel)
-        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
-        // TODO: Revisit the rebalance strategy to use (maybe we add a custom 
one)
-        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
-    // The below config guarantees if active number of replicas is no less 
than minimum active replica, there will
-    // not be partition movements happened.
-    // Set min active replicas to 0 and rebalance delay to 5 minutes so that 
if any master goes offline, Helix
-    // controller waits at most 5 minutes and then re-calculate the 
participant assignment.
-    // TODO: Assess which of these values need to be tweaked, removed, and 
what additional values that need to be added
-    idealStateBuilder.setMinActiveReplica(numReplicas - 1);
-    idealStateBuilder.setRebalanceDelay(300_000);
-    idealStateBuilder.enableDelayRebalance();
-    // Set instance group tag
-    IdealState idealState = idealStateBuilder.build();
-    idealState.setInstanceGroupTag(tableNameWithType);
-    idealState.setBatchMessageMode(enableBatchMessageMode);
-    return idealState;
-  }
-
-  /**
-   * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
-   * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
-   *
-   * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
-   *
-   * 1)
-   * The current {@link PartitionGroupConsumptionStatus} is used to determine 
the offsets that have been consumed for
-   * a partition group.
-   * An example of where the offsets would be used:
-   * e.g. If partition group 1 contains shardId 1, with status DONE and 
endOffset 150. There's 2 possibilities:
-   * 1) the stream indicates that shardId's last offset is 200.
-   * This tells Pinot that partition group 1 still has messages which haven't 
been consumed, and must be included in
-   * the response.
-   * 2) the stream indicates that shardId's last offset is 150,
-   * This tells Pinot that all messages of partition group 1 have been 
consumed, and it need not be included in the
-   * response.
-   * Thus, this call will skip a partition group when it has reached end of 
life and all messages from that partition
-   * group have been consumed.
-   *
-   * The current {@link PartitionGroupConsumptionStatus} is also used to know 
about existing groupings of partitions,
-   * and accordingly make the new partition groups.
-   * e.g. Assume that partition group 1 has status IN_PROGRESS and contains 
shards 0,1,2
-   * and partition group 2 has status DONE and contains shards 3,4.
-   * In the above example, the 
<code>partitionGroupConsumptionStatusList</code> indicates that
-   * the collection of shards in partition group 1, should remain unchanged in 
the response,
-   * whereas shards 3,4 can be added to new partition groups if needed.
-   *
-   * @param streamConfig the streamConfig from the tableConfig
-   * @param partitionGroupConsumptionStatusList List of {@link 
PartitionGroupConsumptionStatus} for the current
-   *                                            partition groups.
-   *                                          The size of this list is equal 
to the number of partition groups,
-   *                                          and is created using the latest 
segment zk metadata.
-   */
-  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
-    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
-    try {
-      
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
-      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
-    } catch (Exception e) {
-      Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}", streamConfig.getTopicName(),
-          streamConfig.getTableNameWithType(), fetcherException);
-      throw new RuntimeException(fetcherException);
-    }
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..4ccac131ba
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import 
org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
+import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultPinotTableIdealStateHelper implements 
PinotTableIdealStateHelper {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultPinotTableIdealStateHelper.class);
+  private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
+      RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
+
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean 
enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+    LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", 
tableNameWithType, numReplicas);
+    CustomModeISBuilder customModeIdealStateBuilder = new 
CustomModeISBuilder(tableNameWithType);
+    customModeIdealStateBuilder
+        
.setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1);
+    IdealState idealState = customModeIdealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+
+  @Override
+  public List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList) {
+    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
+        new PartitionGroupMetadataFetcher(streamConfig, 
partitionGroupConsumptionStatusList);
+    try {
+      
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
+      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+    } catch (Exception e) {
+      Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
+      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}", streamConfig.getTopicName(),
+          streamConfig.getTableNameWithType(), fetcherException);
+      throw new RuntimeException(fetcherException);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..c8d2000ac2
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link PinotTableIdealStateHelper} that builds FULL-AUTO ideal states, using Helix's
+ * {@link CrushEdRebalanceStrategy} as the rebalance strategy.
+ *
+ * <p>Stream partition group metadata fetching is inherited from {@link DefaultPinotTableIdealStateHelper}.
+ */
+public class FullAutoPinotTableIdealStateHelper extends DefaultPinotTableIdealStateHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullAutoPinotTableIdealStateHelper.class);
+
+  // How long (ms) the Helix controller waits before recomputing the assignment after an instance goes offline.
+  private static final int REBALANCE_DELAY_MS = 300_000;
+
+  /**
+   * Builds an empty FULL-AUTO ideal state (0 partitions) for the table.
+   *
+   * @param tableConfig table config; supplies the table name (with type suffix) and the number of replicas
+   * @param enableBatchMessageMode whether to enable Helix batch message mode on the ideal state
+   * @return the newly built, empty ideal state
+   * @throws IllegalArgumentException if the table type cannot be derived from the table name
+   */
+  @Override
+  public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) {
+    String tableNameWithType = tableConfig.getTableName();
+    int numReplicas = tableConfig.getReplication();
+    LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas);
+
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == null) {
+      throw new IllegalArgumentException("Failed to get table type from table name: " + tableNameWithType);
+    }
+    // TODO: Only an OFFLINE segment state model exists for FULL-AUTO today, so REALTIME tables use it as well. Select
+    //  a REALTIME state model here based on tableType once one is added.
+    String stateModel =
+        PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+
+    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType);
+    idealStateBuilder
+        .setStateModel(stateModel)
+        .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
+        // TODO: Revisit the rebalance strategy to use (maybe we add a custom one)
+        .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+    // As long as the number of active replicas is no less than the minimum active replica count, no partition
+    // movement happens. Allowing one replica to be down (clamped so it is never negative), together with the delayed
+    // rebalance, means the Helix controller waits at most REBALANCE_DELAY_MS after an instance goes offline before
+    // recomputing the participant assignment.
+    // TODO: Assess which of these values need to be tweaked, removed, and what additional values need to be added
+    idealStateBuilder.setMinActiveReplica(Math.max(0, numReplicas - 1));
+    idealStateBuilder.setRebalanceDelay(REBALANCE_DELAY_MS);
+    idealStateBuilder.enableDelayRebalance();
+
+    IdealState idealState = idealStateBuilder.build();
+    idealState.setInstanceGroupTag(tableNameWithType);
+    idealState.setBatchMessageMode(enableBatchMessageMode);
+    return idealState;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
new file mode 100644
index 0000000000..84297cc665
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import java.util.List;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+/**
+ * Pluggable helper for building the Helix ideal state of a Pinot table, and for fetching the stream partition group
+ * metadata used by realtime segment management. Instances are obtained through
+ * {@link PinotTableIdealStateHelperFactory}.
+ */
+public interface PinotTableIdealStateHelper {
+
+  /**
+   * Builds an empty ideal state for the Pinot table.
+   *
+   * @param tableConfig table config of the table to build the ideal state for
+   * @param enableBatchMessageMode whether to enable Helix batch message mode on the ideal state
+   * @return the newly built ideal state, containing no segments
+   */
+  IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode);
+
+  /**
+   * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups of the stream, with the help of
+   * the {@link PartitionGroupConsumptionStatus} of the current partition groups.
+   *
+   * <p>The <code>partitionGroupConsumptionStatusList</code> is needed for two reasons:
+   *
+   * <p>1) It is used to determine the offsets that have been consumed for a partition group. For example, if
+   * partition group 1 contains shardId 1 with status DONE and endOffset 150, there are 2 possibilities:
+   * <ul>
+   *   <li>The stream indicates that the shard's last offset is 200. This tells Pinot that partition group 1 still has
+   *   messages which haven't been consumed, so it must be included in the response.</li>
+   *   <li>The stream indicates that the shard's last offset is 150. This tells Pinot that all messages of partition
+   *   group 1 have been consumed, so it need not be included in the response.</li>
+   * </ul>
+   * Thus, this call skips a partition group once it has reached end of life and all of its messages have been
+   * consumed.
+   *
+   * <p>2) It is used to know about the existing groupings of partitions, and accordingly make the new partition
+   * groups. For example, assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2, and
+   * partition group 2 has status DONE and contains shards 3,4. Then the collection of shards in partition group 1
+   * should remain unchanged in the response, whereas shards 3,4 can be added to new partition groups if needed.
+   *
+   * @param streamConfig the streamConfig from the tableConfig
+   * @param partitionGroupConsumptionStatusList list of {@link PartitionGroupConsumptionStatus} for the current
+   *                                            partition groups. The size of this list is equal to the number of
+   *                                            partition groups, and it is created using the latest segment ZK
+   *                                            metadata. May be empty.
+   * @return the metadata of the new partition groups
+   */
+  List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList);
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
new file mode 100644
index 0000000000..0f1890db6c
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java
@@ -0,0 +1,26 @@
+package org.apache.pinot.controller.helix.core.idealstatehelper;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PinotTableIdealStateHelperFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableIdealStateHelperFactory.class);
+  private static PinotTableIdealStateHelper INSTANCE = null;
+  private static ControllerConf _controllerConf;
+
+  private PinotTableIdealStateHelperFactory() {
+  }
+
+  public static void init(ControllerConf controllerConf) {
+    _controllerConf = controllerConf;
+  }
+
+  public static PinotTableIdealStateHelper create() {
+    if (INSTANCE == null) {
+      INSTANCE = new DefaultPinotTableIdealStateHelper();
+    }
+    return INSTANCE;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index c9850856cd..94d158220c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -36,7 +36,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -79,7 +79,7 @@ public class MissingConsumingSegmentFinder {
     _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
     streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
     try {
-      PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, 
Collections.emptyList())
+      
PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig,
 Collections.emptyList())
           .forEach(metadata -> {
             
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
           });
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 40215c43a4..0f1b68e8f9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -68,9 +68,9 @@ import 
org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.resources.Constants;
 import org.apache.pinot.controller.api.resources.PauseStatus;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -817,7 +817,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
-    return 
PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig,
+    return 
PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig,
         currentPartitionGroupConsumptionStatusList);
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index dc988ad669..017b0b2a5a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -270,9 +270,7 @@ public class RetentionManagerTest {
     final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
-
-    IdealState idealState =
-        
PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, 
replicaCount, true);
+    IdealState idealState = 
PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, 
true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);
@@ -334,9 +332,7 @@ public class RetentionManagerTest {
     final int replicaCount = tableConfig.getReplication();
 
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
-
-    IdealState idealState =
-        
PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, 
replicaCount, true);
+    IdealState idealState = 
PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, 
true);
 
     final int kafkaPartition = 5;
     final long millisInDays = TimeUnit.DAYS.toMillis(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to