This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ccc41ea8e54 Make dedup table use StrictRealtimeSegmentAssignment with support of multi tiers (#17154)
ccc41ea8e54 is described below

commit ccc41ea8e54ef84c7144345fae5bef3eef4d88ec
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Nov 7 10:16:00 2025 -0800

    Make dedup table use StrictRealtimeSegmentAssignment with support of multi tiers (#17154)
    
    * split strict assignment to singleTier and multiTier versions
    
    * add tier support in MultiTierStrictRealtimeSegmentAssignment
    
    * add tests
    
    * refine tests
    
    * validate that the dedup TTL is smaller than the minimum of the segment ages used to select segments to move to other tiers
    
    * Resolve conflicts
    
    * Add gitignore files
    
    * Add test
    
    * Checkstyle fix
    
    * Change the method signature
    
    * Add import
    
    * Change it to map
    
    * Checkstyle fix
    
    ---------
    
    Co-authored-by: Xiaobing Li <[email protected]>
---
 ...va => BaseStrictRealtimeSegmentAssignment.java} |  62 ++----
 .../MultiTierStrictRealtimeSegmentAssignment.java  |  86 ++++++++
 .../segment/RealtimeSegmentAssignment.java         |   7 +-
 .../segment/SegmentAssignmentFactory.java          |   6 +-
 .../SingleTierStrictRealtimeSegmentAssignment.java |  77 +++++++
 .../helix/core/rebalance/TableRebalancer.java      |   6 +-
 .../StrictRealtimeSegmentAssignmentTest.java       | 225 +++++++++++++++++----
 .../segment/local/utils/TableConfigUtils.java      |  39 +++-
 .../segment/local/utils/TableConfigUtilsTest.java  |  84 ++++++++
 9 files changed, 501 insertions(+), 91 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseStrictRealtimeSegmentAssignment.java
similarity index 70%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseStrictRealtimeSegmentAssignment.java
index 23fefe723fa..2eec6a9b615 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseStrictRealtimeSegmentAssignment.java
@@ -24,20 +24,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metrics.ControllerMeter;
-import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 
 
 /**
- * Segment assignment for LLC real-time table using upsert. The 
assignSegment() of RealtimeSegmentAssignment is
+ * Segment assignment for LLC real-time table using upsert/dedup. The 
assignSegment() of RealtimeSegmentAssignment is
  * overridden to add new segment for a table partition in a way that's 
consistent with the assignment in idealState to
  * make sure that at any time the segments from the same table partition is 
hosted by the same server.
  * <ul>
@@ -47,25 +44,28 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
  *     InstancePartition and the one in idealState are different, the one in 
idealState must be used so that segments
  *     from the same table partition are always hosted on the same server as 
set in current idealState. If the
  *     idealState is not honored, segments from the same table partition may 
be assigned to different servers,
- *     breaking the key assumption for queries to be correct for the table 
using upsert.
+ *     breaking the key assumption for queries to be correct for the table 
using upsert/dedup.
  *   </li>
  *   <li>
- *     There is no need to handle COMPLETED segments for tables using upsert, 
because their completed segments should
- *     not be relocated to servers tagged to host COMPLETED segments. 
Basically, upsert-enabled tables can only use
- *     servers tagged for CONSUMING segments to host both consuming and 
completed segments from a table partition.
+ *     There is no need to handle COMPLETED segments for tables using 
upsert/dedup, because their completed
+ *     segments should not be relocated to servers tagged to host COMPLETED 
segments. Basically, upsert/dedup-enabled
+ *     tables can only use servers tagged for CONSUMING segments to host both 
consuming and completed segments from a
+ *     table partition.
  *   </li>
  * </ul>
+ *
+ * The rebalanceTable() method is to be implemented by subclasses to support 
multi tiers differently.
  */
-public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment 
{
+public abstract class BaseStrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
 
   // Cache segment partition id to avoid ZK reads.
   // NOTE:
   // 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable() can
   //    be invoked multiple times when the ideal state changes during the 
rebalance process.
   // 2. The cache won't be refreshed when an existing segment is replaced with 
a segment from a different partition.
-  //    Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because it
-  //    will cause the segment being served by the wrong servers. If this 
happens during the table rebalance, another
-  //    rebalance might be needed to fix the assignment.
+  //    Replacing a segment with a segment from a different partition should 
not be allowed for upsert/dedup table
+  //    because it will cause the segment being served by the wrong servers. 
If this happens during the table
+  //    rebalance, another rebalance might be needed to fix the assignment.
   private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();
 
   @Override
@@ -131,7 +131,7 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
   /**
    * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor 
CONSUMING), {@code false} otherwise.
    */
-  private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+  protected boolean isOfflineSegment(Map<String, String> instanceStateMap) {
     return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && 
!instanceStateMap.containsValue(
         SegmentStateModel.CONSUMING);
   }
@@ -154,44 +154,10 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
     return idealAssignment.size() == instancesAssigned.size() && 
idealAssignment.containsAll(instancesAssigned);
   }
 
-  @Override
-  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
-      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
-    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
-        _tableNameWithType);
-    Preconditions.checkArgument(config.isIncludeConsuming(),
-        "Consuming segment must be included when rebalancing upsert table: 
%s", _tableNameWithType);
-    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified 
for upsert table: %s",
-        _tableNameWithType);
-    _logger.info("Rebalancing table: {} with instance partitions: {}", 
_tableNameWithType, instancePartitions);
-
-    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      if (isOfflineSegment(instanceStateMap)) {
-        // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the
-        // OFFLINE segments and re-assign them
-        newAssignment.put(segmentName, instanceStateMap);
-      } else {
-        // Reassign CONSUMING and COMPLETED segments
-        List<String> instancesAssigned =
-            assignConsumingSegment(getPartitionIdUsingCache(segmentName), 
instancePartitions);
-        String state = 
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? 
SegmentStateModel.CONSUMING
-            : SegmentStateModel.ONLINE;
-        newAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
-      }
-    }
-    return newAssignment;
-  }
-
   /**
    * Returns the partition id of the given segment, using cached partition id 
if exists.
    */
-  private int getPartitionIdUsingCache(String segmentName) {
+  protected int getPartitionIdUsingCache(String segmentName) {
     return _segmentPartitionIdMap.computeIntIfAbsent(segmentName, 
this::getPartitionId);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java
new file mode 100644
index 00000000000..aa9c5988390
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This segment assignment policy allows the table to use multiple tiers when 
rebalancing table. This can be used for
+ * dedup table, whose segments out of TTL can be moved to new tiers without 
messing up the dedup metadata tracked on
+ * the CONSUMING servers.
+ */
+public class MultiTierStrictRealtimeSegmentAssignment extends 
BaseStrictRealtimeSegmentAssignment {
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
+    Preconditions.checkArgument(config.isIncludeConsuming(),
+        "Consuming segment must be included when rebalancing table: %s using 
multi-tier "
+            + "StrictRealtimeSegmentAssignment", _tableNameWithType);
+    boolean bootstrap = config.isBootstrap();
+    _logger.info("Rebalancing table: {} with instance partitions: {}, 
bootstrap: {}", _tableNameWithType,
+        instancePartitions, bootstrap);
+    // Rebalance tiers first. Only completed segments are moved to other tiers.
+    Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, 
String>>> pair =
+        rebalanceTiers(currentAssignment, sortedTiers, 
tierInstancePartitionsMap, bootstrap);
+    List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
+    Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
nonTierAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (isOfflineSegment(instanceStateMap)) {
+        // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the
+        // OFFLINE segments and re-assign them
+        newAssignment.put(segmentName, instanceStateMap);
+      } else {
+        // Reassign CONSUMING and COMPLETED segments
+        List<String> instancesAssigned =
+            assignConsumingSegment(getPartitionIdUsingCache(segmentName), 
instancePartitions);
+        String state = 
instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)
+            ? CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING
+            : CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+        newAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+      }
+    }
+    // Add tier assignments, if available
+    if (CollectionUtils.isNotEmpty(newTierAssignments)) {
+      newTierAssignments.forEach(newAssignment::putAll);
+    }
+    _logger.info("Rebalanced table: {}, number of segments to be added/removed 
for each instance: {}",
+        _tableNameWithType, 
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment));
+    return newAssignment;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 616dd8b79bd..c31d5e74970 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -183,6 +183,9 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
         "Failed to find CONSUMING instance partitions for table: %s", 
_tableNameWithType);
     boolean includeConsuming = config.isIncludeConsuming();
     boolean bootstrap = config.isBootstrap();
+    _logger.info("Rebalancing table: {} with COMPLETED instance partitions: 
{}, CONSUMING instance partitions: {}, "
+            + "includeConsuming: {}, bootstrap: {}", _tableNameWithType, 
completedInstancePartitions,
+        consumingInstancePartitions, includeConsuming, bootstrap);
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, 
String>>> pair =
         rebalanceTiers(currentAssignment, sortedTiers, 
tierInstancePartitionsMap, bootstrap);
@@ -190,10 +193,6 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
     List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
     Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
 
-    _logger.info("Rebalancing table: {} with COMPLETED instance partitions: 
{}, CONSUMING instance partitions: {}, "
-            + "includeConsuming: {}, bootstrap: {}", _tableNameWithType, 
completedInstancePartitions,
-        consumingInstancePartitions, includeConsuming, bootstrap);
-
     Set<String> committingSegments = null;
     if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
       List<String> committingSegmentList = 
PinotLLCRealtimeSegmentManager.getCommittingSegments(_tableNameWithType,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index e32a7246b2d..e9947cb8e67 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -21,6 +21,7 @@ package 
org.apache.pinot.controller.helix.core.assignment.segment;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -40,8 +41,11 @@ public class SegmentAssignmentFactory {
       segmentAssignment = new OfflineSegmentAssignment();
     } else {
       UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+      DedupConfig dedupConfig = tableConfig.getDedupConfig();
       if (upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) {
-        segmentAssignment = new StrictRealtimeSegmentAssignment();
+        segmentAssignment = new SingleTierStrictRealtimeSegmentAssignment();
+      } else if (dedupConfig != null && dedupConfig.isDedupEnabled()) {
+        segmentAssignment = new MultiTierStrictRealtimeSegmentAssignment();
       } else {
         segmentAssignment = new RealtimeSegmentAssignment();
       }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SingleTierStrictRealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SingleTierStrictRealtimeSegmentAssignment.java
new file mode 100644
index 00000000000..fbb582d394d
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SingleTierStrictRealtimeSegmentAssignment.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+
+
+/**
+ * This segment assignment policy doesn't allow the table to have multiple 
tiers. The upsert table has to use this
+ * today. Because moving upsert table's segments that are out of TTL needs to 
move the segments' associated
+ * validDocIds bitmaps as well for the upsert data to stay correct on the new 
tiers. Once moving bitmaps is
+ * supported later, the upsert table can use the 
MultiTierStrictRealtimeSegmentAssignment to use multi tiers too.
+ */
+public class SingleTierStrictRealtimeSegmentAssignment extends 
BaseStrictRealtimeSegmentAssignment {
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
+    Preconditions.checkArgument(config.isIncludeConsuming(),
+        "Consuming segment must be included when rebalancing table: %s using 
single-tier "
+            + "StrictRealtimeSegmentAssignment", _tableNameWithType);
+    Preconditions.checkState(sortedTiers == null,
+        "Tiers must not be specified for table: %s using single-tier 
StrictRealtimeSegmentAssignment",
+        _tableNameWithType);
+    _logger.info("Rebalancing table: {} with instance partitions: {}", 
_tableNameWithType, instancePartitions);
+
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (isOfflineSegment(instanceStateMap)) {
+        // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the
+        // OFFLINE segments and re-assign them
+        newAssignment.put(segmentName, instanceStateMap);
+      } else {
+        // Reassign CONSUMING and COMPLETED segments
+        List<String> instancesAssigned =
+            assignConsumingSegment(getPartitionIdUsingCache(segmentName), 
instancePartitions);
+        String state = 
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? 
SegmentStateModel.CONSUMING
+            : SegmentStateModel.ONLINE;
+        newAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+      }
+    }
+    _logger.info("Rebalanced table: {}, number of segments to be added/removed 
for each instance: {}",
+        _tableNameWithType, 
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
newAssignment));
+    return newAssignment;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index be374dbed6c..d71fa7f4da8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -71,10 +71,10 @@ import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
 import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
 import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
+import 
org.apache.pinot.controller.helix.core.assignment.segment.BaseStrictRealtimeSegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import 
org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
@@ -558,7 +558,7 @@ public class TableRebalancer {
     // StrictReplicaGroupAssignment::rebalanceTable() and similar limitations 
apply here as well
     Object2IntOpenHashMap<String> segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();
 
-    boolean isStrictRealtimeSegmentAssignment = (segmentAssignment instanceof 
StrictRealtimeSegmentAssignment);
+    boolean isStrictRealtimeSegmentAssignment = (segmentAssignment instanceof 
BaseStrictRealtimeSegmentAssignment);
     PartitionIdFetcher partitionIdFetcher =
         new PartitionIdFetcherImpl(tableNameWithType, 
TableConfigUtils.getPartitionColumn(tableConfig), _helixManager,
             isStrictRealtimeSegmentAssignment);
@@ -622,7 +622,7 @@ public class TableRebalancer {
           // If all the segments to be moved remain unchanged (same instance 
state map) in the new ideal state, apply
           // the same target instance state map for these segments to the new 
ideal state as the target assignment
           boolean segmentsToMoveChanged = false;
-          if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+          if (segmentAssignment instanceof 
BaseStrictRealtimeSegmentAssignment) {
             // For StrictRealtimeSegmentAssignment, we need to recompute the 
target assignment because the assignment
             // for new added segments is based on the existing assignment
             segmentsToMoveChanged = true;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 72fb874973e..01a868a1bfe 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -20,15 +20,27 @@ package 
org.apache.pinot.controller.helix.core.assignment.segment;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.tier.PinotServerTierStorage;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -38,11 +50,11 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -66,7 +78,6 @@ public class StrictRealtimeSegmentAssignmentTest {
       
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME);
 
   private List<String> _segments;
-  private SegmentAssignment _segmentAssignment;
   private Map<InstancePartitionsType, InstancePartitions> 
_instancePartitionsMap;
   private InstancePartitions _newConsumingInstancePartitions;
 
@@ -77,16 +88,6 @@ public class StrictRealtimeSegmentAssignmentTest {
       _segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId % 
NUM_PARTITIONS, segmentId / NUM_PARTITIONS,
           System.currentTimeMillis()).getSegmentName());
     }
-
-    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-            .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
-            
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
-            .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
-    _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig, null);
-
     _instancePartitionsMap = new TreeMap<>();
     // CONSUMING instances:
     // {
@@ -125,25 +126,50 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test
-  public void testFactory() {
-    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @DataProvider(name = "tableTypes")
+  public Object[] getTableTypes() {
+    return new Object[]{"upsert", "dedup"};
   }
 
-  @Test
-  public void testAssignSegment() {
-    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  private static SegmentAssignment createSegmentAssignment(String tableType) {
+    TableConfigBuilder builder = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setNumReplicas(NUM_REPLICAS)
+        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+        
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
+    TableConfig tableConfig;
+    if ("upsert".equalsIgnoreCase(tableType)) {
+      tableConfig = builder.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).build();
+    } else {
+      tableConfig = builder.setDedupConfig(new DedupConfig()).build();
+    }
+    SegmentAssignment segmentAssignment =
+        SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig, null);
+    assertSegmentAssignmentType(segmentAssignment, tableType);
+    return segmentAssignment;
+  }
+
+  private static void assertSegmentAssignmentType(SegmentAssignment 
segmentAssignment, String tableType) {
+    if ("upsert".equalsIgnoreCase(tableType)) {
+      assertTrue(segmentAssignment instanceof 
SingleTierStrictRealtimeSegmentAssignment);
+    } else {
+      assertTrue(segmentAssignment instanceof 
MultiTierStrictRealtimeSegmentAssignment);
+    }
+  }
+
+  @Test(dataProvider = "tableTypes")
+  public void testAssignSegment(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
     Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
         Map.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     // Add segments for partition 0/1/2, but add no segment for partition 3.
     List<String> instancesAssigned;
-    boolean consistent;
     for (int segmentId = 0; segmentId < 3; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
       // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -170,7 +196,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     int segmentId = 3;
     String segmentName = _segments.get(segmentId);
     instancesAssigned =
-        _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+        segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
     assertEquals(instancesAssigned,
         Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3", 
"new_consumingInstance_6"));
     addToAssignment(currentAssignment, segmentId, instancesAssigned);
@@ -179,7 +205,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     for (segmentId = 4; segmentId < 7; segmentId++) {
       segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Those segments are assigned according to the assignment from 
idealState, instead of using new_xxx instances
@@ -196,20 +222,19 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test
-  public void testAssignSegmentWithOfflineSegment() {
-    assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+  @Test(dataProvider = "tableTypes")
+  public void testAssignSegmentWithOfflineSegment(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
     Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
         Map.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     // Add segments for partition 0/1/2, but add no segment for partition 3.
     List<String> instancesAssigned;
-    boolean consistent;
     for (int segmentId = 0; segmentId < 3; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
       // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -237,7 +262,7 @@ public class StrictRealtimeSegmentAssignmentTest {
     for (int segmentId = 3; segmentId < 7; segmentId++) {
       String segmentName = _segments.get(segmentId);
       instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
newConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Those segments are assigned according to the assignment from 
idealState, instead of using new_xxx instances
@@ -254,9 +279,110 @@ public class StrictRealtimeSegmentAssignmentTest {
     }
   }
 
-  @Test(expectedExceptions = IllegalStateException.class)
-  public void testAssignSegmentToCompletedServers() {
-    _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new 
TreeMap<>());
+  @Test
+  public void testRebalanceDedupTableWithTiers() {
+    SegmentAssignment segmentAssignment = createSegmentAssignment("dedup");
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        Map.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    Set<String> segmentsOnTier = new HashSet<>();
+    for (int segmentId = 0; segmentId < 6; segmentId++) {
+      String segmentName = _segments.get(segmentId);
+      if (segmentId < 3) {
+        segmentsOnTier.add(segmentName);
+      }
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+      currentAssignment.put(segmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
+    }
+    String tierName = "coldTier";
+    List<Tier> sortedTiers = createSortedTiers(tierName, segmentsOnTier);
+    Map<String, InstancePartitions> tierInstancePartitionsMap = 
createTierInstancePartitionsMap(tierName, 3);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setIncludeConsuming(true);
+    Map<String, Map<String, String>> newAssignment =
+        segmentAssignment.rebalanceTable(currentAssignment, 
onlyConsumingInstancePartitionMap, sortedTiers,
+            tierInstancePartitionsMap, rebalanceConfig);
+    assertEquals(newAssignment.size(), currentAssignment.size());
+    for (String segName : currentAssignment.keySet()) {
+      if (segmentsOnTier.contains(segName)) {
+        assertTrue(newAssignment.get(segName).keySet().stream().allMatch(s -> 
s.startsWith(tierName)));
+      } else {
+        assertTrue(
+            newAssignment.get(segName).keySet().stream().allMatch(s -> 
s.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)));
+      }
+    }
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = "Tiers must not be "
+      + "specified for table.*")
+  public void testRebalanceUpsertTableWithTiers() {
+    SegmentAssignment segmentAssignment = createSegmentAssignment("upsert");
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        Map.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    Set<String> segmentsOnTier = new HashSet<>();
+    for (int segmentId = 0; segmentId < 6; segmentId++) {
+      String segmentName = _segments.get(segmentId);
+      if (segmentId < 3) {
+        segmentsOnTier.add(segmentName);
+      }
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+      currentAssignment.put(segmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
+    }
+
+    String tierName = "coldTier";
+    List<Tier> sortedTiers = createSortedTiers(tierName, segmentsOnTier);
+    Map<String, InstancePartitions> tierInstancePartitionsMap = 
createTierInstancePartitionsMap(tierName, 3);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setIncludeConsuming(true);
+    segmentAssignment.rebalanceTable(currentAssignment, 
onlyConsumingInstancePartitionMap, sortedTiers,
+        tierInstancePartitionsMap, rebalanceConfig);
+  }
+
+  @Test(dataProvider = "tableTypes")
+  public void testAssignSegmentToCompletedServers(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
+    instancePartitionsMap.put(InstancePartitionsType.COMPLETED, new 
InstancePartitions("completed"));
+    try {
+      segmentAssignment.assignSegment("seg01", new TreeMap<>(), 
instancePartitionsMap);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Failed to find CONSUMING instance 
partitions"), e.getMessage());
+    }
+    instancePartitionsMap.put(InstancePartitionsType.CONSUMING, new 
InstancePartitions("consuming"));
+    try {
+      segmentAssignment.assignSegment("seg01", new TreeMap<>(), 
instancePartitionsMap);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("One instance partition type should 
be provided"), e.getMessage());
+    }
+  }
+
+  @Test(dataProvider = "tableTypes")
+  public void testRebalanceTableToCompletedServers(String tableType) {
+    SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+    String tierName = "coldTier";
+    List<Tier> sortedTiers = createSortedTiers(tierName, 
Collections.emptySet());
+    Map<String, InstancePartitions> tierInstancePartitionsMap = 
createTierInstancePartitionsMap(tierName, 3);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
+    instancePartitionsMap.put(InstancePartitionsType.COMPLETED, new 
InstancePartitions("completed"));
+    try {
+      segmentAssignment.rebalanceTable(new TreeMap<>(), instancePartitionsMap, 
sortedTiers, tierInstancePartitionsMap,
+          rebalanceConfig);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Failed to find CONSUMING instance 
partitions"), e.getMessage());
+    }
+    instancePartitionsMap.put(InstancePartitionsType.CONSUMING, new 
InstancePartitions("consuming"));
+    try {
+      segmentAssignment.rebalanceTable(new TreeMap<>(), instancePartitionsMap, 
sortedTiers, tierInstancePartitionsMap,
+          rebalanceConfig);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("One instance partition type should 
be provided"), e.getMessage());
+    }
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,
@@ -275,11 +401,42 @@ public class StrictRealtimeSegmentAssignmentTest {
         SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
   }
 
-  private HelixManager createHelixManager() {
+  private static HelixManager createHelixManager() {
     HelixManager helixManager = mock(HelixManager.class);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(propertyStore.get(anyString(), eq(null), 
eq(AccessOption.PERSISTENT))).thenAnswer(invocation -> {
+      String path = invocation.getArgument(0, String.class);
+      String segmentName = path.substring(path.lastIndexOf('/') + 1);
+      return new ZNRecord(segmentName);
+    });
     when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-    when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new 
ZNRecord("0"));
     return helixManager;
   }
+
+  private static Map<String, InstancePartitions> 
createTierInstancePartitionsMap(String tierName, int serverCnt) {
+    Map<String, InstancePartitions> instancePartitionsMap = new HashMap<>();
+    InstancePartitions instancePartitionsColdTier =
+        new 
InstancePartitions(InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME,
 tierName));
+    List<String> serverList = new ArrayList<>();
+    for (int i = 0; i < serverCnt; i++) {
+      serverList.add(tierName + "_server_" + i);
+    }
+    instancePartitionsColdTier.setInstances(0, 0, serverList);
+    instancePartitionsMap.put(tierName, instancePartitionsColdTier);
+    return instancePartitionsMap;
+  }
+
+  private static List<Tier> createSortedTiers(String tierName, Set<String> 
segmentsOnTier) {
+    return List.of(new Tier(tierName, new TierSegmentSelector() {
+      @Override
+      public String getType() {
+        return "dummy";
+      }
+
+      @Override
+      public boolean selectSegment(String tableNameWithType, SegmentZKMetadata 
segmentZKMetadata) {
+        return segmentsOnTier.contains(segmentZKMetadata.getSegmentName());
+      }
+    }, new PinotServerTierStorage(tierName, null, null)));
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 4ef9e41876c..16c89b24c9b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -90,6 +90,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import 
org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
@@ -798,6 +799,8 @@ public final class TableConfigUtils {
     // specifically for upsert
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     if (upsertConfig != null) {
+      // Currently, only one tier is allowed for upsert table, as the 
committed segments can't be moved to other tiers.
+      Preconditions.checkState(tableConfig.getTierConfigsList() == null, "The 
upsert table cannot have multi-tiers");
       // no startree index
       
Preconditions.checkState(CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs())
               && !tableConfig.getIndexingConfig().isEnableDefaultStarTree(),
@@ -939,6 +942,40 @@ public final class TableConfigUtils {
       Preconditions.checkState(timeColumnDataType.isNumeric(),
           "MetadataTTL must have time column: %s in numeric type, found: %s", 
timeColumn, timeColumnDataType);
     }
+    if (tableConfig.getTierConfigsList() != null) {
+      validateTTLAndTierConfigsForDedupTable(tableConfig, schema);
+    }
+  }
+
+  @VisibleForTesting
+  static void validateTTLAndTierConfigsForDedupTable(TableConfig tableConfig, 
Schema schema) {
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
+    // Tiers are required to use segmentAge selector, and the min of them 
should be >= TTL, so that only segments
+    // out of TTL are moved to other tiers. Also, TTL must be defined on 
timeColumn to compare with segmentAges.
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    String dedupTimeColumn = dedupConfig.getDedupTimeColumn();
+    if (dedupTimeColumn != null && !dedupTimeColumn.isEmpty()) {
+      Preconditions.checkState(timeColumn.equalsIgnoreCase(dedupTimeColumn),
+          "DedupTimeColumn: %s is different from table's timeColumn: %s", 
dedupTimeColumn, timeColumn);
+    }
+    long minSegmentAgeInMs = Long.MAX_VALUE;
+    for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+      String tierName = tierConfig.getName();
+      String segmentSelectorType = tierConfig.getSegmentSelectorType();
+      
Preconditions.checkState(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE),
+          "Time based segment selector is required but tier: %s uses selector: 
%s", tierName, segmentSelectorType);
+      String segmentAge = tierConfig.getSegmentAge();
+      minSegmentAgeInMs = Math.min(minSegmentAgeInMs, 
TimeUtils.convertPeriodToMillis(segmentAge));
+    }
+    // Convert TTL value to millisecond based on timeColumn fieldSpec in order 
to compare with segment ages.
+    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+    long ttl = (long) dedupConfig.getMetadataTTL();
+    long ttlInMs = dateTimeSpec.getFormatSpec().fromFormatToMillis(ttl);
+    LOGGER.debug(
+        "Converting MetadataTTL: {} to {}ms with DateTimeFieldSpec: {} and to 
compare with minSegmentAge: {}ms", ttl,
+        ttlInMs, dateTimeSpec, minSegmentAgeInMs);
+    Preconditions.checkState(ttlInMs < minSegmentAgeInMs,
+        "MetadataTTL: %s(ms) must be smaller than the minimum segmentAge: 
%s(ms)", ttlInMs, minSegmentAgeInMs);
   }
 
   /**
@@ -1116,8 +1153,8 @@ public final class TableConfigUtils {
       Preconditions.checkState(tierNames.add(tierName), "Tier name: %s already 
exists in tier configs", tierName);
 
       String segmentSelectorType = tierConfig.getSegmentSelectorType();
-      String segmentAge = tierConfig.getSegmentAge();
       if 
(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
+        String segmentAge = tierConfig.getSegmentAge();
         Preconditions.checkState(segmentAge != null,
             "Must provide 'segmentAge' for segmentSelectorType: %s in tier: 
%s", segmentSelectorType, tierName);
         Preconditions.checkState(TimeUtils.isPeriodValid(segmentAge),
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 7939fd1e58a..30ea8126ace 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -3478,4 +3478,88 @@ public class TableConfigUtilsTest {
     assertTrue(TableConfigUtils.isRelevantToTenant(tableConfig, "tierTag"));
     assertFalse(TableConfigUtils.isRelevantToTenant(tableConfig, 
"otherTenant"));
   }
+
+  @Test
+  public void testValidateTierConfigsForDedupTable() {
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+    // Validate that table's timeColumn must be used as dedupTimeColumn.
+    DedupConfig dedupConfig = new DedupConfig();
+    dedupConfig.setDedupTimeColumn("foo");
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setDedupConfig(dedupConfig)
+        .build();
+    try {
+      TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig, 
schema);
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "DedupTimeColumn: foo is different from 
table's timeColumn: timeColumn");
+    }
+    // Validate that time based segment selector must be used by tiers.
+    dedupConfig = new DedupConfig();
+    dedupConfig.setMetadataTTL(100);
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setDedupConfig(dedupConfig)
+        .setTierConfigList(Lists.newArrayList(new TierConfig("tier1", 
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, "", null,
+            TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), 
"tier1_tag_OFFLINE", null, null)))
+        .build();
+    try {
+      TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig, 
schema);
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Time based segment selector is required 
but tier: tier1 uses selector: fixed");
+    }
+    // Validate that TTL must be smaller than min of segment ages of all 
segment selectors.
+    // Changing timeColumn time units to make sure TTL is converted to 
millisecond properly.
+    dedupConfig = new DedupConfig();
+    dedupConfig.setMetadataTTL(100);
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setDedupConfig(dedupConfig)
+        .setTierConfigList(Lists.newArrayList(
+            new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"50s", null,
+                TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), 
"tier1_tag_OFFLINE", null, null),
+            new TierConfig("tier2", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "100s", null,
+                TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", 
null, null)))
+        .build();
+    // Got TTL=100ms vs. minAge=50s, testing with timeColumn of ms/epoch
+    TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig, 
schema);
+    // Got TTL=100s vs. minAge=50s, testing with timeColumn of sec/epoch
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:SECONDS:EPOCH", 
"1:SECONDS")
+        .build();
+    try {
+      TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig, 
schema);
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "MetadataTTL: 100000(ms) must be smaller 
than the minimum segmentAge: 50000(ms)");
+    }
+    // Got TTL=50000ms vs. minAge=50000ms, testing with timeColumn of 
timestamp type
+    dedupConfig = new DedupConfig();
+    dedupConfig.setMetadataTTL(50000);
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setDedupConfig(dedupConfig)
+        .setTierConfigList(Lists.newArrayList(
+            new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"50s", null,
+                TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), 
"tier1_tag_OFFLINE", null, null),
+            new TierConfig("tier2", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "100s", null,
+                TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", 
null, null)))
+        .build();
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+    try {
+      TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig, 
schema);
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "MetadataTTL: 50000(ms) must be smaller 
than the minimum segmentAge: 50000(ms)");
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to