This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7dc56ce5186 IGNITE-24974 Pool starvation when stopping zone in
colocation mode (#5547)
7dc56ce5186 is described below
commit 7dc56ce51865e99c6a1fa3f90b7533ffa9e3560b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Apr 3 15:55:02 2025 +0400
IGNITE-24974 Pool starvation when stopping zone in colocation mode (#5547)
---
.../replicator/ItBigZoneOperationTest.java | 51 ++++++
.../replicator/NaiveAsyncReadWriteLock.java | 186 +++++++++++++++++++++
.../PartitionReplicaLifecycleManager.java | 40 ++---
.../replicator/NaiveAsyncReadWriteLockTest.java | 140 ++++++++++++++++
.../internal/table/distributed/TableManager.java | 44 +++--
5 files changed, 432 insertions(+), 29 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBigZoneOperationTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBigZoneOperationTest.java
new file mode 100644
index 00000000000..de260422cb2
--- /dev/null
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItBigZoneOperationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+
+import java.time.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+class ItBigZoneOperationTest extends ClusterPerTestIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-24991")
+ void zoneWithManyPartitionsDoesNotCauseHangs() {
+ Ignite node = cluster.node(0);
+
+ assertTimeoutPreemptively(Duration.ofSeconds(20), () -> {
+ node.sql().executeScript(
+ "CREATE ZONE TEST_ZONE WITH STORAGE_PROFILES='default', PARTITIONS=300;"
+ + "CREATE TABLE TEST_TABLE(id INT PRIMARY KEY, val VARCHAR(255)) ZONE TEST_ZONE"
+ );
+
+ node.sql().execute(null, "SELECT * FROM TEST_TABLE").close();
+ });
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java
new file mode 100644
index 00000000000..b0038113d90
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Asynchronous analogue of a read-write lock. It has the following properties:
+ *
+ * <ul>
+ * <li>Write lock is exclusive; if the lock is write-locked, other
attempts to acquire any lock wait for the write lock to be released
+ * </li>
+ * <li>Read lock is non-exclusive: if the lock is read-locked (and there
are no waiting write lock attempts), other read locks are
+ * acquired immediately, but attempts to acquire write locks wait for all
read locks to be released</li>
+ * <li>Write locks have priority over read locks: if the lock is
read-locked, and there is a waiting write lock attempt, read lock
+ * attempts will queue until all write lock attempts are satisfied and
released</li>
+ * <li>Lock holder is not bound to any thread; instead, a lock holder gets
a stamp that can be used to release the lock</li>
+ * </ul>
+ *
+ * <p>This implementation is naive because it assumes that locks may be held for
a pretty long time and there will be no
+ * high contention on the acquiring side; this simplifies the
implementation.</p>
+ */
+public class NaiveAsyncReadWriteLock {
+ /** Used to manage the lock state (including issuing and using stamps). */
+ private final StampedLock stampedLock = new StampedLock();
+
+ /** Used to linearize access to waiters collections. */
+ private final Object mutex = new Object();
+
+ /** Queue of futures waiting for write lock to be acquired; served in the
order of appearance. */
+ private final Queue<CompletableFuture<Long>> writeLockWaiters = new
ArrayDeque<>();
+
+ /** Queue of futures waiting for read locks to be acquired; served in the
order of appearance. */
+ private final Queue<CompletableFuture<Long>> readLockWaiters = new
ArrayDeque<>();
+
+ /**
+ * Attempts to acquire the write lock.
+ *
+ * @return Future completed with the stamp of the acquired lock; completed
when the lock is acquired.
+ */
+ public CompletableFuture<Long> writeLock() {
+ synchronized (mutex) {
+ long stamp = stampedLock.tryWriteLock();
+ if (stamp != 0) {
+ return completedFuture(stamp);
+ }
+
+ CompletableFuture<Long> lockFuture = new CompletableFuture<>();
+
+ writeLockWaiters.add(lockFuture);
+
+ return lockFuture;
+ }
+ }
+
+ /**
+ * Unlocks write lock previously obtained via {@link #writeLock()}.
+ *
+ * @param stamp Stamp returned via write lock future.
+ */
+ public void unlockWrite(long stamp) {
+ long newWriteStamp = 0;
+ CompletableFuture<Long> writeLockWaiter;
+
+ LongList readStamps = null;
+ List<CompletableFuture<Long>> readLockWaitersToComplete = null;
+
+ synchronized (mutex) {
+ stampedLock.unlockWrite(stamp);
+
+ writeLockWaiter = writeLockWaiters.poll();
+
+ if (writeLockWaiter != null) {
+ // Someone is waiting for a write lock, satisfy the request.
+ newWriteStamp = stampedLock.tryWriteLock();
+ assert newWriteStamp != 0;
+ } else {
+ // Someone might be waiting for read locks.
+ for (CompletableFuture<Long> readLockWaiter : readLockWaiters)
{
+ long newReadStamp = stampedLock.tryReadLock();
+ assert newReadStamp != 0;
+
+ if (readStamps == null) {
+ readStamps = new LongArrayList(readLockWaiters.size());
+ readLockWaitersToComplete = new
ArrayList<>(readLockWaiters.size());
+ }
+ readStamps.add(newReadStamp);
+ readLockWaitersToComplete.add(readLockWaiter);
+ }
+
+ readLockWaiters.clear();
+ }
+ }
+
+ // Completing the futures out of the synchronized block.
+ if (writeLockWaiter != null) {
+ writeLockWaiter.complete(newWriteStamp);
+ } else if (readLockWaitersToComplete != null) {
+ for (int i = 0; i < readLockWaitersToComplete.size(); i++) {
+
readLockWaitersToComplete.get(i).complete(readStamps.getLong(i));
+ }
+ }
+ }
+
+ /**
+ * Attempts to acquire a read lock.
+ *
+ * @return Future completed with the stamp of the acquired lock; completed
when the lock is acquired.
+ */
+ public CompletableFuture<Long> readLock() {
+ synchronized (mutex) {
+ // Write lock attempts have priority over read lock attempts, so
first check whether someone waits for write lock.
+ if (writeLockWaiters.isEmpty()) {
+ long stamp = stampedLock.tryReadLock();
+ if (stamp != 0) {
+ return completedFuture(stamp);
+ }
+ }
+
+ CompletableFuture<Long> lockFuture = new CompletableFuture<>();
+
+ readLockWaiters.add(lockFuture);
+
+ return lockFuture;
+ }
+ }
+
+ /**
+ * Unlocks read lock previously obtained via {@link #readLock()}.
+ *
+ * @param stamp Stamp returned via read lock future.
+ */
+ public void unlockRead(long stamp) {
+ long newWriteStamp = 0;
+ CompletableFuture<Long> writeLockWaiter;
+
+ synchronized (mutex) {
+ stampedLock.unlockRead(stamp);
+
+ if (stampedLock.isReadLocked()) {
+ return;
+ }
+
+ writeLockWaiter = writeLockWaiters.poll();
+
+ if (writeLockWaiter != null) {
+ // Someone is waiting for a write lock, satisfy the request.
+ newWriteStamp = stampedLock.tryWriteLock();
+ assert newWriteStamp != 0;
+ }
+ }
+
+ // Completing the future out of the synchronized block.
+ if (writeLockWaiter != null) {
+ writeLockWaiter.complete(newWriteStamp);
+ }
+ }
+
+ boolean isReadLocked() {
+ return stampedLock.isReadLocked();
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index a4117ed3c74..1e7342b5925 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -74,7 +74,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.StampedLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -194,7 +193,7 @@ public class PartitionReplicaLifecycleManager extends
private final Set<ZonePartitionId> replicationGroupIds =
ConcurrentHashMap.newKeySet();
/** (zoneId -> lock) map to provide concurrent access to the zone replicas
list. */
- private final Map<Integer, StampedLock> zonePartitionsLocks = new
ConcurrentHashMap<>();
+ private final Map<Integer, NaiveAsyncReadWriteLock> zonePartitionsLocks =
new ConcurrentHashMap<>();
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -864,7 +863,7 @@ public class PartitionReplicaLifecycleManager extends
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this
method by the replicas await process.
public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
- assert
zonePartitionsLocks.get(zonePartitionId.zoneId()).tryWriteLock() == 0;
+ assert
zonePartitionsLocks.get(zonePartitionId.zoneId()).isReadLocked() :
zonePartitionId;
return replicationGroupIds.contains(zonePartitionId);
}
@@ -1409,9 +1408,7 @@ public class PartitionReplicaLifecycleManager extends
try {
return replicaMgr.stopReplica(zonePartitionId)
.thenCompose((replicaWasStopped) -> {
- if (afterReplicaStopAction != null) {
-
afterReplicaStopAction.accept(replicaWasStopped);
- }
+ afterReplicaStopAction.accept(replicaWasStopped);
if (!replicaWasStopped) {
return falseCompletedFuture();
@@ -1457,10 +1454,15 @@ public class PartitionReplicaLifecycleManager extends
* Lock the zones replica list for any changes. {@link
#hasLocalPartition(ZonePartitionId)} must be executed under this lock always.
*
* @param zoneId Zone id.
- * @return Stamp, which must be used for further unlock.
+ * @return Future completing with a stamp which must be used for further
unlock.
*/
- public long lockZoneForRead(int zoneId) {
- return zonePartitionsLocks.computeIfAbsent(zoneId, id -> new
StampedLock()).readLock();
+ public CompletableFuture<Long> lockZoneForRead(int zoneId) {
+ NaiveAsyncReadWriteLock lock =
zonePartitionsLocks.computeIfAbsent(zoneId, id -> newZoneLock());
+ return lock.readLock();
+ }
+
+ private static NaiveAsyncReadWriteLock newZoneLock() {
+ return new NaiveAsyncReadWriteLock();
}
/**
@@ -1523,18 +1525,18 @@ public class PartitionReplicaLifecycleManager extends
}
private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId,
Supplier<CompletableFuture<T>> action) {
- StampedLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id ->
new StampedLock());
+ NaiveAsyncReadWriteLock lock =
zonePartitionsLocks.computeIfAbsent(zoneId, id -> newZoneLock());
- long stamp = lock.writeLock();
-
- try {
- return action.get()
- .whenComplete((v, e) -> lock.unlockWrite(stamp));
- } catch (Throwable e) {
- lock.unlockWrite(stamp);
+ return lock.writeLock().thenCompose(stamp -> {
+ try {
+ return action.get()
+ .whenComplete((v, e) -> lock.unlockWrite(stamp));
+ } catch (Throwable e) {
+ lock.unlockWrite(stamp);
- return failedFuture(e);
- }
+ return failedFuture(e);
+ }
+ });
}
private CompletableFuture<Boolean>
onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java
new file mode 100644
index 00000000000..ce55edb1027
--- /dev/null
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class NaiveAsyncReadWriteLockTest {
+ private final NaiveAsyncReadWriteLock lock = new NaiveAsyncReadWriteLock();
+
+ @Test
+ void writeLockAttemptsArrivingWhenWriteLockedGetSatisfiedWhenUnlocked() {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+
+ CompletableFuture<Long> firstFuture = lock.writeLock();
+
+ for (int i = 0; i < 10; i++) {
+ futures.add(lock.writeLock().thenAccept(lock::unlockWrite));
+ }
+
+ assertThat(firstFuture.thenAccept(lock::unlockWrite),
willCompleteSuccessfully());
+
+ assertThat(CompletableFutures.allOf(futures),
willCompleteSuccessfully());
+ }
+
+ @Test
+ void readLockAttemptsArrivingWhenWriteLockedGetSatisfiedWhenUnlocked() {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+
+ CompletableFuture<Long> firstFuture = lock.writeLock();
+
+ for (int i = 0; i < 10; i++) {
+ futures.add(lock.readLock().thenAccept(lock::unlockRead));
+ }
+
+ assertThat(firstFuture.thenAccept(lock::unlockWrite),
willCompleteSuccessfully());
+
+ assertThat(CompletableFutures.allOf(futures),
willCompleteSuccessfully());
+ }
+
+ @Test
+ void writeLockAttemptsArrivingWhenReadLockedGetSatisfiedWhenUnlocked() {
+ CompletableFuture<Long> readLockFuture1 = lock.readLock();
+ CompletableFuture<Long> readLockFuture2 = lock.readLock();
+
+ CompletableFuture<Long> writeLockFuture = lock.writeLock();
+
+ assertThat(readLockFuture1.thenAccept(lock::unlockRead),
willCompleteSuccessfully());
+ assertThat(readLockFuture2.thenAccept(lock::unlockRead),
willCompleteSuccessfully());
+
+ assertThat(writeLockFuture, willCompleteSuccessfully());
+ assertDoesNotThrow(() -> lock.unlockWrite(writeLockFuture.join()));
+ }
+
+ @Test
+ void incompleteReadUnlockDoesNotAllowWriteWaiterAcquireWriteLock() throws
Exception {
+ CompletableFuture<Long> readLock1 = lock.readLock();
+ @SuppressWarnings("unused")
+ CompletableFuture<Long> readLock2 = lock.readLock();
+ CompletableFuture<Long> writeLock = lock.writeLock();
+
+ AtomicBoolean writeLocked = new AtomicBoolean(false);
+ writeLock.thenAccept(stamp -> writeLocked.set(true));
+
+ readLock1.thenAccept(lock::unlockRead);
+
+ assertFalse(waitForCondition(writeLocked::get, 100), "Write lock was acquired before unlocking all read locks");
+ }
+
+ @Test
+ void testConcurrency(
+ @InjectExecutorService(threadCount = 4) ExecutorService
orchestratingExecutor,
+ @InjectExecutorService(threadCount = 10) ExecutorService
sleepTasksExecutor
+ ) {
+ List<CompletableFuture<?>> futures = new CopyOnWriteArrayList<>();
+
+ for (int thread = 0; thread < 4; thread++) {
+ orchestratingExecutor.execute(() -> {
+ for (int i = 0; i < 10000; i++) {
+ CompletableFuture<?> future;
+
+ if (i % 2 == 0) {
+ future = lock.readLock().thenCompose(stamp -> {
+ return CompletableFuture.runAsync(() -> sleep(1),
sleepTasksExecutor)
+ .whenComplete((res, ex) ->
lock.unlockRead(stamp));
+ });
+ } else {
+ future = lock.writeLock().thenCompose(stamp -> {
+ return CompletableFuture.runAsync(() -> sleep(1),
sleepTasksExecutor)
+ .whenComplete((res, ex) ->
lock.unlockWrite(stamp));
+ });
+ }
+
+ futures.add(future);
+ }
+ });
+ }
+
+ assertThat(CompletableFutures.allOf(futures),
willCompleteSuccessfully());
+ }
+
+ private static void sleep(int millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ed022208428..71fb54418f4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -777,11 +777,31 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return schemaManager.schemaRegistry(causalityToken,
tableId).thenAccept(table::schemaView);
}));
- long stamp =
partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
+ // Obtain future, but don't chain on it yet because update() on VVs
must be called in the same thread. The method we call
+ // will call update() on VVs and inside those updates it will chain on
the lock acquisition future.
+ CompletableFuture<Long> acquisitionFuture =
partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
+ try {
+ return
prepareTableResourcesAndLoadHavingZoneReadLock(acquisitionFuture,
causalityToken, zoneDescriptor, onNodeRecovery, table)
+ .whenComplete((res, ex) ->
unlockZoneForRead(zoneDescriptor, acquisitionFuture));
+ } catch (Throwable e) {
+ unlockZoneForRead(zoneDescriptor, acquisitionFuture);
+
+ return failedFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void>
prepareTableResourcesAndLoadHavingZoneReadLock(
+ CompletableFuture<Long> readLockAcquisitionFuture,
+ long causalityToken,
+ CatalogZoneDescriptor zoneDescriptor,
+ boolean onNodeRecovery,
+ TableImpl table
+ ) {
+ int tableId = table.tableId();
// NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
- (ignore, throwable) -> inBusyLock(busyLock, () ->
supplyAsync(() -> {
+ (ignore, throwable) -> inBusyLock(busyLock, () ->
readLockAcquisitionFuture.thenComposeAsync(unused -> {
PartitionSet parts = new BitSetPartitionSet();
for (int i = 0; i < zoneDescriptor.partitions(); i++) {
@@ -791,7 +811,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
return getOrCreatePartitionStorages(table,
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
- }, ioExecutor).thenCompose(identity())));
+ }, ioExecutor))
+ );
CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
@@ -823,14 +844,17 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
tables.put(tableId, table);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
- return createPartsFut.thenAccept(ignore -> startedTables.put(tableId,
table))
- .whenComplete((v, th) -> {
-
partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
+ return createPartsFut.thenAccept(ignore -> {
+ startedTables.put(tableId, table);
- if (th == null) {
- addTableToZone(zoneDescriptor.id(), table);
- }
- });
+ addTableToZone(zoneDescriptor.id(), table);
+ });
+ }
+
+ private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor,
CompletableFuture<Long> readLockAcquiryFuture) {
+ readLockAcquiryFuture.thenAccept(stamp -> {
+
partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
+ });
}
/**