This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6d6414d582b IGNITE-25040 Adapt distribution zones test to colocation
feature (#5581)
6d6414d582b is described below
commit 6d6414d582b4be64c38a6a92a405a7e4e4e0a344
Author: Slava Koptilin <[email protected]>
AuthorDate: Tue Apr 8 15:06:40 2025 +0300
IGNITE-25040 Adapt distribution zones test to colocation feature (#5581)
---
.../apache/ignite/internal/TestRebalanceUtil.java | 180 ++++++++++++++++++++
...tDistributionZoneMetaStorageCompactionTest.java | 17 +-
.../ItDistributionZonesFiltersTest.java | 73 ++++----
.../rebalance/ItRebalanceDistributedTest.java | 187 +++++++++++----------
.../ignite/internal/rebalance/ItRebalanceTest.java | 25 +--
.../rebalance/ItRebalanceTriggersRecoveryTest.java | 113 +++++++------
.../ignite/internal/table/ItEstimatedSizeTest.java | 8 +-
.../distributionzones/rebalance/RebalanceUtil.java | 61 ++++---
.../rebalance/ZoneRebalanceUtil.java | 74 +++++---
.../partitiondistribution/AssignmentsQueue.java | 5 +-
10 files changed, 477 insertions(+), 266 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/TestRebalanceUtil.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/TestRebalanceUtil.java
new file mode 100644
index 00000000000..4bd53cc5554
--- /dev/null
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/TestRebalanceUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableViewInternal;
+
+// TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this class
and change its usages to {@link ZoneRebalanceUtil}.
+/**
+ * Helper util class for rebalance tests.
+ */
+public class TestRebalanceUtil {
+ /**
+ * Returns partition replication group identifier.
+ *
+ * @param table Table.
+ * @param partitionId Partition identifier.
+ * @return Partition replication group identifier..
+ */
+ public static PartitionGroupId
partitionReplicationGroupId(TableViewInternal table, int partitionId) {
+ return partitionReplicationGroupId(table.internalTable(), partitionId);
+ }
+
+ /**
+ * Returns partition replication group identifier.
+ *
+ * @param table Table.
+ * @param partitionId Partition identifier.
+ * @return Partition replication group identifier..
+ */
+ public static PartitionGroupId partitionReplicationGroupId(InternalTable
table, int partitionId) {
+ if (enabledColocation()) {
+ return new ZonePartitionId(table.zoneId(), partitionId);
+ } else {
+ return new TablePartitionId(table.tableId(), partitionId);
+ }
+ }
+
+ /**
+ * Returns partition replication group identifier.
+ *
+ * @param tableDescriptor Table descriptor.
+ * @param partitionId Partition identifier.
+ * @return Partition replication group identifier..
+ */
+ public static PartitionGroupId
partitionReplicationGroupId(CatalogTableDescriptor tableDescriptor, int
partitionId) {
+ if (enabledColocation()) {
+ return new ZonePartitionId(tableDescriptor.zoneId(), partitionId);
+ } else {
+ return new TablePartitionId(tableDescriptor.id(), partitionId);
+ }
+ }
+
+ /**
+ * Returns stable partition assignments key.
+ *
+ * @param partitionGroupId Partition group identifier.
+ * @return Stable partition assignments key.
+ */
+ public static ByteArray stablePartitionAssignmentsKey(PartitionGroupId
partitionGroupId) {
+ if (enabledColocation()) {
+ return
ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId) partitionGroupId);
+ } else {
+ return RebalanceUtil.stablePartAssignmentsKey((TablePartitionId)
partitionGroupId);
+ }
+ }
+
+ /**
+ * Returns pending partition assignments key.
+ *
+ * @param partitionGroupId Partition group identifier.
+ * @return Pending partition assignments key.
+ */
+ public static ByteArray pendingPartitionAssignmentsKey(PartitionGroupId
partitionGroupId) {
+ if (enabledColocation()) {
+ return
ZoneRebalanceUtil.pendingPartAssignmentsQueueKey((ZonePartitionId)
partitionGroupId);
+ } else {
+ return
RebalanceUtil.pendingPartAssignmentsQueueKey((TablePartitionId)
partitionGroupId);
+ }
+ }
+
+ /**
+ * Returns planned partition assignments key.
+ *
+ * @param partitionGroupId Partition group identifier.
+ * @return Planned partition assignments key.
+ */
+ public static ByteArray plannedPartitionAssignmentsKey(PartitionGroupId
partitionGroupId) {
+ if (enabledColocation()) {
+ return
ZoneRebalanceUtil.plannedPartAssignmentsKey((ZonePartitionId) partitionGroupId);
+ } else {
+ return RebalanceUtil.plannedPartAssignmentsKey((TablePartitionId)
partitionGroupId);
+ }
+ }
+
+ /**
+ * Returns stable partition assignments.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @param table Table.
+ * @param partitionId Partition identifier.
+ * @return Stable partition assignments.
+ */
+ public static CompletableFuture<Set<Assignment>>
stablePartitionAssignments(
+ MetaStorageManager metaStorageManager,
+ TableViewInternal table,
+ int partitionId
+ ) {
+ if (enabledColocation()) {
+ return
ZoneRebalanceUtil.zonePartitionAssignments(metaStorageManager, table.zoneId(),
partitionId);
+ } else {
+ return
RebalanceUtil.stablePartitionAssignments(metaStorageManager, table.tableId(),
partitionId);
+ }
+ }
+
+ /**
+ * Returns pending partition assignments.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @param table Table.
+ * @param partitionId Partition identifier.
+ * @return Pending partition assignments.
+ */
+ public static CompletableFuture<Set<Assignment>>
pendingPartitionAssignments(
+ MetaStorageManager metaStorageManager,
+ TableViewInternal table,
+ int partitionId
+ ) {
+ return metaStorageManager
+
.get(pendingPartitionAssignmentsKey(partitionReplicationGroupId(table,
partitionId)))
+ .thenApply(e -> e.value() == null ? null :
AssignmentsQueue.fromBytes(e.value()).poll().nodes());
+ }
+
+ /**
+ * Returns planned partition assignments.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @param table Table.
+ * @param partitionId Partition identifier.
+ * @return Planned partition assignments.
+ */
+ public static CompletableFuture<Set<Assignment>>
plannedPartitionAssignments(
+ MetaStorageManager metaStorageManager,
+ TableViewInternal table,
+ int partitionId
+ ) {
+ return metaStorageManager
+
.get(plannedPartitionAssignmentsKey(partitionReplicationGroupId(table,
partitionId)))
+ .thenApply(e -> e.value() == null ? null :
AssignmentsQueue.fromBytes(e.value()).poll().nodes());
+ }
+}
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
index 0e06ec3a83b..474d5bf7f69 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZoneMetaStorageCompactionTest.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.distributionzones;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.TestRebalanceUtil.partitionReplicationGroupId;
+import static
org.apache.ignite.internal.TestRebalanceUtil.stablePartitionAssignmentsKey;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
@@ -39,6 +40,7 @@ import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -47,7 +49,7 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.partitiondistribution.Assignments;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -154,19 +156,18 @@ public class ItDistributionZoneMetaStorageCompactionTest
extends ClusterPerTestI
MetaStorageManager metaStorageManager = ignite.metaStorageManager();
- int tableId =
ignite.catalogManager().activeCatalog(ignite.clock().now().longValue()).tables()
+ CatalogTableDescriptor tabledDescriptor =
ignite.catalogManager().activeCatalog(ignite.clock().now().longValue()).tables()
.stream()
.filter(t -> t.name().equals(TABLE_NAME))
.findFirst()
- .orElseThrow()
- .id();
+ .orElseThrow();
- TablePartitionId partId = new TablePartitionId(tableId, 0);
+ PartitionGroupId partId =
partitionReplicationGroupId(tabledDescriptor, 0);
// Checking that there is only one replica in the stable assignments.
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes().size(),
1,
3_000L
@@ -180,7 +181,7 @@ public class ItDistributionZoneMetaStorageCompactionTest
extends ClusterPerTestI
// Wait for the rebalancing to finish.
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes().size(),
2,
3_000L
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
index 45efcf85cce..2b3bca90813 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesFiltersTest.java
@@ -20,14 +20,16 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_ROCKSDB_PROFILE_NAME;
+import static
org.apache.ignite.internal.TestRebalanceUtil.partitionReplicationGroupId;
+import static
org.apache.ignite.internal.TestRebalanceUtil.stablePartitionAssignmentsKey;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.deserializeLatestDataNodesHistoryEntry;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -39,6 +41,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestRebalanceUtil;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -48,9 +51,8 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Disabled;
@@ -122,7 +124,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
// This node do not pass the filter
@Language("HOCON") String firstNodeAttributes = "{region: EU, storage:
SSD}";
- Ignite node = unwrapIgniteImpl(startNode(1,
createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS)));
+ IgniteImpl node = unwrapIgniteImpl(startNode(1,
createStartConfig(firstNodeAttributes, STORAGE_PROFILES_CONFIGS)));
node.sql().execute(
null,
@@ -131,18 +133,15 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
node.sql().execute(null, createTableSql());
- MetaStorageManager metaStorageManager = IgniteTestUtils
- .getFieldValue(node, IgniteImpl.class, "metaStorageMgr");
+ MetaStorageManager metaStorageManager = node.metaStorageManager();
- TableManager tableManager = IgniteTestUtils.getFieldValue(node,
IgniteImpl.class, "distributedTblMgr");
+ TableViewInternal table =
unwrapTableImpl(node.distributedTableManager().table(TABLE_NAME));
- TableViewInternal table = (TableViewInternal)
tableManager.table(TABLE_NAME);
-
- TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+ PartitionGroupId partId = partitionReplicationGroupId(table, 0);
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes().size(),
1,
TIMEOUT_MILLIS
@@ -176,7 +175,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
// We check that two nodes that pass the filter and storage profiles
are presented in the stable key.
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(node(0).name(), node(3).name()),
@@ -195,7 +194,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
void
testAlteringFiltersPropagatedDataNodesToStableImmediately(ConsistencyMode
consistencyMode) throws Exception {
String filter = "$[?(@.region == \"US\" && @.storage == \"SSD\")]";
- Ignite node0 = unwrapIgniteImpl(node(0));
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
node0.sql().execute(
null,
@@ -204,18 +203,15 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
node0.sql().execute(null, createTableSql());
- MetaStorageManager metaStorageManager = IgniteTestUtils
- .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+ MetaStorageManager metaStorageManager = node0.metaStorageManager();
- TableManager tableManager = IgniteTestUtils.getFieldValue(node0,
IgniteImpl.class, "distributedTblMgr");
+ TableViewInternal table =
unwrapTableViewInternal(node0.distributedTableManager().table(TABLE_NAME));
- TableViewInternal table = (TableViewInternal)
tableManager.table(TABLE_NAME);
-
- TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+ PartitionGroupId partId = partitionReplicationGroupId(table, 0);
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(node(0).name()),
@@ -235,7 +231,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
// We check that all nodes that pass the filter are presented in the
stable key because altering filter triggers immediate scale up.
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(node(0).name(), node(1).name()),
@@ -254,7 +250,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
void
testEmptyDataNodesDoNotPropagatedToStableAfterAlteringFilter(ConsistencyMode
consistencyMode) throws Exception {
String filter = "$[?(@.region == \"US\" && @.storage == \"SSD\")]";
- Ignite node0 = unwrapIgniteImpl(node(0));
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
node0.sql().execute(
null,
@@ -263,18 +259,15 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
node0.sql().execute(null, createTableSql());
- MetaStorageManager metaStorageManager = IgniteTestUtils
- .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+ MetaStorageManager metaStorageManager = node0.metaStorageManager();
- TableManager tableManager = IgniteTestUtils.getFieldValue(node0,
IgniteImpl.class, "distributedTblMgr");
+ TableViewInternal table =
unwrapTableViewInternal(node0.distributedTableManager().table(TABLE_NAME));
- TableViewInternal table = (TableViewInternal)
tableManager.table(TABLE_NAME);
-
- TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+ PartitionGroupId partId = partitionReplicationGroupId(table, 0);
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(node(0).name()),
@@ -300,7 +293,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
assertValueInStorage(
metaStorageManager,
- stablePartAssignmentsKey(partId),
+ stablePartitionAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(node(0).name()),
@@ -320,7 +313,7 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
String filter = "$[?(@.region == \"EU\" && @.storage == \"HDD\")]";
// This node do not pass the filter.
- Ignite node0 = unwrapIgniteImpl(node(0));
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
// This node passes the filter
@Language("HOCON") String firstNodeAttributes = "{region: EU, storage:
HDD}";
@@ -344,11 +337,9 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
node1.sql().execute(null, createTableSql());
- TableManager tableManager = IgniteTestUtils.getFieldValue(node0,
IgniteImpl.class, "distributedTblMgr");
+ TableViewInternal table =
unwrapTableViewInternal(node0.distributedTableManager().table(TABLE_NAME));
- TableViewInternal table = (TableViewInternal)
tableManager.table(TABLE_NAME);
-
- TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+ PartitionGroupId partId = partitionReplicationGroupId(table, 0);
// Table was created after both nodes was up, so there wasn't any
rebalance.
assertPendingAssignmentsNeverExisted(metaStorageManager, partId);
@@ -392,11 +383,9 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
node0.sql().execute(null, createTableSql());
- TableManager tableManager = IgniteTestUtils.getFieldValue(node0,
IgniteImpl.class, "distributedTblMgr");
-
- TableViewInternal table = (TableViewInternal)
tableManager.table(TABLE_NAME);
+ TableViewInternal table =
unwrapTableViewInternal(node0.distributedTableManager().table(TABLE_NAME));
- TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+ PartitionGroupId partId = partitionReplicationGroupId(table, 0);
// Table was created after both nodes was up, so there wasn't any
rebalance.
assertPendingAssignmentsNeverExisted(metaStorageManager, partId);
@@ -463,9 +452,9 @@ public class ItDistributionZonesFiltersTest extends
ClusterPerTestIntegrationTes
private static void assertPendingAssignmentsNeverExisted(
MetaStorageManager metaStorageManager,
- TablePartitionId partId
+ PartitionGroupId partId
) throws InterruptedException, ExecutionException {
-
assertTrue(metaStorageManager.get(pendingPartAssignmentsQueueKey(partId)).get().empty());
+
assertTrue(metaStorageManager.get(TestRebalanceUtil.pendingPartitionAssignmentsKey(partId)).get().empty());
}
private static String createZoneSql(
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 83ee90a4550..a87fa050494 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -23,6 +23,13 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
+import static
org.apache.ignite.internal.TestRebalanceUtil.partitionReplicationGroupId;
+import static
org.apache.ignite.internal.TestRebalanceUtil.pendingPartitionAssignments;
+import static
org.apache.ignite.internal.TestRebalanceUtil.pendingPartitionAssignmentsKey;
+import static
org.apache.ignite.internal.TestRebalanceUtil.plannedPartitionAssignments;
+import static
org.apache.ignite.internal.TestRebalanceUtil.plannedPartitionAssignmentsKey;
+import static
org.apache.ignite.internal.TestRebalanceUtil.stablePartitionAssignments;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.configuration.IgnitePaths.cmgPath;
import static
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePath;
@@ -33,10 +40,10 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static
org.apache.ignite.internal.table.TableTestUtils.getZoneIdByTableNameStrict;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
@@ -140,6 +147,7 @@ import
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import
org.apache.ignite.internal.distributionzones.rebalance.RebalanceRaftGroupEventsListener;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.ClockService;
@@ -196,6 +204,7 @@ import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -400,7 +409,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ assertTrue(waitForCondition(() -> getPartitionStableAssignments(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
electPrimaryReplica(node);
@@ -421,7 +430,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME_2);
createTable(node, ZONE_NAME, TABLE_NAME_3);
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ assertTrue(waitForCondition(() -> getPartitionStableAssignments(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
electPrimaryReplica(node);
@@ -444,7 +453,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ assertTrue(waitForCondition(() -> getPartitionStableAssignments(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
electPrimaryReplica(node);
@@ -465,7 +474,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ assertTrue(waitForCondition(() -> getPartitionStableAssignments(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
electPrimaryReplica(node);
@@ -496,7 +505,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
electPrimaryReplica(node0);
- Set<String> partitionNodesConsistentIds =
getPartitionClusterNodes(node0, 0).stream()
+ Set<String> partitionNodesConsistentIds =
getPartitionStableAssignments(node0, 0).stream()
.map(Assignment::consistentId)
.collect(toSet());
@@ -544,7 +553,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(countDownLatch.await(10, SECONDS));
// TODO https://issues.apache.org/jira/browse/IGNITE-22522
tableOrZoneId -> zoneId
- int tableOrZoneId = enabledColocation() ?
nonLeaderTable.internalTable().zoneId() : nonLeaderTable.tableId();
+ int tableOrZoneId = enabledColocation() ? nonLeaderTable.zoneId() :
nonLeaderTable.tableId();
assertThat(
ReplicaTestUtils.getRaftClient(nonLeaderNode.replicaManager,
tableOrZoneId, 0)
.map(raftClient -> raftClient.transferLeadership(new
Peer(nonLeaderNodeConsistentId)))
@@ -567,7 +576,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ assertTrue(waitForCondition(() -> getPartitionStableAssignments(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
alterZone(node, ZONE_NAME, 1);
@@ -611,7 +620,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTableWithOnePartition(node, TABLE_NAME, ZONE_NAME, 3, true);
- Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionClusterNodes(node, 0);
+ Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionStableAssignments(node, 0);
changeTableReplicasForSinglePartition(node, ZONE_NAME, 2);
@@ -619,7 +628,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
electPrimaryReplica(node);
- Set<Assignment> assignmentsAfterChangeReplicas =
getPartitionClusterNodes(node, 0);
+ Set<Assignment> assignmentsAfterChangeReplicas =
getPartitionStableAssignments(node, 0);
Set<Assignment> evictedAssignments =
getEvictedAssignments(assignmentsBeforeChangeReplicas,
assignmentsAfterChangeReplicas);
@@ -645,7 +654,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTableWithOnePartition(node, TABLE_NAME, ZONE_NAME, 3, true);
- Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionClusterNodes(node, 0);
+ Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionStableAssignments(node, 0);
nodes.forEach(n -> {
prepareFinishHandleChangeStableAssignmentEventFuture(n,
TABLE_NAME, 0);
@@ -657,7 +666,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 2);
- Assignment evictedAssignment =
first(getEvictedAssignments(assignmentsBeforeChangeReplicas,
getPartitionClusterNodes(node, 0)));
+ Assignment evictedAssignment = first(getEvictedAssignments(
+ assignmentsBeforeChangeReplicas,
+ getPartitionStableAssignments(node, 0)));
Node evictedNode =
findNodeByConsistentId(evictedAssignment.consistentId());
@@ -667,9 +678,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
willCompleteSuccessfully()
);
- TablePartitionId tablePartitionId =
evictedNode.getTablePartitionId(TABLE_NAME, 0);
+ PartitionGroupId partitionGroupId =
evictedNode.getPartitionGroupId(TABLE_NAME, 0);
-
assertThat(evictedNode.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId),
willThrowFast(Exception.class));
+
assertThat(evictedNode.finishHandleChangeStableAssignmentEventFutures.get(partitionGroupId),
willThrowFast(Exception.class));
// Restart evicted node.
int evictedNodeIndex =
findNodeIndexByConsistentId(evictedAssignment.consistentId());
@@ -678,7 +689,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Node newNode = new Node(testInfo, evictedNode.networkAddress);
-
newNode.finishHandleChangeStableAssignmentEventFutures.put(tablePartitionId,
new CompletableFuture<>());
+
newNode.finishHandleChangeStableAssignmentEventFutures.put(partitionGroupId,
new CompletableFuture<>());
newNode.start();
@@ -687,7 +698,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
nodes.set(evictedNodeIndex, newNode);
// Let's make sure that we will destroy the partition again.
-
assertThat(newNode.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId),
willSucceedIn(1, TimeUnit.MINUTES));
+
assertThat(newNode.finishHandleChangeStableAssignmentEventFutures.get(partitionGroupId),
willSucceedIn(1, TimeUnit.MINUTES));
checkInvokeDestroyedPartitionStorages(newNode, TABLE_NAME, 0);
}
@@ -744,11 +755,13 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
+ TableViewInternal table =
unwrapTableViewInternal(node.tableManager.table(TABLE_NAME));
+
waitPartitionAssignmentsSyncedToExpected(0, 1);
electPrimaryReplica(node);
- Set<Assignment> assignmentsBeforeRebalance =
getPartitionClusterNodes(node, 0);
+ Set<Assignment> assignmentsBeforeRebalance =
getPartitionStableAssignments(node, 0);
String newNodeNameForAssignment = nodes.stream()
.map(n -> Assignment.forPeer(n.clusterService.nodeName()))
@@ -758,12 +771,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
.consistentId();
Set<Assignment> newAssignment =
Set.of(Assignment.forPeer(newNodeNameForAssignment));
+ PartitionGroupId partitionGroupId = partitionReplicationGroupId(table,
0);
// Write the new assignments to metastore as a pending assignments.
{
- TablePartitionId partId = new TablePartitionId(getTableId(node,
TABLE_NAME), 0);
-
- ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsQueueKey(partId);
+ ByteArray partAssignmentsPendingKey =
pendingPartitionAssignmentsKey(partitionGroupId);
int catalogVersion = node.catalogManager.latestCatalogVersion();
long timestamp =
node.catalogManager.catalog(catalogVersion).time();
@@ -776,13 +788,13 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n,
0).equals(newAssignment)),
+ () -> nodes.stream().allMatch(n ->
getPartitionStableAssignments(n, 0).equals(newAssignment)),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
// Wait for rebalance to complete.
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n,
0).equals(newAssignment)),
+ () -> nodes.stream().allMatch(n ->
getPartitionStableAssignments(n, 0).equals(newAssignment)),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
@@ -796,12 +808,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
));
// Checks that there no any replicas outside replication group
- var replGrpId = new TablePartitionId(getTableId(node, TABLE_NAME), 0);
Predicate<Node> isNodeOutsideReplicationGroup = n ->
!isNodeInAssignments(n, newAssignment);
assertTrue(waitForCondition(
() -> nodes.stream()
.filter(isNodeOutsideReplicationGroup)
- .noneMatch(n -> isReplicationGroupStarted(n,
replGrpId)),
+ .noneMatch(n -> isReplicationGroupStarted(n,
partitionGroupId)),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
}
@@ -815,11 +826,13 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
+ TableViewInternal table =
unwrapTableViewInternal(node.tableManager.table(TABLE_NAME));
+
waitPartitionAssignmentsSyncedToExpected(0, 1);
electPrimaryReplica(node);
- var assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
+ var assignmentsBeforeRebalance = getPartitionStableAssignments(node,
0);
String nodeNameAssignedBeforeRebalance =
assignmentsBeforeRebalance.stream()
.findFirst()
.orElseThrow()
@@ -832,11 +845,10 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
.name;
Set<Assignment> newAssignment =
Set.of(Assignment.forPeer(newNodeNameForAssignment));
+ PartitionGroupId partId = partitionReplicationGroupId(table, 0);
// Write the new assignments to metastore as a pending assignments.
- TablePartitionId partId = new TablePartitionId(getTableId(node,
TABLE_NAME), 0);
-
- ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsQueueKey(partId);
+ ByteArray partAssignmentsPendingKey =
pendingPartitionAssignmentsKey(partId);
int catalogVersion = node.catalogManager.latestCatalogVersion();
long timestamp = node.catalogManager.catalog(catalogVersion).time();
@@ -865,6 +877,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
// Checks that there no any replicas outside replication group
Predicate<Node> isNodeOutsideReplicationGroup = n ->
!isNodeInAssignments(n, union);
+
assertTrue(waitForCondition(
() -> nodes.stream()
.filter(isNodeOutsideReplicationGroup)
@@ -883,7 +896,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
createTable(node, ZONE_NAME, TABLE_NAME);
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+ assertTrue(waitForCondition(() -> getPartitionStableAssignments(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
// Check default value
checkRebalanceRetryDelay(1, REBALANCE_RETRY_DELAY_DEFAULT);
@@ -919,7 +932,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private static boolean isNodeUpdatesPeersOnGroupService(Node node,
Set<Peer> desiredPeers) {
return ReplicaTestUtils.getRaftClient(node.replicaManager,
getTableOrZoneId(node, TABLE_NAME), 0)
- .map(raftClient ->
raftClient.peers().stream().collect(toSet()).equals(desiredPeers))
+ .map(raftClient -> new
HashSet<>(raftClient.peers()).equals(desiredPeers))
.orElse(false);
}
@@ -952,6 +965,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Set<Assignment> plannedAssignments =
calculateAssignmentForPartition(dataNodes, 0, 1, 3);
Node node0 = getNode(0);
+ TableViewInternal table =
unwrapTableViewInternal(node0.tableManager.table(TABLE_NAME));
+ PartitionGroupId partitionGroupId = partitionReplicationGroupId(table,
0);
int catalogVersion = node0.catalogManager.latestCatalogVersion();
long timestamp = node0.catalogManager.catalog(catalogVersion).time();
@@ -959,10 +974,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
byte[] bytesPendingAssignments =
AssignmentsQueue.toBytes(Assignments.of(pendingAssignments, timestamp));
byte[] bytesPlannedAssignments =
Assignments.toBytes(plannedAssignments, timestamp);
- TablePartitionId partId = new TablePartitionId(getTableId(node0,
TABLE_NAME), 0);
-
- ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsQueueKey(partId);
- ByteArray partAssignmentsPlannedKey =
plannedPartAssignmentsKey(partId);
+ ByteArray partAssignmentsPendingKey =
pendingPartitionAssignmentsKey(partitionGroupId);
+ ByteArray partAssignmentsPlannedKey =
plannedPartitionAssignmentsKey(partitionGroupId);
Map<ByteArray, byte[]> msEntries = new HashMap<>();
@@ -987,12 +1000,12 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private void waitPartitionAssignmentsSyncedToExpected(String tableName,
int partNum, int replicasNum) throws Exception {
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n,
tableName, partNum).size() == replicasNum),
+ () -> nodes.stream().allMatch(n ->
getPartitionStableAssignments(n, tableName, partNum).size() == replicasNum),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
Node anyNode = nodes.get(0);
- Set<Assignment> assignments = getPartitionClusterNodes(anyNode,
tableName, replicasNum);
+ Set<Assignment> assignments = getPartitionStableAssignments(anyNode,
tableName, replicasNum);
assertTrue(waitForCondition(
() -> nodes.stream()
@@ -1005,20 +1018,20 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
// TODO https://issues.apache.org/jira/browse/IGNITE-22522 tableOrZoneId
-> zoneId, remove.
private static int getTableOrZoneId(Node node, String tableName) {
- return enabledColocation() ?
TableTestUtils.getZoneIdByTableNameStrict(node.catalogManager, tableName,
node.hybridClock.nowLong())
- : getTableId(node, tableName);
+ return enabledColocation() ?
getZoneIdByTableNameStrict(node.catalogManager, tableName,
node.hybridClock.nowLong())
+ : getTableIdStrict(node.catalogManager, tableName,
node.hybridClock.nowLong());
}
private void waitPartitionPendingAssignmentsSyncedToExpected(int partNum,
int replicasNum) throws Exception {
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n ->
getPartitionPendingClusterNodes(n, partNum).size() == replicasNum),
+ () -> nodes.stream().allMatch(n ->
getPartitionPendingAssignments(n, partNum).size() == replicasNum),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
}
private void waitPartitionPlannedAssignmentsSyncedToExpected(int partNum,
int replicasNum) throws Exception {
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n ->
getPartitionPlannedClusterNodes(n, partNum).size() == replicasNum),
+ () -> nodes.stream().allMatch(n ->
getPartitionPlannedAssignments(n, partNum).size() == replicasNum),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
}
@@ -1034,9 +1047,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private void electPrimaryReplica(Node primaryReplicaNode) throws
InterruptedException {
Node leaseholderNode =
getLeaseholderNodeForPartition(primaryReplicaNode, 0);
- int tableId = getTableId(primaryReplicaNode, TABLE_NAME);
+ TableViewInternal table =
unwrapTableViewInternal(leaseholderNode.tableManager.table(TABLE_NAME));
- TablePartitionId groupId = new TablePartitionId(tableId, 0);
+ PartitionGroupId groupId = partitionReplicationGroupId(table, 0);
assertTrue(waitForCondition(() ->
isReplicationGroupStarted(leaseholderNode, groupId), AWAIT_TIMEOUT_MILLIS));
@@ -1047,12 +1060,12 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
nodes.forEach(node ->
node.placementDriver.setPrimaryReplicaSupplier(() -> new TestReplicaMetaImpl(
leaseholder.name(),
leaseholder.id(),
- new TablePartitionId(getTableId(node, TABLE_NAME), 0)
+ groupId
)));
}
private Node getLeaseholderNodeForPartition(Node node, int partId) {
- Set<Assignment> assignments = getPartitionClusterNodes(node, partId);
+ Set<Assignment> assignments = getPartitionStableAssignments(node,
partId);
String leaseholderConsistentId =
assignments.stream().findFirst().get().consistentId();
@@ -1062,46 +1075,32 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
.get();
}
- private static Set<Assignment> getPartitionClusterNodes(Node node, int
partNum) {
- return getPartitionClusterNodes(node, TABLE_NAME, partNum);
+ private static Set<Assignment> getPartitionStableAssignments(Node node,
int partNum) {
+ return getPartitionStableAssignments(node, TABLE_NAME, partNum);
}
- private static Set<Assignment> getPartitionClusterNodes(Node node, String
tableName, int partNum) {
- return Optional.ofNullable(getTableId(node, tableName))
- .map(tableId ->
stablePartitionAssignments(node.metaStorageManager, tableId, partNum).join())
- .orElse(Set.of());
- }
+ private static Set<Assignment> getPartitionStableAssignments(Node node,
String tableName, int partNum) {
+ TableViewInternal table =
unwrapTableViewInternal(node.tableManager.table(tableName));
- private static Set<Assignment> getPartitionPendingClusterNodes(Node node,
int partNum) {
- return Optional.ofNullable(getTableId(node, TABLE_NAME))
- .map(tableId ->
partitionPendingAssignments(node.metaStorageManager, tableId, partNum).join())
+ return Optional
+
.ofNullable(stablePartitionAssignments(node.metaStorageManager, table,
partNum).join())
.orElse(Set.of());
}
- private static Set<Assignment> getPartitionPlannedClusterNodes(Node node,
int partNum) {
- return Optional.ofNullable(getTableId(node, TABLE_NAME))
- .map(tableId ->
partitionPlannedAssignments(node.metaStorageManager, tableId, partNum).join())
+ private static Set<Assignment> getPartitionPendingAssignments(Node node,
int partNum) {
+ TableViewInternal table =
unwrapTableViewInternal(node.tableManager.table(TABLE_NAME));
+
+ return Optional
+
.ofNullable(pendingPartitionAssignments(node.metaStorageManager, table,
partNum).join())
.orElse(Set.of());
}
- private static CompletableFuture<Set<Assignment>>
partitionPendingAssignments(
- MetaStorageManager metaStorageManager,
- int tableId,
- int partitionNumber
- ) {
- return metaStorageManager
- .get(pendingPartAssignmentsQueueKey(new
TablePartitionId(tableId, partitionNumber)))
- .thenApply(e -> (e.value() == null) ? null :
AssignmentsQueue.fromBytes(e.value()).poll().nodes());
- }
+ private static Set<Assignment> getPartitionPlannedAssignments(Node node,
int partNum) {
+ TableViewInternal table =
unwrapTableViewInternal(node.tableManager.table(TABLE_NAME));
- private static CompletableFuture<Set<Assignment>>
partitionPlannedAssignments(
- MetaStorageManager metaStorageManager,
- int tableId,
- int partitionNumber
- ) {
- return metaStorageManager
- .get(plannedPartAssignmentsKey(new TablePartitionId(tableId,
partitionNumber)))
- .thenApply(e -> (e.value() == null) ? null :
Assignments.fromBytes(e.value()).nodes());
+ return Optional
+
.ofNullable(plannedPartitionAssignments(node.metaStorageManager, table,
partNum).join())
+ .orElse(Set.of());
}
private class Node {
@@ -1153,7 +1152,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private final ConfigurationTreeGenerator clusterCfgGenerator;
- private final Map<TablePartitionId, CompletableFuture<Void>>
finishHandleChangeStableAssignmentEventFutures
+ private final Map<PartitionGroupId, CompletableFuture<Void>>
finishHandleChangeStableAssignmentEventFutures
= new ConcurrentHashMap<>();
private final NetworkAddress networkAddress;
@@ -1570,15 +1569,15 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
@Override
protected CompletableFuture<Void>
handleChangeStableAssignmentEvent(WatchEvent evt) {
- TablePartitionId tablePartitionId =
getTablePartitionId(evt);
+ PartitionGroupId partitionGroupId =
getPartitionGroupId(evt);
return super.handleChangeStableAssignmentEvent(evt)
.whenComplete((v, e) -> {
- if (tablePartitionId == null) {
+ if (partitionGroupId == null) {
return;
}
- CompletableFuture<Void> finishFuture =
finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId);
+ CompletableFuture<Void> finishFuture =
finishHandleChangeStableAssignmentEventFutures.get(partitionGroupId);
if (finishFuture == null) {
return;
@@ -1702,7 +1701,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
clusterCfgGenerator.close();
}
- @Nullable TablePartitionId getTablePartitionId(WatchEvent event) {
+ @Nullable PartitionGroupId getPartitionGroupId(WatchEvent event) {
assertTrue(event.single(), event.toString());
Entry stableAssignmentsWatchEvent = event.entryEvent().newEntry();
@@ -1711,13 +1710,19 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
return null;
}
- return extractTablePartitionId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX_BYTES);
+ if (enabledColocation()) {
+ return ZoneRebalanceUtil.extractZonePartitionId(
+ stableAssignmentsWatchEvent.key(),
+ ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES);
+ } else {
+ return
extractTablePartitionId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX_BYTES);
+ }
}
- TablePartitionId getTablePartitionId(String tableName, int
partitionId) {
+ PartitionGroupId getPartitionGroupId(String tableName, int
partitionId) {
InternalTable internalTable = getInternalTable(this, tableName);
- return new TablePartitionId(internalTable.tableId(), partitionId);
+ return partitionReplicationGroupId(internalTable, partitionId);
}
}
@@ -1800,9 +1805,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
private void prepareFinishHandleChangeStableAssignmentEventFuture(Node
node, String tableName, int partitionId) {
- TablePartitionId tablePartitionId = new
TablePartitionId(getInternalTable(node, tableName).tableId(), partitionId);
+ InternalTable table = getInternalTable(node, tableName);
-
node.finishHandleChangeStableAssignmentEventFutures.put(tablePartitionId, new
CompletableFuture<>());
+ PartitionGroupId partitionGroupId = partitionReplicationGroupId(table,
partitionId);
+
+
node.finishHandleChangeStableAssignmentEventFutures.put(partitionGroupId, new
CompletableFuture<>());
}
private CompletableFuture<?>
collectFinishHandleChangeStableAssignmentEventFuture(
@@ -1817,9 +1824,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
continue;
}
- TablePartitionId tablePartitionId = new
TablePartitionId(getInternalTable(node, tableName).tableId(), partitionId);
+ InternalTable table = getInternalTable(node, tableName);
+
+ PartitionGroupId partitionGroupId =
partitionReplicationGroupId(table, partitionId);
- CompletableFuture<Void> future =
node.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId);
+ CompletableFuture<Void> future =
node.finishHandleChangeStableAssignmentEventFutures.get(partitionGroupId);
assertNotNull(future, String.format("node=%s, table=%s,
partitionId=%s", node.name, tableName, partitionId));
@@ -1877,13 +1886,13 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private void checkPartitionNodes(int partitionId, int expNodeCount) {
for (Node node : nodes) {
- assertEquals(expNodeCount, getPartitionClusterNodes(node,
partitionId).size(), node.name);
+ assertEquals(expNodeCount, getPartitionStableAssignments(node,
partitionId).size(), node.name);
}
}
private void checkPartitionNodes(String tableName, int partitionId, int
expNodeCount) {
for (Node node : nodes) {
- assertEquals(expNodeCount, getPartitionClusterNodes(node,
tableName, partitionId).size(), node.name);
+ assertEquals(expNodeCount, getPartitionStableAssignments(node,
tableName, partitionId).size(), node.name);
}
}
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 8dffe5c4f88..6355d2b8d69 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.rebalance;
+import static
org.apache.ignite.internal.TestRebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -34,11 +34,13 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterConfiguration.Builder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -94,7 +96,7 @@ public class ItRebalanceTest extends
ClusterPerTestIntegrationTest {
nodeName(0),
nodeName(1),
nodeName(2)
- ), table.tableId());
+ ), table);
BinaryRowEx row = marshalTuple(table, Tuple.create().set("id",
1).set("val", "value1"));
BinaryRowEx key = marshalKey(table, 1, Integer.class);
@@ -120,7 +122,7 @@ public class ItRebalanceTest extends
ClusterPerTestIntegrationTest {
nodeName(0),
nodeName(1),
nodeName(3)
- ), table.tableId());
+ ), table);
assertThat(table.internalTable().get(key, clock.now(),
clusterNode(0)), willBe(notNullValue()));
assertThat(table.internalTable().get(key, clock.now(),
clusterNode(1)), willBe(notNullValue()));
@@ -132,7 +134,7 @@ public class ItRebalanceTest extends
ClusterPerTestIntegrationTest {
nodeName(0),
nodeName(1),
nodeName(2)
- ), table.tableId());
+ ), table);
assertThat(table.internalTable().get(key, clock.now(),
clusterNode(0)), willBe(notNullValue()));
assertThat(table.internalTable().get(key, clock.now(),
clusterNode(1)), willBe(notNullValue()));
@@ -163,15 +165,18 @@ public class ItRebalanceTest extends
ClusterPerTestIntegrationTest {
return marshaller.marshal(key);
}
- private void waitForStableAssignmentsInMetastore(Set<String>
expectedNodes, int table) throws InterruptedException {
+ private void waitForStableAssignmentsInMetastore(Set<String>
expectedNodes, TableViewInternal table) throws InterruptedException {
Set<String>[] lastAssignmentsHolderForLog = new Set[1];
assertTrue(waitForCondition(() -> {
- Set<String> assignments =
-
await(stablePartitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(),
table, 0))
- .stream()
- .map(Assignment::consistentId)
- .collect(Collectors.toSet());
+ IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
+
+ CompletableFuture<Set<Assignment>> assignmentsFuture =
stablePartitionAssignments(ignite.metaStorageManager(), table, 0);
+
+ Set<String> assignments = await(assignmentsFuture)
+ .stream()
+ .map(Assignment::consistentId)
+ .collect(Collectors.toSet());
lastAssignmentsHolderForLog[0] = assignments;
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
index 09c69c8390c..48bb1f9d362 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.rebalance;
+import static
org.apache.ignite.internal.TestRebalanceUtil.partitionReplicationGroupId;
+import static
org.apache.ignite.internal.TestRebalanceUtil.pendingPartitionAssignments;
+import static
org.apache.ignite.internal.TestRebalanceUtil.pendingPartitionAssignmentsKey;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
-import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,13 +37,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.partitiondistribution.Assignment;
-import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.table.QualifiedName;
@@ -52,6 +54,10 @@ import org.junit.jupiter.api.Test;
* Tests for recovery of the rebalance procedure.
*/
public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTest {
+ private static final String ZONE_NAME = "TEST_ZONE";
+
+ private static final String TABLE_NAME = "TEST";
+
private static final int PARTITION_ID = 0;
private static final String US_NODE_BOOTSTRAP_CFG_TEMPLATE = "ignite {\n"
@@ -90,16 +96,17 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
}
@Test
- void testRebalanceTriggersRecoveryAfterFilterUpdate() throws
InterruptedException {
+ void testRebalanceTriggersRecoveryAfterFilterUpdate() throws Exception {
// The nodes from different regions/zones needed to implement the
predictable way of nodes choice.
startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
cluster.doInSession(0, session -> {
- session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1,
REPLICAS=2, DATA_NODES_FILTER='$[?(@.region == \"US\")]', "
+ session.execute(null, "CREATE ZONE " + ZONE_NAME + " WITH
PARTITIONS=1, REPLICAS=2, "
+ + "DATA_NODES_FILTER='$[?(@.region == \"US\")]', "
+ "STORAGE_PROFILES='" + DEFAULT_STORAGE_PROFILE + "'");
- session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name
INT) ZONE TEST_ZONE");
- session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+ session.execute(null, "CREATE TABLE " + TABLE_NAME + " (id INT
PRIMARY KEY, name INT) ZONE " + ZONE_NAME);
+ session.execute(null, "INSERT INTO " + TABLE_NAME + " VALUES (0,
0)");
});
assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)),
10_000));
@@ -112,7 +119,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
cluster.doInSession(0, session -> {
- session.execute(null, "ALTER ZONE TEST_ZONE SET
DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
+ session.execute(null, "ALTER ZONE " + ZONE_NAME + " SET
DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
});
// Check that metastore node schedule the rebalance procedure.
@@ -123,10 +130,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
10_000));
// Remove the pending keys in a barbarian way. So, the rebalance can
be triggered only by the recovery logic now.
- Integer tableId =
getTableId(unwrapIgniteImpl(node(0)).catalogManager(), "TEST", new
HybridClockImpl().nowLong());
- unwrapIgniteImpl(node(0))
- .metaStorageManager()
- .remove(pendingPartAssignmentsQueueKey(new
TablePartitionId(tableId, PARTITION_ID))).join();
+ removePendingPartAssignmentsQueueKey(TABLE_NAME, PARTITION_ID);
restartNode(1);
restartNode(2);
@@ -136,16 +140,16 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
}
@Test
- void testRebalanceTriggersRecoveryAfterReplicasUpdate() throws
InterruptedException {
+ void testRebalanceTriggersRecoveryAfterReplicasUpdate() throws Exception {
// The nodes from different regions/zones needed to implement the
predictable way of nodes choice.
startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
cluster.doInSession(0, session -> {
- session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1,
REPLICAS=1, "
+ session.execute(null, "CREATE ZONE " + ZONE_NAME + " WITH
PARTITIONS=1, REPLICAS=1, "
+ "DATA_NODES_FILTER='$[?(@.zone == \"global\")]',
STORAGE_PROFILES='" + DEFAULT_STORAGE_PROFILE + "'");
- session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name
INT) ZONE TEST_ZONE");
- session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+ session.execute(null, "CREATE TABLE " + TABLE_NAME + " (id INT
PRIMARY KEY, name INT) ZONE " + ZONE_NAME);
+ session.execute(null, "INSERT INTO " + TABLE_NAME + " VALUES (0,
0)");
});
assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)),
10_000));
@@ -158,7 +162,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
cluster.doInSession(0, session -> {
- session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2");
+ session.execute(null, "ALTER ZONE " + ZONE_NAME + " SET
REPLICAS=2");
});
// Check that metastore node schedule the rebalance procedure.
@@ -169,10 +173,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
10_000));
// Remove the pending keys in a barbarian way. So, the rebalance can
be triggered only by the recovery logic now.
- Integer tableId =
getTableId(unwrapIgniteImpl(node(0)).catalogManager(), "TEST", new
HybridClockImpl().nowLong());
- unwrapIgniteImpl(node(0))
- .metaStorageManager()
- .remove(pendingPartAssignmentsQueueKey(new
TablePartitionId(tableId, PARTITION_ID))).join();
+ removePendingPartAssignmentsQueueKey(TABLE_NAME, PARTITION_ID);
restartNode(1);
restartNode(2);
@@ -190,10 +191,10 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
startNode(3);
cluster.doInSession(0, session -> {
- session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1,
REPLICAS=1, "
+ session.execute(null, "CREATE ZONE " + ZONE_NAME + " WITH
PARTITIONS=1, REPLICAS=1, "
+ "DATA_NODES_FILTER='$[?(@.region == \"US\")]',
STORAGE_PROFILES='" + DEFAULT_STORAGE_PROFILE + "'");
- session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name
INT) ZONE TEST_ZONE");
- session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+ session.execute(null, "CREATE TABLE " + TABLE_NAME + " (id INT
PRIMARY KEY, name INT) ZONE " + ZONE_NAME);
+ session.execute(null, "INSERT INTO " + TABLE_NAME + " VALUES (0,
0)");
});
assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)),
10_000));
@@ -202,7 +203,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
stopNode(3);
cluster.doInSession(0, session -> {
- session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2,
DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
+ session.execute(null, "ALTER ZONE " + ZONE_NAME + " SET
REPLICAS=2, DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
});
// Check that new replica from 'global' zone received the data and
rebalance really happened.
@@ -211,48 +212,54 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
(() ->
getPartitionPendingClusterNodes(unwrapIgniteImpl(node(0)),
PARTITION_ID).equals(Set.of())),
10_000));
- TablePartitionId tablePartitionId =
- new TablePartitionId(
- getTableId(unwrapIgniteImpl(node(0)).catalogManager(),
- "TEST",
- new HybridClockImpl().nowLong()),
- PARTITION_ID
- );
- long pendingsKeysRevisionBeforeRecovery =
unwrapIgniteImpl(node(0)).metaStorageManager()
- .get(pendingPartAssignmentsQueueKey(tablePartitionId))
- .get(10, TimeUnit.SECONDS).revision();
-
+ long pendingKeysRevisionBeforeRecovery =
pendingPartAssignmentsQueueKeyRevision(TABLE_NAME, PARTITION_ID);
startNode(3, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
- long pendingsKeysRevisionAfterRecovery =
unwrapIgniteImpl(node(0)).metaStorageManager()
- .get(pendingPartAssignmentsQueueKey(tablePartitionId))
- .get(10, TimeUnit.SECONDS).revision();
+ long pendingKeysRevisionAfterRecovery =
pendingPartAssignmentsQueueKeyRevision(TABLE_NAME, PARTITION_ID);
// Check that recovered node doesn't produce new rebalances for
already processed triggers.
- assertEquals(pendingsKeysRevisionBeforeRecovery,
pendingsKeysRevisionAfterRecovery);
+ assertEquals(pendingKeysRevisionBeforeRecovery,
pendingKeysRevisionAfterRecovery);
}
private static Set<Assignment> getPartitionPendingClusterNodes(IgniteImpl
node, int partNum) {
- return Optional.ofNullable(getTableId(node.catalogManager(), "TEST",
new HybridClockImpl().nowLong()))
- .map(tableId ->
partitionPendingAssignments(node.metaStorageManager(), tableId, partNum).join())
- .orElse(Set.of());
+ CompletableFuture<Set<Assignment>> pendingAssignmentsFuture =
pendingPartitionAssignments(
+ node.metaStorageManager(),
+
unwrapTableViewInternal(node.distributedTableManager().table(TABLE_NAME)),
+ partNum);
+
+ return
Optional.ofNullable(pendingAssignmentsFuture.join()).orElse(Set.of());
}
- private static CompletableFuture<Set<Assignment>>
partitionPendingAssignments(
- MetaStorageManager metaStorageManager,
- int tableId,
- int partitionNumber
- ) {
+ private long pendingPartAssignmentsQueueKeyRevision(String tableName, int
partitionId) throws Exception {
+ MetaStorageManager metaStorageManager =
unwrapIgniteImpl(node(0)).metaStorageManager();
+
+ TableViewInternal table =
unwrapTableViewInternal(unwrapIgniteImpl(node(0)).distributedTableManager().table(tableName));
+
+ PartitionGroupId partitionGroupId = partitionReplicationGroupId(table,
partitionId);
+
return metaStorageManager
- .get(pendingPartAssignmentsQueueKey(new
TablePartitionId(tableId, partitionNumber)))
- .thenApply(e -> (e.value() == null) ? null :
AssignmentsQueue.fromBytes(e.value()).poll().nodes());
+ .get(pendingPartitionAssignmentsKey(partitionGroupId))
+ .get(10, TimeUnit.SECONDS)
+ .revision();
+ }
+
+ private void removePendingPartAssignmentsQueueKey(String tableName, int
partitionId) {
+ IgniteImpl node = unwrapIgniteImpl(cluster.node(0));
+
+ MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+ TableViewInternal table =
unwrapTableViewInternal(node.distributedTableManager().table(tableName));
+
+ ByteArray pendingPartAssignmentsQueueKey =
pendingPartitionAssignmentsKey(partitionReplicationGroupId(table, partitionId));
+
+ metaStorageManager.remove(pendingPartAssignmentsQueueKey).join();
}
private static boolean containsPartition(Ignite node) {
TableManager tableManager = unwrapTableManager(node.tables());
- MvPartitionStorage storage =
tableManager.tableView(QualifiedName.fromSimple("TEST"))
+ MvPartitionStorage storage =
tableManager.tableView(QualifiedName.fromSimple(TABLE_NAME))
.internalTable()
.storage()
.getMvPartition(PARTITION_ID);
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
index d37b65864ee..0fc5ed4a858 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
@@ -25,7 +25,8 @@ import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_P
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -38,6 +39,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -179,7 +181,9 @@ public class ItEstimatedSizeTest extends
ClusterPerTestIntegrationTest {
private Set<String> stableAssignmentNodes() {
MetaStorageManager metaStorageManager =
unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager();
- var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+ var stableAssignmentsPrefix = enabledColocation()
+ ? new
ByteArray(ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES)
+ : new ByteArray(STABLE_ASSIGNMENTS_PREFIX_BYTES);
CompletableFuture<List<Entry>> entriesFuture =
subscribeToList(metaStorageManager.prefix(stableAssignmentsPrefix));
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 349d17f68d8..21d0a751052 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -77,10 +77,39 @@ import org.jetbrains.annotations.Nullable;
* after switching to zone-based replication.
*/
public class RebalanceUtil {
-
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(RebalanceUtil.class);
+ /** Key prefix for planned assignments. */
+ public static final String PLANNED_ASSIGNMENTS_PREFIX =
"assignments.planned.";
+
+ /** Key prefix for pending assignments. */
+ public static final String PENDING_ASSIGNMENTS_QUEUE_PREFIX =
"assignments.pending.";
+
+ public static final byte[] PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES =
"assignments.pending.".getBytes(UTF_8);
+
+ /** Key prefix for stable assignments. */
+ public static final String STABLE_ASSIGNMENTS_PREFIX =
"assignments.stable.";
+
+ public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES =
STABLE_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
+
+ /** Key prefix for switch reduce assignments. */
+ public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"assignments.switch.reduce.";
+
+ public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES =
ASSIGNMENTS_SWITCH_REDUCE_PREFIX.getBytes(UTF_8);
+
+ /** Key prefix for switch append assignments. */
+ public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"assignments.switch.append.";
+
+ /** Key prefix for change trigger keys. */
+ public static final String PENDING_CHANGE_TRIGGER_PREFIX =
"pending.change.trigger.";
+
+ static final byte[] PENDING_CHANGE_TRIGGER_PREFIX_BYTES =
PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8);
+
+ private static final String ASSIGNMENTS_CHAIN_PREFIX =
"assignments.chain.";
+
+ public static final byte[] ASSIGNMENTS_CHAIN_PREFIX_BYTES =
ASSIGNMENTS_CHAIN_PREFIX.getBytes(UTF_8);
+
/**
* Status values for methods like {@link #updatePendingAssignmentsKeys}.
*/
@@ -443,36 +472,6 @@ public class RebalanceUtil {
return allOf(futures);
}
- /** Key prefix for planned assignments. */
- public static final String PLANNED_ASSIGNMENTS_PREFIX =
"assignments.planned.";
-
- /** Key prefix for pending assignments. */
- public static final String PENDING_ASSIGNMENTS_QUEUE_PREFIX =
"assignments.pending.";
-
- public static final byte[] PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES =
"assignments.pending.".getBytes(UTF_8);
-
- /** Key prefix for stable assignments. */
- public static final String STABLE_ASSIGNMENTS_PREFIX =
"assignments.stable.";
-
- public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES =
STABLE_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
-
- /** Key prefix for switch reduce assignments. */
- public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"assignments.switch.reduce.";
-
- public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES =
ASSIGNMENTS_SWITCH_REDUCE_PREFIX.getBytes(UTF_8);
-
- /** Key prefix for switch append assignments. */
- public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"assignments.switch.append.";
-
- /** Key prefix for change trigger keys. */
- public static final String PENDING_CHANGE_TRIGGER_PREFIX =
"pending.change.trigger.";
-
- static final byte[] PENDING_CHANGE_TRIGGER_PREFIX_BYTES =
PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8);
-
- private static final String ASSIGNMENTS_CHAIN_PREFIX =
"assignments.chain.";
-
- public static final byte[] ASSIGNMENTS_CHAIN_PREFIX_BYTES =
ASSIGNMENTS_CHAIN_PREFIX.getBytes(UTF_8);
-
/**
* Key that is needed for skipping stale events of pending key change.
*
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 69f7516a44f..6ada9712bc3 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -77,6 +77,32 @@ public class ZoneRebalanceUtil {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(ZoneRebalanceUtil.class);
+ /** Key prefix for pending assignments. */
+ public static final String PENDING_ASSIGNMENTS_QUEUE_PREFIX =
"zone.assignments.pending.";
+
+ public static final byte[] PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES =
PENDING_ASSIGNMENTS_QUEUE_PREFIX.getBytes(UTF_8);
+
+ /** Key prefix for stable assignments. */
+ public static final String STABLE_ASSIGNMENTS_PREFIX =
"zone.assignments.stable.";
+
+ public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES =
STABLE_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
+
+ /** Key prefix for planned assignments. */
+ public static final String PLANNED_ASSIGNMENTS_PREFIX =
"zone.assignments.planned.";
+
+ /** Key prefix for switch reduce assignments. */
+ public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"zone.assignments.switch.reduce.";
+
+ public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES =
ASSIGNMENTS_SWITCH_REDUCE_PREFIX.getBytes(UTF_8);
+
+ /** Key prefix for switch append assignments. */
+ public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"zone.assignments.switch.append.";
+
+ /** Key prefix for change trigger keys. */
+ private static final String ZONE_PENDING_CHANGE_TRIGGER_PREFIX =
"zone.pending.change.trigger.";
+
+ static final byte[] ZONE_PENDING_CHANGE_TRIGGER_PREFIX_BYTES =
ZONE_PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8);
+
/**
* Status values for methods like {@link #updatePendingAssignmentsKeys}.
*/
@@ -419,32 +445,6 @@ public class ZoneRebalanceUtil {
return zoneDescriptor.id() + "/" + zoneDescriptor.name();
}
- /** Key prefix for pending assignments. */
- public static final String PENDING_ASSIGNMENTS_QUEUE_PREFIX =
"zone.assignments.pending.";
-
- public static final byte[] PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES =
PENDING_ASSIGNMENTS_QUEUE_PREFIX.getBytes(UTF_8);
-
- /** Key prefix for stable assignments. */
- public static final String STABLE_ASSIGNMENTS_PREFIX =
"zone.assignments.stable.";
-
- public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES =
STABLE_ASSIGNMENTS_PREFIX.getBytes(UTF_8);
-
- /** Key prefix for planned assignments. */
- public static final String PLANNED_ASSIGNMENTS_PREFIX =
"zone.assignments.planned.";
-
- /** Key prefix for switch reduce assignments. */
- public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX =
"zone.assignments.switch.reduce.";
-
- public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES =
ASSIGNMENTS_SWITCH_REDUCE_PREFIX.getBytes(UTF_8);
-
- /** Key prefix for switch append assignments. */
- public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX =
"zone.assignments.switch.append.";
-
- /** Key prefix for change trigger keys. */
- private static final String ZONE_PENDING_CHANGE_TRIGGER_PREFIX =
"zone.pending.change.trigger.";
-
- static final byte[] ZONE_PENDING_CHANGE_TRIGGER_PREFIX_BYTES =
ZONE_PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8);
-
/**
* Key that is needed for skipping stale events of pending key change.
*
@@ -562,7 +562,7 @@ public class ZoneRebalanceUtil {
}
/**
- * Returns partition assignments from meta storage locally.
+ * Returns stable partition assignments from meta storage locally.
*
* @param metaStorageManager Meta storage manager.
* @param zoneId Zone id.
@@ -583,7 +583,7 @@ public class ZoneRebalanceUtil {
}
/**
- * Returns zone assignments for all zone partitions from meta storage
locally. Assignments must be present.
+ * Returns zone stable assignments for all zone partitions from meta
storage locally. Assignments must be present.
*
* @param metaStorageManager Meta storage manager.
* @param zoneId Zone id.
@@ -608,6 +608,24 @@ public class ZoneRebalanceUtil {
.collect(toList());
}
+ /**
+ * Returns stable partition assignments from meta storage.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @param zoneId Table ID.
+ * @param partitionId Partition ID.
+ * @return Future with partition assignments as a value.
+ */
+ public static CompletableFuture<Set<Assignment>> zonePartitionAssignments(
+ MetaStorageManager metaStorageManager,
+ int zoneId,
+ int partitionId
+ ) {
+ return metaStorageManager
+ .get(stablePartAssignmentsKey(new ZonePartitionId(zoneId,
partitionId)))
+ .thenApply(e -> (e.value() == null) ? null :
Assignments.fromBytes(e.value()).nodes());
+ }
+
/**
* Returns zone assignments for zone partitions from meta storage.
*
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
index 46403d9abaf..f097d46c324 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
@@ -38,7 +38,6 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*/
public class AssignmentsQueue implements Iterable<Assignments> {
-
@IgniteToStringInclude
private final Deque<Assignments> queue;
@@ -57,7 +56,7 @@ public class AssignmentsQueue implements
Iterable<Assignments> {
*
* @return the head of this queue
*/
- public @Nullable Assignments poll() {
+ public Assignments poll() {
assert !queue.isEmpty() : "Assignments queue must contain at least one
element.";
return queue.poll();
}
@@ -67,7 +66,7 @@ public class AssignmentsQueue implements
Iterable<Assignments> {
*
* @return the tail of this queue
*/
- public @Nullable Assignments peekLast() {
+ public Assignments peekLast() {
assert !queue.isEmpty() : "Assignments queue must contain at least one
element.";
return queue.peekLast();
}