This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f648c847a4 IGNITE-19713 Start partition storages only for locally assigned partitions (#4649)
f648c847a4 is described below

commit f648c847a40501136c40ecb36682cd73a0566c55
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 30 18:21:46 2024 +0400

    IGNITE-19713 Start partition storages only for locally assigned partitions (#4649)
---
 .../ignite/internal/replicator/ReplicaManager.java |  3 +-
 .../ignite/internal/table/ItTableScanTest.java     | 34 ++++++----
 .../distributed/ItPartitionStoragesTest.java       | 77 ++++++++++++++++++++++
 .../internal/table/ItTxResourcesVacuumTest.java    | 21 +++---
 .../internal/table/distributed/PartitionSet.java   |  7 +-
 .../internal/table/distributed/TableManager.java   | 40 ++++++-----
 .../distributed/TableManagerRecoveryTest.java      |  5 +-
 .../table/distributed/TableManagerTest.java        |  2 +
 8 files changed, 141 insertions(+), 48 deletions(-)

diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 5e740defc5..666f181f15 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1380,7 +1380,6 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                         return stopReplica(groupId, context, stopOperation);
                     } else if (state == ReplicaState.STOPPED) {
                         // We need to stop replica and destroy storages 
anyway, because they can be already created.
-                        // See TODO-s for IGNITE-19713
                         return stopReplica(groupId, context, stopOperation);
                     } // else: no-op.
                 } else if (reason == WeakReplicaStopReason.RESTART) {
@@ -1577,7 +1576,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      *         <li>if {@link #PRIMARY_ONLY} or {@link #STOPPING}: no-op.</li>
      *         <li>if {@link #RESTART_PLANNED} no-op, because replica will be 
stopped within deferred operation;</li>
      *         <li>if {@link #STARTING}: replica is stopped, the next state is 
{@link #STOPPING};</li>
-     *         <li>if {@link #STOPPED}: replica is stopped, see TODO-s for 
IGNITE-19713.</li>
+     *         <li>if {@link #STOPPED}: replica is stopped.</li>
      *     </ul>
      *     <li>if {@link WeakReplicaStopReason#PRIMARY_EXPIRED}:</li>
      *     <ul>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 45e618bb6d..7aec095b77 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -30,6 +30,8 @@ import static 
org.apache.ignite.internal.wrapper.Wrappers.unwrapNullable;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -157,23 +159,27 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
                 
tableViewInternal.internalTable().storage().getMvPartition(PART_ID),
                 TestMvPartitionStorage.class
         );
-        TestSortedIndexStorage sortedIdxStorage = unwrapNullable(
-                tableViewInternal.internalTable().storage().getIndex(PART_ID, 
sortedIdxId),
-                TestSortedIndexStorage.class
-        );
 
-        try {
-            assertTrue(
-                    waitForCondition(() -> partitionStorage.pendingCursors() 
== 0, AWAIT_TIMEOUT_MILLIS),
-                    "Alive versioned storage cursors: " + 
partitionStorage.pendingCursors()
+        if (partitionStorage != null) {
+            TestSortedIndexStorage sortedIdxStorage = unwrapNullable(
+                    
tableViewInternal.internalTable().storage().getIndex(PART_ID, sortedIdxId),
+                    TestSortedIndexStorage.class
             );
+            assertThat(sortedIdxStorage, is(notNullValue()));
 
-            assertTrue(
-                    waitForCondition(() -> sortedIdxStorage.pendingCursors() 
== 0, AWAIT_TIMEOUT_MILLIS),
-                    "Alive index storage cursors: " + 
sortedIdxStorage.pendingCursors()
-            );
-        } catch (InterruptedException e) {
-            fail("Waiting cursors close was interrupted.");
+            try {
+                assertTrue(
+                        waitForCondition(() -> 
partitionStorage.pendingCursors() == 0, AWAIT_TIMEOUT_MILLIS),
+                        "Alive versioned storage cursors: " + 
partitionStorage.pendingCursors()
+                );
+
+                assertTrue(
+                        waitForCondition(() -> 
sortedIdxStorage.pendingCursors() == 0, AWAIT_TIMEOUT_MILLIS),
+                        "Alive index storage cursors: " + 
sortedIdxStorage.pendingCursors()
+                );
+            } catch (InterruptedException e) {
+                fail("Waiting cursors close was interrupted.");
+            }
         }
     }
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItPartitionStoragesTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItPartitionStoragesTest.java
new file mode 100644
index 0000000000..d72492098e
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItPartitionStoragesTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Test;
+
+class ItPartitionStoragesTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @Test
+    void onlyCreatesPartitionStoragesForPartitionsAssignedToNode() {
+        createTableWith1ReplicaAnd1Partition();
+
+        putToTable();
+
+        int partitionStoragesCount = cluster.runningNodes()
+                .map(node -> unwrapTableImpl(table(node)).internalTable())
+                .mapToInt(internalTable -> 
internalTable.storage().getMvPartition(0) != null ? 1 : 0)
+                .sum();
+
+        assertThat(partitionStoragesCount, is(1));
+    }
+
+    private void createTableWith1ReplicaAnd1Partition() {
+        cluster.doInSession(0, session -> {
+            executeUpdate(
+                    "create zone test_zone with partitions=1, replicas=1, 
storage_profiles='" + DEFAULT_STORAGE_PROFILE + "';",
+                    session
+            );
+            executeUpdate("create table " + TABLE_NAME + " (key int primary 
key, val varchar(20))"
+                    + " with primary_zone='TEST_ZONE', storage_profile='" + 
DEFAULT_STORAGE_PROFILE + "';", session);
+        });
+    }
+
+    private void putToTable() {
+        KeyValueView<Integer, String> keyValueView = 
table().keyValueView(Integer.class, String.class);
+        keyValueView.put(null, 1, "one");
+    }
+
+    private Table table() {
+        return table(cluster.aliveNode());
+    }
+
+    private static Table table(Ignite node) {
+        return node.tables().table(TABLE_NAME);
+    }
+}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index 66355d77d9..f0fe597d05 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -252,7 +252,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         Transaction roTxAfter = beginReadOnlyTx(anyNode());
 
-        waitForTxStateReplication(nodes, txId, partId, 10_000);
+        waitForTxStateReplication(nodes, txId, TABLE_NAME, partId, 10_000);
 
         // Check that both volatile and persistent state is vacuumized..
         waitForTxStateVacuum(txId, partId, true, 10_000);
@@ -412,7 +412,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         CompletableFuture<Void> commitFut = tx.commitAsync();
 
-        waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+        waitForTxStateReplication(commitPartNodes, txId, TABLE_NAME, 
commitPartId, 10_000);
 
         assertThat(cleanupStarted, willCompleteSuccessfully());
 
@@ -605,7 +605,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
         CompletableFuture<Void> commitFut = tx.commitAsync();
 
-        waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+        waitForTxStateReplication(commitPartNodes, txId, TABLE_NAME, 
commitPartId, 10_000);
 
         log.info("Test: state replicated.");
 
@@ -834,11 +834,11 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
                 .allMatch(n -> volatileTxState(n, txId) != null);
     }
 
-    private boolean checkPersistentTxStateOnNodes(Set<String> 
nodeConsistentIds, UUID txId, int partId) {
+    private boolean checkPersistentTxStateOnNodes(Set<String> 
nodeConsistentIds, UUID txId, String tableName, int partId) {
         return cluster.runningNodes()
                 .map(TestWrappers::unwrapIgniteImpl)
                 .filter(n -> nodeConsistentIds.contains(n.name()))
-                .allMatch(n -> persistentTxState(n, txId, partId) != null);
+                .allMatch(n -> persistentTxState(n, txId, tableName, partId) 
!= null);
     }
 
     /**
@@ -849,9 +849,9 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
      * @param partId Commit partition id.
      * @param timeMs Time to wait.
      */
-    private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID 
txId, int partId, long timeMs)
+    private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID 
txId, String tableName, int partId, long timeMs)
             throws InterruptedException {
-        assertTrue(waitForCondition(() -> 
checkPersistentTxStateOnNodes(nodeConsistentIds, txId, partId), timeMs));
+        assertTrue(waitForCondition(() -> 
checkPersistentTxStateOnNodes(nodeConsistentIds, txId, tableName, partId), 
timeMs));
     }
 
     /**
@@ -1023,7 +1023,7 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
 
             result = result
                     && volatileTxState(node, txId) == null
-                    && (!checkPersistent || !node.id().equals(cpPrimaryId) || 
persistentTxState(node, txId, partId) == null);
+                    && (!checkPersistent || !node.id().equals(cpPrimaryId) || 
persistentTxState(node, txId, tableName, partId) == null);
         }
 
         return result;
@@ -1052,11 +1052,6 @@ public class ItTxResourcesVacuumTest extends 
ClusterPerTestIntegrationTest {
         return txManager.stateMeta(txId);
     }
 
-    @Nullable
-    private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, int 
partId) {
-        return persistentTxState(node, txId, TABLE_NAME, partId);
-    }
-
     @Nullable
     private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, 
String tableName, int partId) {
         return runInExecutor(txStateStorageExecutor, () -> {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
index 186d3ec2d1..0c5089cf53 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
@@ -48,7 +48,7 @@ public interface PartitionSet {
 
         @Override
         public PartitionSet copy() {
-            return this;
+            return new BitSetPartitionSet();
         }
 
         @Override
@@ -65,6 +65,11 @@ public interface PartitionSet {
         public boolean equals(Object obj) {
             return isEqual(obj);
         }
+
+        @Override
+        public String toString() {
+            return "PartitionSet(empty)";
+        }
     };
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2ea68705b8..82a7e27858 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -286,7 +286,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /**
      * Versioned value for tracking RAFT groups initialization and starting 
completion.
      *
-     * <p>Only explicitly updated in {@link 
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean, 
HybridTimestamp)}.
+     * <p>Only explicitly updated in {@link 
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean, 
long)}.
      *
      * <p>Completed strictly after {@link #localPartitionsVv}.
      */
@@ -764,7 +764,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         }
                     }
 
-                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
+                    return getOrCreatePartitionStorages(table, 
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
                 }, ioExecutor).thenCompose(identity())));
 
         CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
@@ -1639,13 +1639,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 (ignore, throwable) -> inBusyLock(busyLock, () -> 
assignmentsFuture.thenComposeAsync(newAssignments -> {
                     PartitionSet parts = new BitSetPartitionSet();
 
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set 
partitions only for
-                    // TODO assigned partitions.
                     for (int i = 0; i < newAssignments.size(); i++) {
-                        parts.set(i);
+                        Assignments partitionAssignments = 
newAssignments.get(i);
+                        if (localMemberAssignment(partitionAssignments) != 
null) {
+                            parts.set(i);
+                        }
                     }
 
-                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
+                    return getOrCreatePartitionStorages(table, 
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
                 }, ioExecutor)));
 
         CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
@@ -2189,19 +2190,20 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             localServicesStartFuture = localPartitionsVv.get(revision)
                     // TODO https://issues.apache.org/jira/browse/IGNITE-20957 
Revisit this code
                     .thenComposeAsync(
-                            partitionSet -> inBusyLock(busyLock, () -> 
getOrCreatePartitionStorages(tbl, singlePartitionIdSet)),
+                            unused -> inBusyLock(
+                                    busyLock,
+                                    () -> getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
+                                            .thenRun(() -> 
localPartsByTableId.compute(
+                                                    replicaGrpId.tableId(),
+                                                    (tableId, oldPartitionSet) 
-> extendPartitionSet(oldPartitionSet, partitionId)
+                                            ))
+                            ),
                             ioExecutor
                     )
                     .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
-                        if (!isRecovery) {
-                            // We create index storages (and also register the 
necessary structures) for the rebalancing one partition
-                            // before start the raft node, so that the updates 
that come when applying the replication log can safely
-                            // update the indexes. On recovery node, we do not 
need to call this code, since during restoration we start
-                            // all partitions and already register indexes 
there.
-                            lowWatermark.getLowWatermarkSafe(lwm ->
-                                    registerIndexesToTable(tbl, 
catalogService, singlePartitionIdSet, tbl.schemaView(), lwm)
-                            );
-                        }
+                        lowWatermark.getLowWatermarkSafe(lwm ->
+                                registerIndexesToTable(tbl, catalogService, 
singlePartitionIdSet, tbl.schemaView(), lwm)
+                        );
 
                         return 
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored -> 
inBusyLock(busyLock, () -> {
                             int catalogVersion = 
catalogService.activeCatalogVersion(assignmentsTimestamp);
@@ -2258,6 +2260,12 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 }), ioExecutor);
     }
 
+    private static PartitionSet extendPartitionSet(@Nullable PartitionSet 
oldPartitionSet, int partitionId) {
+        PartitionSet newPartitionSet = 
Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
+        newPartitionSet.set(partitionId);
+        return newPartitionSet;
+    }
+
     private boolean isNodeInReducedStableOrPendingAssignments(
             TablePartitionId replicaGrpId,
             @Nullable Assignments stableAssignments,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f9fb352b9f..4440701e10 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
@@ -26,7 +27,6 @@ import static 
org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
-import static 
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
@@ -55,6 +55,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
@@ -288,7 +289,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
         when(clusterService.topologyService()).thenReturn(topologyService);
         when(topologyService.localMember()).thenReturn(node);
-        when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(emptySetCompletedFuture());
+        when(distributionZoneManager.dataNodes(anyLong(), anyInt(), 
anyInt())).thenReturn(completedFuture(Set.of(NODE_NAME)));
 
         when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), 
any(), any(), any()))
                 .thenReturn(nullCompletedFuture());
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index d147cb185b..1e5bf4ee0d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -661,6 +661,8 @@ public class TableManagerTest extends IgniteAbstractTest {
     private void testStoragesGetClearedInMiddleOfFailedRebalance(boolean 
isTxStorageUnderRebalance) throws NodeStoppingException {
         when(rm.startRaftGroupService(any(), any(), any(), any()))
                 .thenAnswer(mock -> mock(TopologyAwareRaftGroupService.class));
+        when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt()))
+                .thenReturn(completedFuture(Set.of(NODE_NAME)));
 
         createZone(1, 1);
 

Reply via email to