This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf8cf91cdd Revert "make dedup table use strict replica group
assignment too (#15778)" (#15853)
cf8cf91cdd is described below
commit cf8cf91cdd3ca88e98d42b3414495b454762f9b7
Author: Xiaobing <[email protected]>
AuthorDate: Tue May 20 11:52:14 2025 -0700
Revert "make dedup table use strict replica group assignment too (#15778)"
(#15853)
This reverts commit 6372d78d98fd391f62670a7d324b99f83c946bd6.
Tiered storage support is required for dedup tables to use strict
replica-group assignment correctly.
---
.../segment/SegmentAssignmentFactory.java | 5 +-
.../segment/StrictRealtimeSegmentAssignment.java | 23 ++++----
.../StrictRealtimeSegmentAssignmentTest.java | 69 ++++++++++------------
3 files changed, 42 insertions(+), 55 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index 7e37026438..e32a7246b2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -21,7 +21,6 @@ package
org.apache.pinot.controller.helix.core.assignment.segment;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -41,9 +40,7 @@ public class SegmentAssignmentFactory {
segmentAssignment = new OfflineSegmentAssignment();
} else {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
- DedupConfig dedupConfig = tableConfig.getDedupConfig();
- if ((upsertConfig != null && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE) || (dedupConfig != null
- && dedupConfig.isDedupEnabled())) {
+ if (upsertConfig != null && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE) {
segmentAssignment = new StrictRealtimeSegmentAssignment();
} else {
segmentAssignment = new RealtimeSegmentAssignment();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index 6c76332a24..2bc0ada5bd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -37,7 +37,7 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
/**
- * Segment assignment for LLC real-time table using upsert or dedup. The
assignSegment() of RealtimeSegmentAssignment is
+ * Segment assignment for LLC real-time table using upsert. The
assignSegment() of RealtimeSegmentAssignment is
* overridden to add new segment for a table partition in a way that's
consistent with the assignment in idealState to
* make sure that at any time the segments from the same table partition is
hosted by the same server.
* <ul>
@@ -47,13 +47,12 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
* InstancePartition and the one in idealState are different, the one in
idealState must be used so that segments
* from the same table partition are always hosted on the same server as
set in current idealState. If the
* idealState is not honored, segments from the same table partition may
be assigned to different servers,
- * breaking the key assumption for queries to be correct for the table
using upsert or dedup.
+ * breaking the key assumption for queries to be correct for the table
using upsert.
* </li>
* <li>
- * There is no need to handle COMPLETED segments for tables using upsert
or dedup, because their completed
- * segments should not be relocated to servers tagged to host COMPLETED
segments. Basically, tables using upsert
- * or dedup can only use servers tagged for CONSUMING segments to host
both consuming and completed segments from
- * a table partition.
+ * There is no need to handle COMPLETED segments for tables using upsert,
because their completed segments should
+ * not be relocated to servers tagged to host COMPLETED segments.
Basically, upsert-enabled tables can only use
+ * servers tagged for CONSUMING segments to host both consuming and
completed segments from a table partition.
* </li>
* </ul>
*/
@@ -64,9 +63,9 @@ public class StrictRealtimeSegmentAssignment extends
RealtimeSegmentAssignment {
// 1. This cache is used for table rebalance only, but not segment
assignment. During rebalance, rebalanceTable() can
// be invoked multiple times when the ideal state changes during the
rebalance process.
// 2. The cache won't be refreshed when an existing segment is replaced with
a segment from a different partition.
- // Replacing a segment with a segment from a different partition should
not be allowed for upsert/dedup table
- // because it will cause the segment being served by the wrong servers.
If this happens during the table
- // rebalance, another rebalance might be needed to fix the assignment.
+ // Replacing a segment with a segment from a different partition should
not be allowed for upsert table because it
+ // will cause the segment being served by the wrong servers. If this
happens during the table rebalance, another
+ // rebalance might be needed to fix the assignment.
private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new
Object2IntOpenHashMap<>();
@Override
@@ -164,10 +163,8 @@ public class StrictRealtimeSegmentAssignment extends
RealtimeSegmentAssignment {
Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
_tableNameWithType);
Preconditions.checkArgument(config.isIncludeConsuming(),
- "Consuming segment must be included when rebalancing upsert/dedup
table: %s", _tableNameWithType);
- // TODO: consider to support tiers for segments out of metadata TTL for
upsert/dedup table, as those segments
- // won't be included in the upsert/dedup metadata as kept on
CONSUMING tier.
- Preconditions.checkState(sortedTiers == null, "Tiers must not be specified
for upsert/dedup table: %s",
+ "Consuming segment must be included when rebalancing upsert table:
%s", _tableNameWithType);
+ Preconditions.checkState(sortedTiers == null, "Tiers must not be specified
for upsert table: %s",
_tableNameWithType);
_logger.info("Rebalancing table: {} with instance partitions: {}",
_tableNameWithType, instancePartitions);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 43cfb9998d..6775329d37 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -30,7 +30,6 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -40,7 +39,6 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -69,6 +67,7 @@ public class StrictRealtimeSegmentAssignmentTest {
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME);
private List<String> _segments;
+ private SegmentAssignment _segmentAssignment;
private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMap;
private InstancePartitions _newConsumingInstancePartitions;
@@ -79,6 +78,16 @@ public class StrictRealtimeSegmentAssignmentTest {
_segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId %
NUM_PARTITIONS, segmentId / NUM_PARTITIONS,
System.currentTimeMillis()).getSegmentName());
}
+
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+ .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
+
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+ .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
+ _segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig, null);
+
_instancePartitionsMap = new TreeMap<>();
// CONSUMING instances:
// {
@@ -117,40 +126,25 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @DataProvider(name = "tableTypes")
- public Object[] getTableTypes() {
- return new Object[]{"upsert", "dedup"};
- }
-
- private static SegmentAssignment createSegmentAssignment(String tableType) {
- TableConfigBuilder builder = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(NUM_REPLICAS)
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
- TableConfig tableConfig;
- if ("upsert".equalsIgnoreCase(tableType)) {
- tableConfig = builder.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).build();
- } else {
- tableConfig = builder.setDedupConfig(new DedupConfig()).build();
- }
- return SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig, null);
+ @Test
+ public void testFactory() {
+ assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
}
- @Test(dataProvider = "tableTypes")
- public void testAssignSegment(String tableType) {
- SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
- assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @Test
+ public void testAssignSegment() {
+ assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Add segments for partition 0/1/2, but add no segment for partition 3.
List<String> instancesAssigned;
+ boolean consistent;
for (int segmentId = 0; segmentId < 3; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
// Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -177,7 +171,7 @@ public class StrictRealtimeSegmentAssignmentTest {
int segmentId = 3;
String segmentName = _segments.get(segmentId);
instancesAssigned =
- segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned,
Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3",
"new_consumingInstance_6"));
addToAssignment(currentAssignment, segmentId, instancesAssigned);
@@ -186,7 +180,7 @@ public class StrictRealtimeSegmentAssignmentTest {
for (segmentId = 4; segmentId < 7; segmentId++) {
segmentName = _segments.get(segmentId);
instancesAssigned =
- segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Those segments are assigned according to the assignment from
idealState, instead of using new_xxx instances
@@ -203,20 +197,20 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test(dataProvider = "tableTypes")
- public void testAssignSegmentWithOfflineSegment(String tableType) {
- SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
- assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @Test
+ public void testAssignSegmentWithOfflineSegment() {
+ assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Add segments for partition 0/1/2, but add no segment for partition 3.
List<String> instancesAssigned;
+ boolean consistent;
for (int segmentId = 0; segmentId < 3; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
// Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -244,7 +238,7 @@ public class StrictRealtimeSegmentAssignmentTest {
for (int segmentId = 3; segmentId < 7; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Those segments are assigned according to the assignment from
idealState, instead of using new_xxx instances
@@ -261,10 +255,9 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test(expectedExceptions = IllegalStateException.class, dataProvider =
"tableTypes")
- public void testAssignSegmentToCompletedServers(String tableType) {
- SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
- segmentAssignment.assignSegment("seg01", new TreeMap<>(), new TreeMap<>());
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testAssignSegmentToCompletedServers() {
+ _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new
TreeMap<>());
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
@@ -283,7 +276,7 @@ public class StrictRealtimeSegmentAssignmentTest {
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
- private static HelixManager createHelixManager() {
+ private HelixManager createHelixManager() {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]