This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f648c847a4 IGNITE-19713 Start partition storages only for locally assigned partitions (#4649)
f648c847a4 is described below
commit f648c847a40501136c40ecb36682cd73a0566c55
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 30 18:21:46 2024 +0400
IGNITE-19713 Start partition storages only for locally assigned partitions (#4649)
---
.../ignite/internal/replicator/ReplicaManager.java | 3 +-
.../ignite/internal/table/ItTableScanTest.java | 34 ++++++----
.../distributed/ItPartitionStoragesTest.java | 77 ++++++++++++++++++++++
.../internal/table/ItTxResourcesVacuumTest.java | 21 +++---
.../internal/table/distributed/PartitionSet.java | 7 +-
.../internal/table/distributed/TableManager.java | 40 ++++++-----
.../distributed/TableManagerRecoveryTest.java | 5 +-
.../table/distributed/TableManagerTest.java | 2 +
8 files changed, 141 insertions(+), 48 deletions(-)
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 5e740defc5..666f181f15 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1380,7 +1380,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return stopReplica(groupId, context, stopOperation);
} else if (state == ReplicaState.STOPPED) {
// We need to stop replica and destroy storages
anyway, because they can be already created.
- // See TODO-s for IGNITE-19713
return stopReplica(groupId, context, stopOperation);
} // else: no-op.
} else if (reason == WeakReplicaStopReason.RESTART) {
@@ -1577,7 +1576,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* <li>if {@link #PRIMARY_ONLY} or {@link #STOPPING}: no-op.</li>
* <li>if {@link #RESTART_PLANNED} no-op, because replica will be
stopped within deferred operation;</li>
* <li>if {@link #STARTING}: replica is stopped, the next state is
{@link #STOPPING};</li>
- * <li>if {@link #STOPPED}: replica is stopped, see TODO-s for
IGNITE-19713.</li>
+ * <li>if {@link #STOPPED}: replica is stopped.</li>
* </ul>
* <li>if {@link WeakReplicaStopReason#PRIMARY_EXPIRED}:</li>
* <ul>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 45e618bb6d..7aec095b77 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -30,6 +30,8 @@ import static
org.apache.ignite.internal.wrapper.Wrappers.unwrapNullable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -157,23 +159,27 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
tableViewInternal.internalTable().storage().getMvPartition(PART_ID),
TestMvPartitionStorage.class
);
- TestSortedIndexStorage sortedIdxStorage = unwrapNullable(
- tableViewInternal.internalTable().storage().getIndex(PART_ID,
sortedIdxId),
- TestSortedIndexStorage.class
- );
- try {
- assertTrue(
- waitForCondition(() -> partitionStorage.pendingCursors()
== 0, AWAIT_TIMEOUT_MILLIS),
- "Alive versioned storage cursors: " +
partitionStorage.pendingCursors()
+ if (partitionStorage != null) {
+ TestSortedIndexStorage sortedIdxStorage = unwrapNullable(
+
tableViewInternal.internalTable().storage().getIndex(PART_ID, sortedIdxId),
+ TestSortedIndexStorage.class
);
+ assertThat(sortedIdxStorage, is(notNullValue()));
- assertTrue(
- waitForCondition(() -> sortedIdxStorage.pendingCursors()
== 0, AWAIT_TIMEOUT_MILLIS),
- "Alive index storage cursors: " +
sortedIdxStorage.pendingCursors()
- );
- } catch (InterruptedException e) {
- fail("Waiting cursors close was interrupted.");
+ try {
+ assertTrue(
+ waitForCondition(() ->
partitionStorage.pendingCursors() == 0, AWAIT_TIMEOUT_MILLIS),
+ "Alive versioned storage cursors: " +
partitionStorage.pendingCursors()
+ );
+
+ assertTrue(
+ waitForCondition(() ->
sortedIdxStorage.pendingCursors() == 0, AWAIT_TIMEOUT_MILLIS),
+ "Alive index storage cursors: " +
sortedIdxStorage.pendingCursors()
+ );
+ } catch (InterruptedException e) {
+ fail("Waiting cursors close was interrupted.");
+ }
}
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItPartitionStoragesTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItPartitionStoragesTest.java
new file mode 100644
index 0000000000..d72492098e
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItPartitionStoragesTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Test;
+
+class ItPartitionStoragesTest extends ClusterPerTestIntegrationTest {
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @Test
+ void onlyCreatesPartitionStoragesForPartitionsAssignedToNode() {
+ createTableWith1ReplicaAnd1Partition();
+
+ putToTable();
+
+ int partitionStoragesCount = cluster.runningNodes()
+ .map(node -> unwrapTableImpl(table(node)).internalTable())
+ .mapToInt(internalTable ->
internalTable.storage().getMvPartition(0) != null ? 1 : 0)
+ .sum();
+
+ assertThat(partitionStoragesCount, is(1));
+ }
+
+ private void createTableWith1ReplicaAnd1Partition() {
+ cluster.doInSession(0, session -> {
+ executeUpdate(
+ "create zone test_zone with partitions=1, replicas=1,
storage_profiles='" + DEFAULT_STORAGE_PROFILE + "';",
+ session
+ );
+ executeUpdate("create table " + TABLE_NAME + " (key int primary
key, val varchar(20))"
+ + " with primary_zone='TEST_ZONE', storage_profile='" +
DEFAULT_STORAGE_PROFILE + "';", session);
+ });
+ }
+
+ private void putToTable() {
+ KeyValueView<Integer, String> keyValueView =
table().keyValueView(Integer.class, String.class);
+ keyValueView.put(null, 1, "one");
+ }
+
+ private Table table() {
+ return table(cluster.aliveNode());
+ }
+
+ private static Table table(Ignite node) {
+ return node.tables().table(TABLE_NAME);
+ }
+}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index 66355d77d9..f0fe597d05 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -252,7 +252,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
Transaction roTxAfter = beginReadOnlyTx(anyNode());
- waitForTxStateReplication(nodes, txId, partId, 10_000);
+ waitForTxStateReplication(nodes, txId, TABLE_NAME, partId, 10_000);
// Check that both volatile and persistent state is vacuumized..
waitForTxStateVacuum(txId, partId, true, 10_000);
@@ -412,7 +412,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<Void> commitFut = tx.commitAsync();
- waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+ waitForTxStateReplication(commitPartNodes, txId, TABLE_NAME,
commitPartId, 10_000);
assertThat(cleanupStarted, willCompleteSuccessfully());
@@ -605,7 +605,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<Void> commitFut = tx.commitAsync();
- waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+ waitForTxStateReplication(commitPartNodes, txId, TABLE_NAME,
commitPartId, 10_000);
log.info("Test: state replicated.");
@@ -834,11 +834,11 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
.allMatch(n -> volatileTxState(n, txId) != null);
}
- private boolean checkPersistentTxStateOnNodes(Set<String>
nodeConsistentIds, UUID txId, int partId) {
+ private boolean checkPersistentTxStateOnNodes(Set<String>
nodeConsistentIds, UUID txId, String tableName, int partId) {
return cluster.runningNodes()
.map(TestWrappers::unwrapIgniteImpl)
.filter(n -> nodeConsistentIds.contains(n.name()))
- .allMatch(n -> persistentTxState(n, txId, partId) != null);
+ .allMatch(n -> persistentTxState(n, txId, tableName, partId)
!= null);
}
/**
@@ -849,9 +849,9 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
* @param partId Commit partition id.
* @param timeMs Time to wait.
*/
- private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID
txId, int partId, long timeMs)
+ private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID
txId, String tableName, int partId, long timeMs)
throws InterruptedException {
- assertTrue(waitForCondition(() ->
checkPersistentTxStateOnNodes(nodeConsistentIds, txId, partId), timeMs));
+ assertTrue(waitForCondition(() ->
checkPersistentTxStateOnNodes(nodeConsistentIds, txId, tableName, partId),
timeMs));
}
/**
@@ -1023,7 +1023,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
result = result
&& volatileTxState(node, txId) == null
- && (!checkPersistent || !node.id().equals(cpPrimaryId) ||
persistentTxState(node, txId, partId) == null);
+ && (!checkPersistent || !node.id().equals(cpPrimaryId) ||
persistentTxState(node, txId, tableName, partId) == null);
}
return result;
@@ -1052,11 +1052,6 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
return txManager.stateMeta(txId);
}
- @Nullable
- private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, int
partId) {
- return persistentTxState(node, txId, TABLE_NAME, partId);
- }
-
@Nullable
private TransactionMeta persistentTxState(IgniteImpl node, UUID txId,
String tableName, int partId) {
return runInExecutor(txStateStorageExecutor, () -> {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
index 186d3ec2d1..0c5089cf53 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
@@ -48,7 +48,7 @@ public interface PartitionSet {
@Override
public PartitionSet copy() {
- return this;
+ return new BitSetPartitionSet();
}
@Override
@@ -65,6 +65,11 @@ public interface PartitionSet {
public boolean equals(Object obj) {
return isEqual(obj);
}
+
+ @Override
+ public String toString() {
+ return "PartitionSet(empty)";
+ }
};
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2ea68705b8..82a7e27858 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -286,7 +286,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/**
* Versioned value for tracking RAFT groups initialization and starting
completion.
*
- * <p>Only explicitly updated in {@link
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean,
HybridTimestamp)}.
+ * <p>Only explicitly updated in {@link
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean,
long)}.
*
* <p>Completed strictly after {@link #localPartitionsVv}.
*/
@@ -764,7 +764,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
}
- return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
+ return getOrCreatePartitionStorages(table,
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
}, ioExecutor).thenCompose(identity())));
CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
@@ -1639,13 +1639,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
(ignore, throwable) -> inBusyLock(busyLock, () ->
assignmentsFuture.thenComposeAsync(newAssignments -> {
PartitionSet parts = new BitSetPartitionSet();
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set
partitions only for
- // TODO assigned partitions.
for (int i = 0; i < newAssignments.size(); i++) {
- parts.set(i);
+ Assignments partitionAssignments =
newAssignments.get(i);
+ if (localMemberAssignment(partitionAssignments) !=
null) {
+ parts.set(i);
+ }
}
- return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
+ return getOrCreatePartitionStorages(table,
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
}, ioExecutor)));
CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
@@ -2189,19 +2190,20 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
localServicesStartFuture = localPartitionsVv.get(revision)
// TODO https://issues.apache.org/jira/browse/IGNITE-20957
Revisit this code
.thenComposeAsync(
- partitionSet -> inBusyLock(busyLock, () ->
getOrCreatePartitionStorages(tbl, singlePartitionIdSet)),
+ unused -> inBusyLock(
+ busyLock,
+ () -> getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
+ .thenRun(() ->
localPartsByTableId.compute(
+ replicaGrpId.tableId(),
+ (tableId, oldPartitionSet)
-> extendPartitionSet(oldPartitionSet, partitionId)
+ ))
+ ),
ioExecutor
)
.thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
- if (!isRecovery) {
- // We create index storages (and also register the
necessary structures) for the rebalancing one partition
- // before start the raft node, so that the updates
that come when applying the replication log can safely
- // update the indexes. On recovery node, we do not
need to call this code, since during restoration we start
- // all partitions and already register indexes
there.
- lowWatermark.getLowWatermarkSafe(lwm ->
- registerIndexesToTable(tbl,
catalogService, singlePartitionIdSet, tbl.schemaView(), lwm)
- );
- }
+ lowWatermark.getLowWatermarkSafe(lwm ->
+ registerIndexesToTable(tbl, catalogService,
singlePartitionIdSet, tbl.schemaView(), lwm)
+ );
return
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored ->
inBusyLock(busyLock, () -> {
int catalogVersion =
catalogService.activeCatalogVersion(assignmentsTimestamp);
@@ -2258,6 +2260,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}), ioExecutor);
}
+ private static PartitionSet extendPartitionSet(@Nullable PartitionSet
oldPartitionSet, int partitionId) {
+ PartitionSet newPartitionSet =
Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
+ newPartitionSet.set(partitionId);
+ return newPartitionSet;
+ }
+
private boolean isNodeInReducedStableOrPendingAssignments(
TablePartitionId replicaGrpId,
@Nullable Assignments stableAssignments,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f9fb352b9f..4440701e10 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
@@ -26,7 +27,6 @@ import static
org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
-import static
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
@@ -55,6 +55,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
@@ -288,7 +289,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
when(clusterService.topologyService()).thenReturn(topologyService);
when(topologyService.localMember()).thenReturn(node);
- when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(emptySetCompletedFuture());
+ when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(completedFuture(Set.of(NODE_NAME)));
when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(),
any(), any(), any()))
.thenReturn(nullCompletedFuture());
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index d147cb185b..1e5bf4ee0d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -661,6 +661,8 @@ public class TableManagerTest extends IgniteAbstractTest {
private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean
isTxStorageUnderRebalance) throws NodeStoppingException {
when(rm.startRaftGroupService(any(), any(), any(), any()))
.thenAnswer(mock -> mock(TopologyAwareRaftGroupService.class));
+ when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt()))
+ .thenReturn(completedFuture(Set.of(NODE_NAME)));
createZone(1, 1);