This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dc56ce5186 IGNITE-24974 Pool starvation when stopping zone in colocation mode (#5547)
7dc56ce5186 is described below

commit 7dc56ce51865e99c6a1fa3f90b7533ffa9e3560b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Apr 3 15:55:02 2025 +0400

    IGNITE-24974 Pool starvation when stopping zone in colocation mode (#5547)
---
 .../replicator/ItBigZoneOperationTest.java         |  51 ++++++
 .../replicator/NaiveAsyncReadWriteLock.java        | 186 +++++++++++++++++++++
 .../PartitionReplicaLifecycleManager.java          |  40 ++---
 .../replicator/NaiveAsyncReadWriteLockTest.java    | 140 ++++++++++++++++
 .../internal/table/distributed/TableManager.java   |  44 +++--
 5 files changed, 432 insertions(+), 29 deletions(-)

diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBigZoneOperationTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBigZoneOperationTest.java
new file mode 100644
index 00000000000..de260422cb2
--- /dev/null
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBigZoneOperationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+
+import java.time.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "true")
+class ItBigZoneOperationTest extends ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24991")
+    void zoneWithManyPartitionsDoesNotCauseHangs() {
+        Ignite node = cluster.node(0);
+
+        assertTimeoutPreemptively(Duration.ofSeconds(20), () -> {
+            node.sql().executeScript(
+                    "CREATE ZONE TEST_ZONE WITH STORAGE_PROFILES='default', PARTITIONS=300;"
+                            + "CREATE TABLE TEST_TABLE(id INT PRIMARY KEY, val VARCHAR(255)) ZONE TEST_ZONE"
+            );
+
+            node.sql().execute(null, "SELECT * FROM TEST_TABLE").close();
+        });
+    }
+}
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java
new file mode 100644
index 00000000000..b0038113d90
--- /dev/null
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Asynchronous analogue of a read-write lock. It has the following properties:
+ *
+ * <ul>
+ *     <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released
+ *     </li>
+ *     <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are
+ *     acquired immediately, but attempts to acquire write locks wait for all read locks to be released</li>
+ *     <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock
+ *     attempts will queue until all write lock attempts are satisfied and released</li>
+ *     <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li>
+ * </ul>
+ *
+ * <p>This implementation is naive because it implies that time to hold the locks can be pretty long and there will be no
+ * high contention on the acquiring side; this simplifies the implementation.</p>
+ */
+public class NaiveAsyncReadWriteLock {
+    /** Used to manage the lock state (including issuing and using stamps). */
+    private final StampedLock stampedLock = new StampedLock();
+
+    /** Used to linearize access to waiters collections. */
+    private final Object mutex = new Object();
+
+    /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */
+    private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>();
+
+    /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */
+    private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>();
+
+    /**
+     * Attempts to acquire the write lock.
+     *
+     * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired.
+     */
+    public CompletableFuture<Long> writeLock() {
+        synchronized (mutex) {
+            long stamp = stampedLock.tryWriteLock();
+            if (stamp != 0) {
+                return completedFuture(stamp);
+            }
+
+            CompletableFuture<Long> lockFuture = new CompletableFuture<>();
+
+            writeLockWaiters.add(lockFuture);
+
+            return lockFuture;
+        }
+    }
+
+    /**
+     * Unlocks write lock previously obtained via {@link #writeLock()}.
+     *
+     * @param stamp Stamp returned via write lock future.
+     */
+    public void unlockWrite(long stamp) {
+        long newWriteStamp = 0;
+        CompletableFuture<Long> writeLockWaiter;
+
+        LongList readStamps = null;
+        List<CompletableFuture<Long>> readLockWaitersToComplete = null;
+
+        synchronized (mutex) {
+            stampedLock.unlockWrite(stamp);
+
+            writeLockWaiter = writeLockWaiters.poll();
+
+            if (writeLockWaiter != null) {
+                // Someone is waiting for a write lock, satisfy the request.
+                newWriteStamp = stampedLock.tryWriteLock();
+                assert newWriteStamp != 0;
+            } else {
+                // Someone might be waiting for read locks.
+                for (CompletableFuture<Long> readLockWaiter : readLockWaiters) {
+                    long newReadStamp = stampedLock.tryReadLock();
+                    assert newReadStamp != 0;
+
+                    if (readStamps == null) {
+                        readStamps = new LongArrayList(readLockWaiters.size());
+                        readLockWaitersToComplete = new ArrayList<>(readLockWaiters.size());
+                    }
+                    readStamps.add(newReadStamp);
+                    readLockWaitersToComplete.add(readLockWaiter);
+                }
+
+                readLockWaiters.clear();
+            }
+        }
+
+        // Completing the futures out of the synchronized block.
+        if (writeLockWaiter != null) {
+            writeLockWaiter.complete(newWriteStamp);
+        } else if (readLockWaitersToComplete != null) {
+            for (int i = 0; i < readLockWaitersToComplete.size(); i++) {
+                readLockWaitersToComplete.get(i).complete(readStamps.getLong(i));
+            }
+        }
+    }
+
+    /**
+     * Attempts to acquire a read lock.
+     *
+     * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired.
+     */
+    public CompletableFuture<Long> readLock() {
+        synchronized (mutex) {
+            // Write lock attempts have priority over read lock attempts, so first check whether someone waits for write lock.
+            if (writeLockWaiters.isEmpty()) {
+                long stamp = stampedLock.tryReadLock();
+                if (stamp != 0) {
+                    return completedFuture(stamp);
+                }
+            }
+
+            CompletableFuture<Long> lockFuture = new CompletableFuture<>();
+
+            readLockWaiters.add(lockFuture);
+
+            return lockFuture;
+        }
+    }
+
+    /**
+     * Unlocks read lock previously obtained via {@link #readLock()}.
+     *
+     * @param stamp Stamp returned via read lock future.
+     */
+    public void unlockRead(long stamp) {
+        long newWriteStamp = 0;
+        CompletableFuture<Long> writeLockWaiter;
+
+        synchronized (mutex) {
+            stampedLock.unlockRead(stamp);
+
+            if (stampedLock.isReadLocked()) {
+                return;
+            }
+
+            writeLockWaiter = writeLockWaiters.poll();
+
+            if (writeLockWaiter != null) {
+                // Someone is waiting for a write lock, satisfy the request.
+                newWriteStamp = stampedLock.tryWriteLock();
+                assert newWriteStamp != 0;
+            }
+        }
+
+        // Completing the future out of the synchronized block.
+        if (writeLockWaiter != null) {
+            writeLockWaiter.complete(newWriteStamp);
+        }
+    }
+
+    boolean isReadLocked() {
+        return stampedLock.isReadLocked();
+    }
+}
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index a4117ed3c74..1e7342b5925 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -74,7 +74,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.StampedLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -194,7 +193,7 @@ public class PartitionReplicaLifecycleManager extends
     private final Set<ZonePartitionId> replicationGroupIds = ConcurrentHashMap.newKeySet();
 
     /** (zoneId -> lock) map to provide concurrent access to the zone replicas list. */
-    private final Map<Integer, StampedLock> zonePartitionsLocks = new ConcurrentHashMap<>();
+    private final Map<Integer, NaiveAsyncReadWriteLock> zonePartitionsLocks = new ConcurrentHashMap<>();
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -864,7 +863,7 @@ public class PartitionReplicaLifecycleManager extends
      */
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this method by the replicas await process.
     public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
-        assert zonePartitionsLocks.get(zonePartitionId.zoneId()).tryWriteLock() == 0;
+        assert zonePartitionsLocks.get(zonePartitionId.zoneId()).isReadLocked() : zonePartitionId;
 
         return replicationGroupIds.contains(zonePartitionId);
     }
@@ -1409,9 +1408,7 @@ public class PartitionReplicaLifecycleManager extends
             try {
                 return replicaMgr.stopReplica(zonePartitionId)
                         .thenCompose((replicaWasStopped) -> {
-                            if (afterReplicaStopAction != null) {
-                                afterReplicaStopAction.accept(replicaWasStopped);
-                            }
+                            afterReplicaStopAction.accept(replicaWasStopped);
 
                             if (!replicaWasStopped) {
                                 return falseCompletedFuture();
@@ -1457,10 +1454,15 @@ public class PartitionReplicaLifecycleManager extends
      * Lock the zones replica list for any changes. {@link #hasLocalPartition(ZonePartitionId)} must be executed under this lock always.
      *
      * @param zoneId Zone id.
-     * @return Stamp, which must be used for further unlock.
+     * @return Future completing with a stamp which must be used for further unlock.
      */
-    public long lockZoneForRead(int zoneId) {
-        return zonePartitionsLocks.computeIfAbsent(zoneId, id -> new StampedLock()).readLock();
+    public CompletableFuture<Long> lockZoneForRead(int zoneId) {
+        NaiveAsyncReadWriteLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> newZoneLock());
+        return lock.readLock();
+    }
+
+    private static NaiveAsyncReadWriteLock newZoneLock() {
+        return new NaiveAsyncReadWriteLock();
     }
 
     /**
@@ -1523,18 +1525,18 @@ public class PartitionReplicaLifecycleManager extends
     }
 
     private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<T>> action) {
-        StampedLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> new StampedLock());
+        NaiveAsyncReadWriteLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> newZoneLock());
 
-        long stamp = lock.writeLock();
-
-        try {
-            return action.get()
-                    .whenComplete((v, e) -> lock.unlockWrite(stamp));
-        } catch (Throwable e) {
-            lock.unlockWrite(stamp);
+        return lock.writeLock().thenCompose(stamp -> {
+            try {
+                return action.get()
+                        .whenComplete((v, e) -> lock.unlockWrite(stamp));
+            } catch (Throwable e) {
+                lock.unlockWrite(stamp);
 
-            return failedFuture(e);
-        }
+                return failedFuture(e);
+            }
+        });
     }
 
     private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java
new file mode 100644
index 00000000000..ce55edb1027
--- /dev/null
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class NaiveAsyncReadWriteLockTest {
+    private final NaiveAsyncReadWriteLock lock = new NaiveAsyncReadWriteLock();
+
+    @Test
+    void writeLockAttemptsArrivingWhenWriteLockedGetSatisfiedWhenUnlocked() {
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        CompletableFuture<Long> firstFuture = lock.writeLock();
+
+        for (int i = 0; i < 10; i++) {
+            futures.add(lock.writeLock().thenAccept(lock::unlockWrite));
+        }
+
+        assertThat(firstFuture.thenAccept(lock::unlockWrite), willCompleteSuccessfully());
+
+        assertThat(CompletableFutures.allOf(futures), willCompleteSuccessfully());
+    }
+
+    @Test
+    void readLockAttemptsArrivingWhenWriteLockedGetSatisfiedWhenUnlocked() {
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        CompletableFuture<Long> firstFuture = lock.writeLock();
+
+        for (int i = 0; i < 10; i++) {
+            futures.add(lock.readLock().thenAccept(lock::unlockRead));
+        }
+
+        assertThat(firstFuture.thenAccept(lock::unlockWrite), willCompleteSuccessfully());
+
+        assertThat(CompletableFutures.allOf(futures), willCompleteSuccessfully());
+    }
+
+    @Test
+    void writeLockAttemptsArrivingWhenReadLockedGetSatisfiedWhenUnlocked() {
+        CompletableFuture<Long> readLockFuture1 = lock.readLock();
+        CompletableFuture<Long> readLockFuture2 = lock.readLock();
+
+        CompletableFuture<Long> writeLockFuture = lock.writeLock();
+
+        assertThat(readLockFuture1.thenAccept(lock::unlockRead), willCompleteSuccessfully());
+        assertThat(readLockFuture2.thenAccept(lock::unlockRead), willCompleteSuccessfully());
+
+        assertThat(writeLockFuture, willCompleteSuccessfully());
+        assertDoesNotThrow(() -> lock.unlockWrite(writeLockFuture.join()));
+    }
+
+    @Test
+    void incompleteReadUnlockDoesNotAllowWriteWaiterAcquireWriteLock() throws Exception {
+        CompletableFuture<Long> readLock1 = lock.readLock();
+        @SuppressWarnings("unused")
+        CompletableFuture<Long> readLock2 = lock.readLock();
+        CompletableFuture<Long> writeLock = lock.writeLock();
+
+        AtomicBoolean writeLocked = new AtomicBoolean(false);
+        writeLock.thenAccept(stamp -> writeLocked.set(true));
+
+        readLock1.thenAccept(lock::unlockRead);
+
+        assertFalse(waitForCondition(writeLocked::get, 100), "Write lock was acquired before unlocking all read locks");
+    }
+
+    @Test
+    void testConcurrency(
+            @InjectExecutorService(threadCount = 4) ExecutorService orchestratingExecutor,
+            @InjectExecutorService(threadCount = 10) ExecutorService sleepTasksExecutor
+    ) {
+        List<CompletableFuture<?>> futures = new CopyOnWriteArrayList<>();
+
+        for (int thread = 0; thread < 4; thread++) {
+            orchestratingExecutor.execute(() -> {
+                for (int i = 0; i < 10000; i++) {
+                    CompletableFuture<?> future;
+
+                    if (i % 2 == 0) {
+                        future = lock.readLock().thenCompose(stamp -> {
+                            return CompletableFuture.runAsync(() -> sleep(1), sleepTasksExecutor)
+                                    .whenComplete((res, ex) -> lock.unlockRead(stamp));
+                        });
+                    } else {
+                        future = lock.writeLock().thenCompose(stamp -> {
+                            return CompletableFuture.runAsync(() -> sleep(1), sleepTasksExecutor)
+                                    .whenComplete((res, ex) -> lock.unlockWrite(stamp));
+                        });
+                    }
+
+                    futures.add(future);
+                }
+            });
+        }
+
+        assertThat(CompletableFutures.allOf(futures), willCompleteSuccessfully());
+    }
+
+    private static void sleep(int millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ed022208428..71fb54418f4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -777,11 +777,31 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView);
         }));
 
-        long stamp = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
+        // Obtain future, but don't chain on it yet because update() on VVs must be called in the same thread. The method we call
+        // will call update() on VVs and inside those updates it will chain on the lock acquisition future.
+        CompletableFuture<Long> acquisitionFuture = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
+        try {
+            return prepareTableResourcesAndLoadHavingZoneReadLock(acquisitionFuture, causalityToken, zoneDescriptor, onNodeRecovery, table)
+                    .whenComplete((res, ex) -> unlockZoneForRead(zoneDescriptor, acquisitionFuture));
+        } catch (Throwable e) {
+            unlockZoneForRead(zoneDescriptor, acquisitionFuture);
+
+            return failedFuture(e);
+        }
+    }
+
+    private CompletableFuture<Void> prepareTableResourcesAndLoadHavingZoneReadLock(
+            CompletableFuture<Long> readLockAcquisitionFuture,
+            long causalityToken,
+            CatalogZoneDescriptor zoneDescriptor,
+            boolean onNodeRecovery,
+            TableImpl table
+    ) {
+        int tableId = table.tableId();
 
         // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!).
         CompletableFuture<?> localPartsUpdateFuture = localPartitionsVv.update(causalityToken,
-                (ignore, throwable) -> inBusyLock(busyLock, () -> supplyAsync(() -> {
+                (ignore, throwable) -> inBusyLock(busyLock, () -> readLockAcquisitionFuture.thenComposeAsync(unused -> {
                     PartitionSet parts = new BitSetPartitionSet();
 
                     for (int i = 0; i < zoneDescriptor.partitions(); i++) {
@@ -791,7 +811,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                     }
 
                     return getOrCreatePartitionStorages(table, parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
-                }, ioExecutor).thenCompose(identity())));
+                }, ioExecutor))
+        );
 
         CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
 
@@ -823,14 +844,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         tables.put(tableId, table);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
-        return createPartsFut.thenAccept(ignore -> startedTables.put(tableId, table))
-                .whenComplete((v, th) -> {
-                    partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
+        return createPartsFut.thenAccept(ignore -> {
+            startedTables.put(tableId, table);
 
-                    if (th == null) {
-                        addTableToZone(zoneDescriptor.id(), table);
-                    }
-                });
+            addTableToZone(zoneDescriptor.id(), table);
+        });
+    }
+
+    private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, CompletableFuture<Long> readLockAcquiryFuture) {
+        readLockAcquiryFuture.thenAccept(stamp -> {
+            partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
+        });
     }
 
     /**

Reply via email to