This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 003edce538 IGNITE-22956 Implement local events about zone replica 
start in PartitionReplicaLifecycleManager. (#4294)
003edce538 is described below

commit 003edce5383ea77d44dda9cbc8755327e382bc3a
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Sep 3 17:04:52 2024 +0300

    IGNITE-22956 Implement local events about zone replica start in 
PartitionReplicaLifecycleManager. (#4294)
---
 .../replicator/ItReplicaLifecycleTest.java         |  68 +++++++-
 .../replicator/LocalPartitionReplicaEvent.java     |  30 ++++
 .../PartitionReplicaEventParameters.java           |  48 ++++++
 .../PartitionReplicaLifecycleManager.java          | 113 +++++++++++--
 .../replicator/ZonePartitionReplicaListener.java   |  11 ++
 .../internal/table/distributed/TableManager.java   | 175 +++++++++++++++++----
 6 files changed, 400 insertions(+), 45 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 5aea3a6229..7f15fecdb6 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -46,7 +46,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -58,10 +60,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -69,6 +73,7 @@ import java.util.function.Function;
 import java.util.function.LongFunction;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.Assignments;
@@ -111,6 +116,7 @@ import org.apache.ignite.internal.hlc.ClockWaiter;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.index.IndexManager;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -123,7 +129,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -141,6 +147,7 @@ import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -582,6 +589,36 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         );
     }
 
+    @Test
+    void testTableReplicaListenersCreationAfterRebalance(TestInfo testInfo) 
throws Exception {
+        startNodes(testInfo, 3);
+
+        Assignment replicaAssignment = (Assignment) 
AffinityUtils.calculateAssignmentForPartition(
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 1).toArray()[0];
+
+        Node node = getNode(replicaAssignment.consistentId());
+
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+        DistributionZonesTestUtil.createZone(node.catalogManager, "test_zone", 
1, 1);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
"test_zone", node.hybridClock.nowLong());
+
+        assertTrue(waitForCondition(() -> assertTableListenersCount(node, 
zoneId, 0), 10_000L));
+
+        createTable(node, "test_zone", "test_table");
+
+
+        assertTrue(waitForCondition(() -> assertTableListenersCount(node, 
zoneId, 1), 10_000L));
+
+        alterZone(node.catalogManager, "test_zone", 3);
+
+        assertTrue(waitForCondition(
+                () -> IntStream.range(0, 3).allMatch(i -> 
assertTableListenersCount(getNode(i), zoneId, 1)),
+                30_000L
+        ));
+    }
+
     @Test
     void testReplicaIsStartedOnNodeStart(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 3);
@@ -984,7 +1021,8 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
             LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
 
-            KeyValueStorage keyValueStorage = new 
SimpleInMemoryKeyValueStorage(name);
+            KeyValueStorage keyValueStorage =
+                    new RocksDbKeyValueStorage(name, resolveDir(dir, 
"metaStorageTestKeyValue"), failureProcessor);
 
             var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
                     clusterService,
@@ -1352,4 +1390,30 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     private static boolean skipMetaStorageInvoke(Collection<Operation> ops, 
String prefix) {
         return ops.stream().anyMatch(op -> new String(toByteArray(op.key()), 
StandardCharsets.UTF_8).startsWith(prefix));
     }
+
+    private static Path resolveDir(Path workDir, String dirName) {
+        Path newDirPath = workDir.resolve(dirName);
+
+        try {
+            return Files.createDirectories(newDirPath);
+        } catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    private boolean assertTableListenersCount(Node node, int zoneId, int 
count) {
+        try {
+            CompletableFuture<Replica> replicaFut = 
node.replicaManager.replica(new ZonePartitionId(zoneId, 0));
+
+            if (replicaFut == null) {
+                return false;
+            }
+
+            Replica replica = replicaFut.get(1, TimeUnit.SECONDS);
+
+            return replica != null && (((ZonePartitionReplicaListener) 
replica.listener()).tableReplicaListeners().size() == count);
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
new file mode 100644
index 0000000000..933f528032
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.event.Event;
+
+/**
+ * Events produced by {@link PartitionReplicaLifecycleManager}.
+ */
+public enum LocalPartitionReplicaEvent implements Event {
+    /**
+     * Fired when partition replica has started.
+     */
+    AFTER_REPLICA_STARTED
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
new file mode 100644
index 0000000000..762c015b67
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+
+/**
+ * Parameters for the events about zone partition replicas produced by {@link 
PartitionReplicaLifecycleManager}.
+ */
+public class PartitionReplicaEventParameters implements EventParameters {
+    /** Zone partition id. */
+    private final ZonePartitionId zonePartitionId;
+
+    /**
+     * Constructor.
+     *
+     * @param zonePartitionId Zone partition id.
+     */
+    public PartitionReplicaEventParameters(ZonePartitionId zonePartitionId) {
+        this.zonePartitionId = zonePartitionId;
+    }
+
+    /**
+     * Returns zone partition id.
+     *
+     * @return Zone partition id.
+     */
+    public ZonePartitionId zonePartitionId() {
+        return zonePartitionId;
+    }
+}
+
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index bb3dfca2d3..713f23a24d 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -69,7 +69,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.StampedLock;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
@@ -86,6 +90,7 @@ import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
 import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
 import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -110,12 +115,15 @@ import 
org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import 
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -130,7 +138,8 @@ import org.jetbrains.annotations.Nullable;
  * - Stop the same nodes on the zone removing.
  * - Support the rebalance mechanism and start the new replication nodes when 
the rebalance triggers occurred.
  */
-public class PartitionReplicaLifecycleManager implements IgniteComponent {
+public class PartitionReplicaLifecycleManager  extends
+        AbstractEventProducer<LocalPartitionReplicaEvent, 
PartitionReplicaEventParameters> implements IgniteComponent {
     public static final String FEATURE_FLAG_NAME = 
"IGNITE_ZONE_BASED_REPLICATION";
     /* Feature flag for zone based collocation track */
     // TODO IGNITE-22115 remove it
@@ -162,6 +171,9 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
 
     private final Set<ReplicationGroupId> replicationGroupIds = 
ConcurrentHashMap.newKeySet();
 
+    /** (zoneId -> lock) map to provide concurrent access to the zone replicas 
list. */
+    private final Map<Integer, StampedLock> zonePartitionsLocks = new 
ConcurrentHashMap<>();
+
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -450,18 +462,43 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
 
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             try {
-                replicationGroupIds.add(replicaGrpId);
+                AtomicReference<Long> stamp = new AtomicReference<>(null);
 
                 return replicaMgr.startReplica(
-                        replicaGrpId,
-                        (raftClient) -> new ZonePartitionReplicaListener(
-                                new 
ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor)),
-                        new FailFastSnapshotStorageFactory(),
-                        stablePeersAndLearners,
-                        raftGroupListener,
-                        raftGroupEventsListener,
-                        busyLock
-                ).thenApply(ignored -> true);
+                                replicaGrpId,
+                                (raftClient) -> new 
ZonePartitionReplicaListener(
+                                        new 
ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor)),
+                                new FailFastSnapshotStorageFactory(),
+                                stablePeersAndLearners,
+                                raftGroupListener,
+                                raftGroupEventsListener,
+                                busyLock
+                        ).thenCompose(replica -> {
+                            zonePartitionsLocks.compute(zoneId, (id, lock) -> {
+                                if (lock == null) {
+                                    lock = new StampedLock();
+                                }
+
+                                stamp.set(lock.writeLock());
+
+                                return lock;
+                            });
+
+                            replicationGroupIds.add(replicaGrpId);
+
+                            return fireEvent(
+                                    
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
+                                    new PartitionReplicaEventParameters(
+                                            new 
ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
+                                    )
+                            );
+                        })
+                        .whenComplete((unused, throwable) -> {
+                            if (stamp.get() != null) {
+                                
zonePartitionsLocks.get(zoneId).unlockWrite(stamp.get());
+                            }
+                        })
+                        .thenApply(unused -> false);
             } catch (NodeStoppingException e) {
                 return failedFuture(e);
             }
@@ -669,11 +706,15 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
     /**
      * Check if the current node has local replica for this {@link 
ZonePartitionId}.
      *
+     * <p>Important: this method must always be invoked under the corresponding 
stamped lock.
+     *
      * @param zonePartitionId Zone partition id.
      * @return true if local replica exists, false otherwise.
      */
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this 
method by the replicas await process.
     public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
+        assert 
zonePartitionsLocks.get(zonePartitionId.zoneId()).tryWriteLock() == 0;
+
         return replicationGroupIds.contains(zonePartitionId);
     }
 
@@ -1271,4 +1312,54 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
             LOG.error("Unable to clean zones resources", e);
         }
     }
+
+    /**
+     * Locks the zone's replica list against any changes. {@link 
#hasLocalPartition(ZonePartitionId)} must always be executed under this lock.
+     *
+     * @param zoneId Zone id.
+     * @return Stamp, which must be used for further unlock.
+     */
+    public long lockZoneForRead(int zoneId) {
+        AtomicLong stamp = new AtomicLong();
+
+        zonePartitionsLocks.compute(zoneId, (id, l) -> {
+            if (l == null) {
+                l = new StampedLock();
+            }
+
+            stamp.set(l.readLock());
+
+            return l;
+        });
+
+        return stamp.get();
+    }
+
+    /**
+     * Unlocks the zone's replica list.
+     *
+     * @param zoneId Zone id.
+     * @param stamp Stamp produced by the corresponding {@link 
#lockZoneForRead(int)} call.
+     */
+    public void unlockZoneForRead(int zoneId, long stamp) {
+        zonePartitionsLocks.get(zoneId).unlockRead(stamp);
+    }
+
+    /**
+     * Load a new table partition listener to the zone replica.
+     *
+     * <p>Important: this method must be called only when the replica is guaranteed 
to exist at the current moment.
+     *
+     * @param zonePartitionId Zone partition id.
+     * @param tablePartitionId Table partition id.
+     * @param createListener Lazy replica listener from RAFT command runner 
builder.
+     */
+    public void loadTableListenerToZoneReplica(ZonePartitionId 
zonePartitionId, TablePartitionId tablePartitionId,
+            Function<RaftCommandRunner, ReplicaListener> createListener) {
+        CompletableFuture<Replica> replicaFut = 
replicaMgr.replica(zonePartitionId);
+
+        assert replicaFut != null && replicaFut.isDone();
+
+        ((ZonePartitionReplicaListener) 
replicaFut.join().listener()).addTableReplicaListener(tablePartitionId, 
createListener);
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 45c30025fd..4fa12d845d 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.TableAware;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * Zone partition replica listener.
@@ -120,4 +121,14 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     public void addTableReplicaListener(TablePartitionId partitionId, 
Function<RaftCommandRunner, ReplicaListener> replicaListener) {
         replicas.put(partitionId, replicaListener.apply(raftClient));
     }
+
+    /**
+     * Return table replicas listeners.
+     *
+     * @return Table replicas listeners.
+     */
+    @VisibleForTesting
+    public Map<TablePartitionId, ReplicaListener> tableReplicaListeners() {
+        return replicas;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a1872c6482..b4f00604df 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -57,6 +57,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListComple
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -68,6 +69,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -137,8 +139,9 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaEventParameters;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
-import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -212,7 +215,6 @@ import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
@@ -416,6 +418,10 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     @Nullable
     private StreamerReceiverRunner streamerReceiverRunner;
 
+    private final CompletableFuture<Void> readyToProcessTableStarts = new 
CompletableFuture<>();
+
+    private final Map<Integer, Set<TableImpl>> tablesPerZone = new 
ConcurrentHashMap<>();
+
     /**
      * Creates a new table manager.
      *
@@ -579,6 +585,11 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         );
 
         fullStateTransferIndexChooser = new 
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
+
+        partitionReplicaLifecycleManager.listen(
+                LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
+                this::onZoneReplicaCreated
+        );
     }
 
     @Override
@@ -598,6 +609,12 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             cleanUpResourcesForDroppedTablesOnRecoveryBusy();
 
+            sharedTxStateStorage.start();
+
+            // This future unblocks the process of tables start.
+            // All needed storages, like the TxStateRocksDbSharedStorage must 
be started already.
+            readyToProcessTableStarts.complete(null);
+
             startTables(recoveryRevision, lowWatermark.getLowWatermark());
 
             processAssignmentsOnRecovery(recoveryRevision);
@@ -638,6 +655,39 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         return 
executorInclinedSchemaSyncService.waitForMetadataCompleteness(HybridTimestamp.hybridTimestamp(ts));
     }
 
+    private CompletableFuture<Boolean> 
onZoneReplicaCreated(PartitionReplicaEventParameters parameters) {
+        if (!PartitionReplicaLifecycleManager.ENABLED) {
+            return completedFuture(false);
+        }
+
+        return inBusyLockAsync(busyLock, () -> {
+            List<CompletableFuture<?>> futs = new ArrayList<>();
+
+            readyToProcessTableStarts.thenRun(() -> {
+                Set<TableImpl> zoneTables = 
zoneTables(parameters.zonePartitionId().zoneId());
+
+                PartitionSet singlePartitionIdSet = 
PartitionSet.of(parameters.zonePartitionId().partitionId());
+
+                zoneTables.forEach(tbl -> {
+                    futs.add(inBusyLockAsync(busyLock,
+                            () -> supplyAsync(() -> 
getOrCreatePartitionStorages(tbl, singlePartitionIdSet), ioExecutor))
+                            .thenCompose(identity())
+                            .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                                lowWatermark.getLowWatermarkSafe(lwm ->
+                                        registerIndexesToTable(tbl, 
catalogService, singlePartitionIdSet, tbl.schemaView(), lwm)
+                                );
+
+                                preparePartitionResourcesAndLoadToZoneReplica(
+                                        tbl, 
parameters.zonePartitionId().partitionId(), 
parameters.zonePartitionId().zoneId());
+                            }), ioExecutor)
+                    );
+                });
+            });
+
+            return allOf(futs.toArray(new 
CompletableFuture[]{})).thenApply((unused) -> false);
+        });
+    }
+
     private CompletableFuture<Boolean> 
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters 
parameters) {
         if (!PartitionReplicaLifecycleManager.ENABLED) {
             return completedFuture(false);
@@ -647,19 +697,32 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
         CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
 
+        return prepareTableResourcesAndLoadToZoneReplica(causalityToken, 
zoneDescriptor, tableDescriptor, false);
+    }
+
+    private CompletableFuture<Boolean> 
prepareTableResourcesAndLoadToZoneReplica(
+            long causalityToken,
+            CatalogZoneDescriptor zoneDescriptor,
+            CatalogTableDescriptor tableDescriptor,
+            boolean onNodeRecovery
+    ) {
         TableImpl table =  createTableImpl(causalityToken, tableDescriptor, 
zoneDescriptor);
 
+        int tableId = tableDescriptor.id();
+
         tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
             if (e != null) {
                 return failedFuture(e);
             }
 
-            return schemaManager.schemaRegistry(causalityToken, 
parameters.tableId()).thenAccept(table::schemaView);
+            return schemaManager.schemaRegistry(causalityToken, 
tableId).thenAccept(table::schemaView);
         }));
 
+        long stamp = 
partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
+
         // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
         CompletableFuture<?> localPartsUpdateFuture = 
localPartitionsVv.update(causalityToken,
-                (ignore, throwable) -> inBusyLock(busyLock, () -> 
nullCompletedFuture().thenComposeAsync((ignored) -> {
+                (ignore, throwable) -> inBusyLock(busyLock, () -> 
supplyAsync(() -> {
                     PartitionSet parts = new BitSetPartitionSet();
 
                     for (int i = 0; i < zoneDescriptor.partitions(); i++) {
@@ -668,8 +731,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         }
                     }
 
-                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
-                }, ioExecutor)));
+                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
+                }, ioExecutor).thenCompose(identity())));
 
         CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
 
@@ -678,29 +741,42 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 return failedFuture(e);
             }
 
-            return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
-                        var startPartsFut = new 
ArrayList<CompletableFuture<?>>();
+            return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                        if (onNodeRecovery) {
+                            SchemaRegistry schemaRegistry = table.schemaView();
+                            PartitionSet partitionSet = 
localPartsByTableId.get(tableId);
+                            // LWM starts updating only after the node is 
restored.
+                            HybridTimestamp lwm = 
lowWatermark.getLowWatermark();
+
+                            registerIndexesToTable(table, catalogService, 
partitionSet, schemaRegistry, lwm);
+                        }
 
                         for (int i = 0; i < zoneDescriptor.partitions(); i++) {
                             if 
(partitionReplicaLifecycleManager.hasLocalPartition(new 
ZonePartitionId(zoneDescriptor.id(), i))) {
-                                
startPartsFut.add(preparePartitionResourcesAndLoadToZoneReplica(
-                                        table,
-                                        i,
-                                        zoneDescriptor.id()));
+                                
preparePartitionResourcesAndLoadToZoneReplica(table, i, zoneDescriptor.id());
                             }
                         }
-
-                        return 
allOf(startPartsFut.toArray(CompletableFuture[]::new));
                     }
+
             ), ioExecutor);
         });
 
-        tables.put(parameters.tableId(), table);
+        tables.put(tableId, table);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible 
performance degradation.
-        return createPartsFut.thenAccept(ignore -> 
startedTables.put(parameters.tableId(), table))
-                .thenApply(unused -> false);
+        return createPartsFut.thenAccept(ignore -> startedTables.put(tableId, 
table))
+                .handle((v, th) -> {
+                    
partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
+
+                    if (th != null) {
+                        sneakyThrow(th);
+                    }
 
+                    addTableToZone(zoneDescriptor.id(), table);
+
+                    return v;
+                })
+                .thenApply(unused -> false);
     }
 
     /**
@@ -709,9 +785,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param table Table.
      * @param partId Partition id.
      * @param zoneId Zone id.
-     * @return Future, which will complete when the table processor loaded to 
the zone replica.
      */
-    private CompletableFuture<Void> 
preparePartitionResourcesAndLoadToZoneReplica(
+    private void preparePartitionResourcesAndLoadToZoneReplica(
             TableImpl table,
             int partId,
             int zoneId
@@ -722,7 +797,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
 
-        return inBusyLockAsync(busyLock, () -> {
+        inBusyLock(busyLock, () -> {
             var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, 
Void>(HybridTimestamp.MIN_VALUE);
 
             var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
@@ -753,14 +828,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     partitionStorages.getMvPartitionStorage(),
                     partitionStorages.getTxStateStorage(),
                     partitionUpdateHandlers,
-                    raftClient);
+                    raftClient
+            );
 
-            return replicaMgr.replica(new ZonePartitionId(zoneId, partId))
-                    .thenAcceptAsync(zoneReplica ->
-                            ((ZonePartitionReplicaListener) 
zoneReplica.listener()).addTableReplicaListener(
-                                    new TablePartitionId(tableId, partId), 
createListener
-                            ), ioExecutor
-                    );
+            partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
+                    new ZonePartitionId(zoneId, partId),
+                    new TablePartitionId(tableId, partId), createListener
+            );
         });
     }
 
@@ -873,7 +947,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                                     
Assignments.assignmentListToString(newAssignments)
                             );
 
-                            throw ExceptionUtils.sneakyThrow(e);
+                            throw sneakyThrow(e);
                         }
 
                         return invokeResult;
@@ -924,7 +998,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         if (e != null) {
                             LOG.error("Couldn't get assignments from metastore 
for table [tableId={}].", e, tableId);
 
-                            throw ExceptionUtils.sneakyThrow(e);
+                            throw sneakyThrow(e);
                         }
 
                         return realAssignments;
@@ -2676,8 +2750,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     }
 
     private void startTables(long recoveryRevision, @Nullable HybridTimestamp 
lwm) {
-        sharedTxStateStorage.start();
-
         int earliestCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
         int latestCatalogVersion = catalogService.latestCatalogVersion();
 
@@ -2688,7 +2760,24 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             int ver0 = ver;
             catalogService.tables(ver).stream()
                     .filter(tbl -> startedTables.add(tbl.id()))
-                    .forEach(tableDescriptor -> 
startTableFutures.add(createTableLocally(recoveryRevision, ver0, 
tableDescriptor, true)));
+                    .forEach(tableDescriptor -> {
+                        if (PartitionReplicaLifecycleManager.ENABLED) {
+                            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, ver0);
+
+                            startTableFutures.add(
+                                    prepareTableResourcesAndLoadToZoneReplica(
+                                            recoveryRevision,
+                                            zoneDescriptor,
+                                            tableDescriptor,
+                                            true
+                                    )
+                            );
+                        } else {
+                            startTableFutures.add(
+                                    createTableLocally(recoveryRevision, ver0, 
tableDescriptor, true)
+                            );
+                        }
+                    });
         }
 
         // Forces you to wait until recovery is complete before the metastore 
watches is deployed to avoid races with catalog listeners.
@@ -2819,4 +2908,26 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
         this.streamerReceiverRunner = runner;
     }
+
+    /**
+     * Returns the set of tables registered for the given zone. If the zone has no entry in {@code tablesPerZone} yet,
+     * an empty set is stored for it first and that new set is returned.
+     *
+     * <p>The returned set is the instance held in the map, not a copy.
+     *
+     * @param zoneId Zone id.
+     * @return Set of tables stored for the zone, never {@code null}.
+     */
+    private Set<TableImpl> zoneTables(int zoneId) {
+        return tablesPerZone.computeIfAbsent(zoneId, id -> new HashSet<>());
+    }
+
+    /**
+     * Registers the table in the set of tables kept for the given zone, creating that set when the zone has no entry yet.
+     *
+     * @param zoneId Zone id.
+     * @param table Table to add to the zone's set.
+     */
+    private void addTableToZone(int zoneId, TableImpl table) {
+        // The table is added inside compute() so that creating the set and inserting into it happen in one map operation.
+        tablesPerZone.compute(zoneId, (ignored, existing) -> {
+            Set<TableImpl> tablesOfZone = existing != null ? existing : new HashSet<>();
+
+            tablesOfZone.add(table);
+
+            return tablesOfZone;
+        });
+    }
 }


Reply via email to