This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ccc41ea8e54 Make dedup table use StrictRealtimeSegmentAssignment with support of multi tiers (#17154)
ccc41ea8e54 is described below
commit ccc41ea8e54ef84c7144345fae5bef3eef4d88ec
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Nov 7 10:16:00 2025 -0800
Make dedup table use StrictRealtimeSegmentAssignment with support of multi tiers (#17154)
* split strict assignment to singleTier and multiTier versions
* add tiers support in MultiTierStrictRealtimeSegmentAssignment
* add tests
* refine tests
* validate dedup ttl to be smaller than min of segment ages used to select segments to move to other tiers
* Resolve conflicts
* Add gitignore files
* Add in test
* Checkstyle fix
* Change the method signature
* Add in import
* Change it to map
* Checkstyle fix
---------
Co-authored-by: Xiaobing Li <[email protected]>
---
...va => BaseStrictRealtimeSegmentAssignment.java} | 62 ++----
.../MultiTierStrictRealtimeSegmentAssignment.java | 86 ++++++++
.../segment/RealtimeSegmentAssignment.java | 7 +-
.../segment/SegmentAssignmentFactory.java | 6 +-
.../SingleTierStrictRealtimeSegmentAssignment.java | 77 +++++++
.../helix/core/rebalance/TableRebalancer.java | 6 +-
.../StrictRealtimeSegmentAssignmentTest.java | 225 +++++++++++++++++----
.../segment/local/utils/TableConfigUtils.java | 39 +++-
.../segment/local/utils/TableConfigUtilsTest.java | 84 ++++++++
9 files changed, 501 insertions(+), 91 deletions(-)
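
For reference, the dedup validation added in TableConfigUtils below requires every tier of a dedup table to use a time-based segment selector and requires the dedup metadataTTL (converted to milliseconds via the time column's DateTimeFieldSpec) to stay below the smallest tier segmentAge. A minimal sketch of a config that passes the new check, loosely mirroring the tests in this patch (imports omitted; schema, table and tier names here are illustrative, not part of the commit):

    Schema schema = new Schema.SchemaBuilder().setSchemaName("myDedupTable")
        .addDateTime("timeColumn", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
        .build();
    DedupConfig dedupConfig = new DedupConfig();
    dedupConfig.setMetadataTTL(100);  // interpreted in the time column's unit, i.e. 100 ms here
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myDedupTable")
        .setTimeColumnName("timeColumn")
        .setDedupConfig(dedupConfig)
        .setTierConfigList(Lists.newArrayList(
            new TierConfig("coldTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "50s", null,
                TierFactory.PINOT_SERVER_STORAGE_TYPE, "coldTier_tag_OFFLINE", null, null)))
        .build();
    // 100 ms TTL < 50000 ms minimum segmentAge, so this passes; a fixed selector or a
    // segmentAge at or below the TTL makes the check throw IllegalStateException.
    TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig, schema);
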
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseStrictRealtimeSegmentAssignment.java
similarity index 70%
rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseStrictRealtimeSegmentAssignment.java
index 23fefe723fa..2eec6a9b615 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseStrictRealtimeSegmentAssignment.java
@@ -24,20 +24,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metrics.ControllerMeter;
-import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
/**
- * Segment assignment for LLC real-time table using upsert. The assignSegment() of RealtimeSegmentAssignment is
+ * Segment assignment for LLC real-time table using upsert/dedup. The assignSegment() of RealtimeSegmentAssignment is
 * overridden to add new segment for a table partition in a way that's consistent with the assignment in idealState to
 * make sure that at any time the segments from the same table partition is hosted by the same server.
* <ul>
@@ -47,25 +44,28 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 * InstancePartition and the one in idealState are different, the one in idealState must be used so that segments
 * from the same table partition are always hosted on the same server as set in current idealState. If the
 * idealState is not honored, segments from the same table partition may be assigned to different servers,
- * breaking the key assumption for queries to be correct for the table using upsert.
+ * breaking the key assumption for queries to be correct for the table using upsert/dedup.
 * </li>
 * <li>
- * There is no need to handle COMPLETED segments for tables using upsert, because their completed segments should
- * not be relocated to servers tagged to host COMPLETED segments. Basically, upsert-enabled tables can only use
- * servers tagged for CONSUMING segments to host both consuming and completed segments from a table partition.
+ * There is no need to handle COMPLETED segments for tables using upsert/dedup, because their completed
+ * segments should not be relocated to servers tagged to host COMPLETED segments. Basically, upsert/dedup-enabled
+ * tables can only use servers tagged for CONSUMING segments to host both consuming and completed segments from a
+ * table partition.
 * </li>
 * </ul>
+ *
+ * The rebalanceTable() method is to be implemented by subclasses to support multi tiers differently.
 */
-public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
+public abstract class BaseStrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
// Cache segment partition id to avoid ZK reads.
// NOTE:
// 1. This cache is used for table rebalance only, but not segment assignment. During rebalance, rebalanceTable() can
// be invoked multiple times when the ideal state changes during the rebalance process.
// 2. The cache won't be refreshed when an existing segment is replaced with a segment from a different partition.
- //   Replacing a segment with a segment from a different partition should not be allowed for upsert table because it
- //   will cause the segment being served by the wrong servers. If this happens during the table rebalance, another
- //   rebalance might be needed to fix the assignment.
+ //   Replacing a segment with a segment from a different partition should not be allowed for upsert/dedup table
+ //   because it will cause the segment being served by the wrong servers. If this happens during the table
+ //   rebalance, another rebalance might be needed to fix the assignment.
private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new Object2IntOpenHashMap<>();
@Override
@@ -131,7 +131,7 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
/**
 * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor CONSUMING), {@code false} otherwise.
*/
- private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+ protected boolean isOfflineSegment(Map<String, String> instanceStateMap) {
return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && !instanceStateMap.containsValue(
SegmentStateModel.CONSUMING);
}
@@ -154,44 +154,10 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
return idealAssignment.size() == instancesAssigned.size() && idealAssignment.containsAll(instancesAssigned);
}
- @Override
- public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
- @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, RebalanceConfig config) {
- Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
- InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
- Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
- _tableNameWithType);
- Preconditions.checkArgument(config.isIncludeConsuming(),
- "Consuming segment must be included when rebalancing upsert table: %s", _tableNameWithType);
- Preconditions.checkState(sortedTiers == null, "Tiers must not be specified for upsert table: %s",
- _tableNameWithType);
- _logger.info("Rebalancing table: {} with instance partitions: {}", _tableNameWithType, instancePartitions);
-
- Map<String, Map<String, String>> newAssignment = new TreeMap<>();
- for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> instanceStateMap = entry.getValue();
- if (isOfflineSegment(instanceStateMap)) {
- // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the
- // OFFLINE segments and re-assign them
- newAssignment.put(segmentName, instanceStateMap);
- } else {
- // Reassign CONSUMING and COMPLETED segments
- List<String> instancesAssigned =
- assignConsumingSegment(getPartitionIdUsingCache(segmentName), instancePartitions);
- String state = instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? SegmentStateModel.CONSUMING
- : SegmentStateModel.ONLINE;
- newAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
- }
- }
- return newAssignment;
- }
-
/**
 * Returns the partition id of the given segment, using cached partition id if exists.
 */
- private int getPartitionIdUsingCache(String segmentName) {
+ protected int getPartitionIdUsingCache(String segmentName) {
return _segmentPartitionIdMap.computeIntIfAbsent(segmentName, this::getPartitionId);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java
new file mode 100644
index 00000000000..aa9c5988390
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This segment assignment policy allows the table to use multiple tiers when rebalancing table. This can be used for
+ * dedup table, whose segments out of TTL can be moved to new tiers without messing up the dedup metadata tracked on
+ * the CONSUMING servers.
+ */
+public class MultiTierStrictRealtimeSegmentAssignment extends BaseStrictRealtimeSegmentAssignment {
+ @Override
+ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, RebalanceConfig config) {
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+ InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
+ Preconditions.checkArgument(config.isIncludeConsuming(),
+ "Consuming segment must be included when rebalancing table: %s using multi-tier "
+ + "StrictRealtimeSegmentAssignment", _tableNameWithType);
+ boolean bootstrap = config.isBootstrap();
+ _logger.info("Rebalancing table: {} with instance partitions: {}, bootstrap: {}", _tableNameWithType,
+ instancePartitions, bootstrap);
+ // Rebalance tiers first. Only completed segments are moved to other tiers.
+ Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
+ rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap);
+ List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
+ Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
+ Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : nonTierAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (isOfflineSegment(instanceStateMap)) {
+ // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the
+ // OFFLINE segments and re-assign them
+ newAssignment.put(segmentName, instanceStateMap);
+ } else {
+ // Reassign CONSUMING and COMPLETED segments
+ List<String> instancesAssigned =
+ assignConsumingSegment(getPartitionIdUsingCache(segmentName), instancePartitions);
+ String state = instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)
+ ? CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING
+ : CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+ newAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+ }
+ }
+ // Add tier assignments, if available
+ if (CollectionUtils.isNotEmpty(newTierAssignments)) {
+ newTierAssignments.forEach(newAssignment::putAll);
+ }
+ _logger.info("Rebalanced table: {}, number of segments to be added/removed for each instance: {}",
+ _tableNameWithType, SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, newAssignment));
+ return newAssignment;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 616dd8b79bd..c31d5e74970 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -183,6 +183,9 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
"Failed to find CONSUMING instance partitions for table: %s", _tableNameWithType);
boolean includeConsuming = config.isIncludeConsuming();
boolean bootstrap = config.isBootstrap();
+ _logger.info("Rebalancing table: {} with COMPLETED instance partitions: {}, CONSUMING instance partitions: {}, "
+ + "includeConsuming: {}, bootstrap: {}", _tableNameWithType, completedInstancePartitions,
+ consumingInstancePartitions, includeConsuming, bootstrap);
// Rebalance tiers first
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap);
@@ -190,10 +193,6 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
- _logger.info("Rebalancing table: {} with COMPLETED instance partitions: {}, CONSUMING instance partitions: {}, "
- + "includeConsuming: {}, bootstrap: {}", _tableNameWithType, completedInstancePartitions,
- consumingInstancePartitions, includeConsuming, bootstrap);
-
Set<String> committingSegments = null;
if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
List<String> committingSegmentList = PinotLLCRealtimeSegmentManager.getCommittingSegments(_tableNameWithType,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index e32a7246b2d..e9947cb8e67 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -40,8 +41,11 @@ public class SegmentAssignmentFactory {
segmentAssignment = new OfflineSegmentAssignment();
} else {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ DedupConfig dedupConfig = tableConfig.getDedupConfig();
if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
- segmentAssignment = new StrictRealtimeSegmentAssignment();
+ segmentAssignment = new SingleTierStrictRealtimeSegmentAssignment();
+ } else if (dedupConfig != null && dedupConfig.isDedupEnabled()) {
+ segmentAssignment = new MultiTierStrictRealtimeSegmentAssignment();
} else {
segmentAssignment = new RealtimeSegmentAssignment();
}
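
With the factory change above, upsert tables now resolve to SingleTierStrictRealtimeSegmentAssignment and dedup-enabled tables to MultiTierStrictRealtimeSegmentAssignment. A rough sketch of how that selection can be observed, loosely following the updated test (imports omitted; the mocked HelixManager and table name are assumptions):

    HelixManager helixManager = mock(HelixManager.class);  // as in the test's createHelixManager()
    TableConfig dedupTable = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable")
        .setNumReplicas(3)
        .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
        .setDedupConfig(new DedupConfig())
        .build();
    SegmentAssignment assignment = SegmentAssignmentFactory.getSegmentAssignment(helixManager, dedupTable, null);
    assertTrue(assignment instanceof MultiTierStrictRealtimeSegmentAssignment);
    // Building the same table with setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) instead resolves
    // to SingleTierStrictRealtimeSegmentAssignment; plain realtime tables keep RealtimeSegmentAssignment.
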
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SingleTierStrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SingleTierStrictRealtimeSegmentAssignment.java
new file mode 100644
index 00000000000..fbb582d394d
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SingleTierStrictRealtimeSegmentAssignment.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+
+
+/**
+ * This segment assignment policy doesn't allow the table to have multiple tiers. The upsert table has to use this
+ * today. Because moving upsert table's segments that are out of TTL needs to move the segments' associated
+ * validDocIds bitmaps as well for the upsert data to stay correct on the new tiers. Once moving bitmaps is
+ * supported later, the upsert table can use the MultiTierStrictRealtimeSegmentAssignment to use multi tiers too.
+ */
+public class SingleTierStrictRealtimeSegmentAssignment extends BaseStrictRealtimeSegmentAssignment {
+ @Override
+ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, RebalanceConfig config) {
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+ InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
+ Preconditions.checkArgument(config.isIncludeConsuming(),
+ "Consuming segment must be included when rebalancing table: %s using single-tier "
+ + "StrictRealtimeSegmentAssignment", _tableNameWithType);
+ Preconditions.checkState(sortedTiers == null,
+ "Tiers must not be specified for table: %s using single-tier StrictRealtimeSegmentAssignment",
+ _tableNameWithType);
+ _logger.info("Rebalancing table: {} with instance partitions: {}", _tableNameWithType, instancePartitions);
+
+ Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (isOfflineSegment(instanceStateMap)) {
+ // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the
+ // OFFLINE segments and re-assign them
+ newAssignment.put(segmentName, instanceStateMap);
+ } else {
+ // Reassign CONSUMING and COMPLETED segments
+ List<String> instancesAssigned =
+ assignConsumingSegment(getPartitionIdUsingCache(segmentName), instancePartitions);
+ String state = instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? SegmentStateModel.CONSUMING
+ : SegmentStateModel.ONLINE;
+ newAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+ }
+ }
+ _logger.info("Rebalanced table: {}, number of segments to be added/removed for each instance: {}",
+ _tableNameWithType, SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, newAssignment));
+ return newAssignment;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index be374dbed6c..d71fa7f4da8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -71,10 +71,10 @@ import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
+import org.apache.pinot.controller.helix.core.assignment.segment.BaseStrictRealtimeSegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
@@ -558,7 +558,7 @@ public class TableRebalancer {
// StrictReplicaGroupAssignment::rebalanceTable() and similar limitations apply here as well
Object2IntOpenHashMap<String> segmentPartitionIdMap = new Object2IntOpenHashMap<>();
- boolean isStrictRealtimeSegmentAssignment = (segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ boolean isStrictRealtimeSegmentAssignment = (segmentAssignment instanceof BaseStrictRealtimeSegmentAssignment);
PartitionIdFetcher partitionIdFetcher =
new PartitionIdFetcherImpl(tableNameWithType, TableConfigUtils.getPartitionColumn(tableConfig), _helixManager,
isStrictRealtimeSegmentAssignment);
@@ -622,7 +622,7 @@ public class TableRebalancer {
// If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply
// the same target instance state map for these segments to the new ideal state as the target assignment
boolean segmentsToMoveChanged = false;
- if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+ if (segmentAssignment instanceof BaseStrictRealtimeSegmentAssignment) {
// For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment
// for new added segments is based on the existing assignment
segmentsToMoveChanged = true;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 72fb874973e..01a868a1bfe 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -20,15 +20,27 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.tier.PinotServerTierStorage;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -38,11 +50,11 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -66,7 +78,6 @@ public class StrictRealtimeSegmentAssignmentTest {
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME);
private List<String> _segments;
- private SegmentAssignment _segmentAssignment;
private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMap;
private InstancePartitions _newConsumingInstancePartitions;
@@ -77,16 +88,6 @@ public class StrictRealtimeSegmentAssignmentTest {
_segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId %
NUM_PARTITIONS, segmentId / NUM_PARTITIONS,
System.currentTimeMillis()).getSegmentName());
}
-
- Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
-
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
- _segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig, null);
-
_instancePartitionsMap = new TreeMap<>();
// CONSUMING instances:
// {
@@ -125,25 +126,50 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test
- public void testFactory() {
- assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @DataProvider(name = "tableTypes")
+ public Object[] getTableTypes() {
+ return new Object[]{"upsert", "dedup"};
}
- @Test
- public void testAssignSegment() {
- assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ private static SegmentAssignment createSegmentAssignment(String tableType) {
+ TableConfigBuilder builder = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(NUM_REPLICAS)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+ .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
+ TableConfig tableConfig;
+ if ("upsert".equalsIgnoreCase(tableType)) {
+ tableConfig = builder.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).build();
+ } else {
+ tableConfig = builder.setDedupConfig(new DedupConfig()).build();
+ }
+ SegmentAssignment segmentAssignment =
+ SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig, null);
+ assertSegmentAssignmentType(segmentAssignment, tableType);
+ return segmentAssignment;
+ }
+
+ private static void assertSegmentAssignmentType(SegmentAssignment
segmentAssignment, String tableType) {
+ if ("upsert".equalsIgnoreCase(tableType)) {
+ assertTrue(segmentAssignment instanceof
SingleTierStrictRealtimeSegmentAssignment);
+ } else {
+ assertTrue(segmentAssignment instanceof
MultiTierStrictRealtimeSegmentAssignment);
+ }
+ }
+
+ @Test(dataProvider = "tableTypes")
+ public void testAssignSegment(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
Map.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Add segments for partition 0/1/2, but add no segment for partition 3.
List<String> instancesAssigned;
- boolean consistent;
for (int segmentId = 0; segmentId < 3; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
// Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -170,7 +196,7 @@ public class StrictRealtimeSegmentAssignmentTest {
int segmentId = 3;
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned,
Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3",
"new_consumingInstance_6"));
addToAssignment(currentAssignment, segmentId, instancesAssigned);
@@ -179,7 +205,7 @@ public class StrictRealtimeSegmentAssignmentTest {
for (segmentId = 4; segmentId < 7; segmentId++) {
segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Those segments are assigned according to the assignment from
idealState, instead of using new_xxx instances
@@ -196,20 +222,19 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test
- public void testAssignSegmentWithOfflineSegment() {
- assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @Test(dataProvider = "tableTypes")
+ public void testAssignSegmentWithOfflineSegment(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
Map.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Add segments for partition 0/1/2, but add no segment for partition 3.
List<String> instancesAssigned;
- boolean consistent;
for (int segmentId = 0; segmentId < 3; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
// Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -237,7 +262,7 @@ public class StrictRealtimeSegmentAssignmentTest {
for (int segmentId = 3; segmentId < 7; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Those segments are assigned according to the assignment from
idealState, instead of using new_xxx instances
@@ -254,9 +279,110 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test(expectedExceptions = IllegalStateException.class)
- public void testAssignSegmentToCompletedServers() {
- _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new
TreeMap<>());
+ @Test
+ public void testRebalanceDedupTableWithTiers() {
+ SegmentAssignment segmentAssignment = createSegmentAssignment("dedup");
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ Map.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+ Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+ Set<String> segmentsOnTier = new HashSet<>();
+ for (int segmentId = 0; segmentId < 6; segmentId++) {
+ String segmentName = _segments.get(segmentId);
+ if (segmentId < 3) {
+ segmentsOnTier.add(segmentName);
+ }
+ List<String> instancesAssigned =
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ currentAssignment.put(segmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
+ }
+ String tierName = "coldTier";
+ List<Tier> sortedTiers = createSortedTiers(tierName, segmentsOnTier);
+ Map<String, InstancePartitions> tierInstancePartitionsMap =
createTierInstancePartitionsMap(tierName, 3);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setIncludeConsuming(true);
+ Map<String, Map<String, String>> newAssignment =
+ segmentAssignment.rebalanceTable(currentAssignment,
onlyConsumingInstancePartitionMap, sortedTiers,
+ tierInstancePartitionsMap, rebalanceConfig);
+ assertEquals(newAssignment.size(), currentAssignment.size());
+ for (String segName : currentAssignment.keySet()) {
+ if (segmentsOnTier.contains(segName)) {
+ assertTrue(newAssignment.get(segName).keySet().stream().allMatch(s ->
s.startsWith(tierName)));
+ } else {
+ assertTrue(
+ newAssignment.get(segName).keySet().stream().allMatch(s ->
s.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)));
+ }
+ }
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = "Tiers must not be "
+ + "specified for table.*")
+ public void testRebalanceUpsertTableWithTiers() {
+ SegmentAssignment segmentAssignment = createSegmentAssignment("upsert");
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ Map.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+ Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+ Set<String> segmentsOnTier = new HashSet<>();
+ for (int segmentId = 0; segmentId < 6; segmentId++) {
+ String segmentName = _segments.get(segmentId);
+ if (segmentId < 3) {
+ segmentsOnTier.add(segmentName);
+ }
+ List<String> instancesAssigned =
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ currentAssignment.put(segmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
+ }
+
+ String tierName = "coldTier";
+ List<Tier> sortedTiers = createSortedTiers(tierName, segmentsOnTier);
+ Map<String, InstancePartitions> tierInstancePartitionsMap =
createTierInstancePartitionsMap(tierName, 3);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setIncludeConsuming(true);
+ segmentAssignment.rebalanceTable(currentAssignment,
onlyConsumingInstancePartitionMap, sortedTiers,
+ tierInstancePartitionsMap, rebalanceConfig);
+ }
+
+ @Test(dataProvider = "tableTypes")
+ public void testAssignSegmentToCompletedServers(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
+ instancePartitionsMap.put(InstancePartitionsType.COMPLETED, new
InstancePartitions("completed"));
+ try {
+ segmentAssignment.assignSegment("seg01", new TreeMap<>(),
instancePartitionsMap);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Failed to find CONSUMING instance
partitions"), e.getMessage());
+ }
+ instancePartitionsMap.put(InstancePartitionsType.CONSUMING, new
InstancePartitions("consuming"));
+ try {
+ segmentAssignment.assignSegment("seg01", new TreeMap<>(),
instancePartitionsMap);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("One instance partition type should
be provided"), e.getMessage());
+ }
+ }
+
+ @Test(dataProvider = "tableTypes")
+ public void testRebalanceTableToCompletedServers(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+ String tierName = "coldTier";
+ List<Tier> sortedTiers = createSortedTiers(tierName,
Collections.emptySet());
+ Map<String, InstancePartitions> tierInstancePartitionsMap =
createTierInstancePartitionsMap(tierName, 3);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
+ instancePartitionsMap.put(InstancePartitionsType.COMPLETED, new
InstancePartitions("completed"));
+ try {
+ segmentAssignment.rebalanceTable(new TreeMap<>(), instancePartitionsMap,
sortedTiers, tierInstancePartitionsMap,
+ rebalanceConfig);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Failed to find CONSUMING instance
partitions"), e.getMessage());
+ }
+ instancePartitionsMap.put(InstancePartitionsType.CONSUMING, new
InstancePartitions("consuming"));
+ try {
+ segmentAssignment.rebalanceTable(new TreeMap<>(), instancePartitionsMap,
sortedTiers, tierInstancePartitionsMap,
+ rebalanceConfig);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("One instance partition type should
be provided"), e.getMessage());
+ }
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
@@ -275,11 +401,42 @@ public class StrictRealtimeSegmentAssignmentTest {
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
- private HelixManager createHelixManager() {
+ private static HelixManager createHelixManager() {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(propertyStore.get(anyString(), eq(null),
eq(AccessOption.PERSISTENT))).thenAnswer(invocation -> {
+ String path = invocation.getArgument(0, String.class);
+ String segmentName = path.substring(path.lastIndexOf('/') + 1);
+ return new ZNRecord(segmentName);
+ });
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
- when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new
ZNRecord("0"));
return helixManager;
}
+
+ private static Map<String, InstancePartitions>
createTierInstancePartitionsMap(String tierName, int serverCnt) {
+ Map<String, InstancePartitions> instancePartitionsMap = new HashMap<>();
+ InstancePartitions instancePartitionsColdTier =
+ new
InstancePartitions(InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME,
tierName));
+ List<String> serverList = new ArrayList<>();
+ for (int i = 0; i < serverCnt; i++) {
+ serverList.add(tierName + "_server_" + i);
+ }
+ instancePartitionsColdTier.setInstances(0, 0, serverList);
+ instancePartitionsMap.put(tierName, instancePartitionsColdTier);
+ return instancePartitionsMap;
+ }
+
+ private static List<Tier> createSortedTiers(String tierName, Set<String>
segmentsOnTier) {
+ return List.of(new Tier(tierName, new TierSegmentSelector() {
+ @Override
+ public String getType() {
+ return "dummy";
+ }
+
+ @Override
+ public boolean selectSegment(String tableNameWithType, SegmentZKMetadata
segmentZKMetadata) {
+ return segmentsOnTier.contains(segmentZKMetadata.getSegmentName());
+ }
+ }, new PinotServerTierStorage(tierName, null, null)));
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 4ef9e41876c..16c89b24c9b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -90,6 +90,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
@@ -798,6 +799,8 @@ public final class TableConfigUtils {
// specifically for upsert
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
if (upsertConfig != null) {
+ // Currently, only one tier is allowed for upsert table, as the committed segments can't be moved to other tiers.
+ Preconditions.checkState(tableConfig.getTierConfigsList() == null, "The upsert table cannot have multi-tiers");
// no startree index
Preconditions.checkState(CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs())
&& !tableConfig.getIndexingConfig().isEnableDefaultStarTree(),
@@ -939,6 +942,40 @@ public final class TableConfigUtils {
Preconditions.checkState(timeColumnDataType.isNumeric(),
"MetadataTTL must have time column: %s in numeric type, found: %s",
timeColumn, timeColumnDataType);
}
+ if (tableConfig.getTierConfigsList() != null) {
+ validateTTLAndTierConfigsForDedupTable(tableConfig, schema);
+ }
+ }
+
+ @VisibleForTesting
+ static void validateTTLAndTierConfigsForDedupTable(TableConfig tableConfig, Schema schema) {
+ DedupConfig dedupConfig = tableConfig.getDedupConfig();
+ // Tiers are required to use segmentAge selector, and the min of them should be >= TTL, so that only segments
+ // out of TTL are moved to other tiers. Also, TTL must be defined on timeColumn to compare with segmentAges.
+ String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ String dedupTimeColumn = dedupConfig.getDedupTimeColumn();
+ if (dedupTimeColumn != null && !dedupTimeColumn.isEmpty()) {
+ Preconditions.checkState(timeColumn.equalsIgnoreCase(dedupTimeColumn),
+ "DedupTimeColumn: %s is different from table's timeColumn: %s", dedupTimeColumn, timeColumn);
+ }
+ long minSegmentAgeInMs = Long.MAX_VALUE;
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ String tierName = tierConfig.getName();
+ String segmentSelectorType = tierConfig.getSegmentSelectorType();
+ Preconditions.checkState(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE),
+ "Time based segment selector is required but tier: %s uses selector: %s", tierName, segmentSelectorType);
+ String segmentAge = tierConfig.getSegmentAge();
+ minSegmentAgeInMs = Math.min(minSegmentAgeInMs, TimeUtils.convertPeriodToMillis(segmentAge));
+ }
+ // Convert TTL value to millisecond based on timeColumn fieldSpec in order to compare with segment ages.
+ DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+ long ttl = (long) dedupConfig.getMetadataTTL();
+ long ttlInMs = dateTimeSpec.getFormatSpec().fromFormatToMillis(ttl);
+ LOGGER.debug(
+ "Converting MetadataTTL: {} to {}ms with DateTimeFieldSpec: {} and to compare with minSegmentAge: {}ms", ttl,
+ ttlInMs, dateTimeSpec, minSegmentAgeInMs);
+ Preconditions.checkState(ttlInMs < minSegmentAgeInMs,
+ "MetadataTTL: %s(ms) must be smaller than the minimum segmentAge: %s(ms)", ttlInMs, minSegmentAgeInMs);
}
/**
@@ -1116,8 +1153,8 @@ public final class TableConfigUtils {
Preconditions.checkState(tierNames.add(tierName), "Tier name: %s already exists in tier configs", tierName);
String segmentSelectorType = tierConfig.getSegmentSelectorType();
- String segmentAge = tierConfig.getSegmentAge();
if (segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
+ String segmentAge = tierConfig.getSegmentAge();
Preconditions.checkState(segmentAge != null,
"Must provide 'segmentAge' for segmentSelectorType: %s in tier: %s", segmentSelectorType, tierName);
Preconditions.checkState(TimeUtils.isPeriodValid(segmentAge),
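
The comparison above is unit-sensitive: the metadataTTL is interpreted in the time column's own unit and converted to milliseconds before being compared with the tier segmentAge periods. A small sketch of the arithmetic, using the values exercised by the new tests (constructing DateTimeFormatSpec directly here is illustrative, not taken from this patch):

    // With the time column declared as "1:SECONDS:EPOCH", a metadataTTL of 100 means 100 seconds:
    long ttlInMs = new DateTimeFormatSpec("1:SECONDS:EPOCH").fromFormatToMillis(100);  // 100000
    long minSegmentAgeInMs = TimeUtils.convertPeriodToMillis("50s");                   // 50000
    // ttlInMs >= minSegmentAgeInMs, so validation fails; with a "1:MILLISECONDS:EPOCH" column the same
    // TTL of 100 converts to 100 ms and the check passes.
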
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 7939fd1e58a..30ea8126ace 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -3478,4 +3478,88 @@ public class TableConfigUtilsTest {
assertTrue(TableConfigUtils.isRelevantToTenant(tableConfig, "tierTag"));
assertFalse(TableConfigUtils.isRelevantToTenant(tableConfig, "otherTenant"));
}
+
+ @Test
+ public void testValidateTierConfigsForDedupTable() {
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+ // Validate that table's timeColumn must be used as dedupTimeColumn.
+ DedupConfig dedupConfig = new DedupConfig();
+ dedupConfig.setDedupTimeColumn("foo");
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setDedupConfig(dedupConfig)
+ .build();
+ try {
+ TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig,
schema);
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals(e.getMessage(), "DedupTimeColumn: foo is different from
table's timeColumn: timeColumn");
+ }
+ // Validate that time based segment selector must be used by tiers.
+ dedupConfig = new DedupConfig();
+ dedupConfig.setMetadataTTL(100);
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setDedupConfig(dedupConfig)
+ .setTierConfigList(Lists.newArrayList(new TierConfig("tier1",
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, "", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(),
"tier1_tag_OFFLINE", null, null)))
+ .build();
+ try {
+ TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig,
schema);
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals(e.getMessage(), "Time based segment selector is required
but tier: tier1 uses selector: fixed");
+ }
+ // Validate that TTL must be smaller than min of segment ages of all
segment selectors.
+ // Changing timeColumn time units to make sure TTL is converted to
millisecond properly.
+ dedupConfig = new DedupConfig();
+ dedupConfig.setMetadataTTL(100);
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setDedupConfig(dedupConfig)
+ .setTierConfigList(Lists.newArrayList(
+ new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE,
"50s", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(),
"tier1_tag_OFFLINE", null, null),
+ new TierConfig("tier2",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "100s", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE",
null, null)))
+ .build();
+ // Got TTL=100ms vs. minAge=50s, testing with timeColumn of ms/epoch
+ TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig,
schema);
+ // Got TTL=100s vs. minAge=50s, testing with timeColumn of sec/epoch
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:SECONDS:EPOCH",
"1:SECONDS")
+ .build();
+ try {
+ TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig,
schema);
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals(e.getMessage(), "MetadataTTL: 100000(ms) must be smaller
than the minimum segmentAge: 50000(ms)");
+ }
+ // Got TTL=50000ms vs. minAge=50000ms, testing with timeColumn of
timestamp type
+ dedupConfig = new DedupConfig();
+ dedupConfig.setMetadataTTL(50000);
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setDedupConfig(dedupConfig)
+ .setTierConfigList(Lists.newArrayList(
+ new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE,
"50s", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(),
"tier1_tag_OFFLINE", null, null),
+ new TierConfig("tier2",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "100s", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE",
null, null)))
+ .build();
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.TIMESTAMP,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+ try {
+ TableConfigUtils.validateTTLAndTierConfigsForDedupTable(tableConfig,
schema);
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals(e.getMessage(), "MetadataTTL: 50000(ms) must be smaller
than the minimum segmentAge: 50000(ms)");
+ }
+ }
}
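
Taken together, a dedup table can now be rebalanced onto tiers as long as includeConsuming is set and the tier segmentAge thresholds sit above the dedup TTL (enforced by the new TableConfigUtils check). A rough usage sketch, assuming segmentAssignment was obtained from SegmentAssignmentFactory for a dedup table and currentAssignment, sortedTiers, tierInstancePartitionsMap and consumingInstancePartitions are prepared as in testRebalanceDedupTableWithTiers above:

    RebalanceConfig rebalanceConfig = new RebalanceConfig();
    rebalanceConfig.setIncludeConsuming(true);  // both strict assignments require consuming segments to be included
    Map<InstancePartitionsType, InstancePartitions> consumingOnly =
        Map.of(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
    Map<String, Map<String, String>> newAssignment =
        segmentAssignment.rebalanceTable(currentAssignment, consumingOnly, sortedTiers,
            tierInstancePartitionsMap, rebalanceConfig);
    // Segments picked by the tier selectors come back assigned to the tier's servers; all other segments stay
    // on the CONSUMING instance partitions. Passing sortedTiers to the single-tier (upsert) variant instead
    // throws "Tiers must not be specified for table: ...".
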