This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 003edce538 IGNITE-22956 Implement local events about zone replica
start in PartitionReplicaLifecycleManager. (#4294)
003edce538 is described below
commit 003edce5383ea77d44dda9cbc8755327e382bc3a
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Sep 3 17:04:52 2024 +0300
IGNITE-22956 Implement local events about zone replica start in
PartitionReplicaLifecycleManager. (#4294)
---
.../replicator/ItReplicaLifecycleTest.java | 68 +++++++-
.../replicator/LocalPartitionReplicaEvent.java | 30 ++++
.../PartitionReplicaEventParameters.java | 48 ++++++
.../PartitionReplicaLifecycleManager.java | 113 +++++++++++--
.../replicator/ZonePartitionReplicaListener.java | 11 ++
.../internal/table/distributed/TableManager.java | 175 +++++++++++++++++----
6 files changed, 400 insertions(+), 45 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 5aea3a6229..7f15fecdb6 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -46,7 +46,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
@@ -58,10 +60,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -69,6 +73,7 @@ import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
@@ -111,6 +116,7 @@ import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.index.IndexManager;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -123,7 +129,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
@@ -141,6 +147,7 @@ import
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -582,6 +589,36 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
);
}
+    @Test
+    void testTableReplicaListenersCreationAfterRebalance(TestInfo testInfo) throws Exception {
+        startNodes(testInfo, 3);
+
+        String zoneName = "test_zone";
+
+        var nodeNames = nodes.values().stream().map(n -> n.name).collect(Collectors.toList());
+
+        // Assignment calculated by AffinityUtils over the names of all started nodes; the first one becomes the primary.
+        Object[] assignments = AffinityUtils.calculateAssignmentForPartition(nodeNames, 0, 1).toArray();
+        Node primaryNode = getNode(((Assignment) assignments[0]).consistentId());
+
+        placementDriver.setPrimary(primaryNode.clusterService.topologyService().localMember());
+
+        DistributionZonesTestUtil.createZone(primaryNode.catalogManager, zoneName, 1, 1);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(primaryNode.catalogManager, zoneName, primaryNode.hybridClock.nowLong());
+
+        // No table exists yet, so the zone replica must not carry any table listener.
+        assertTrue(waitForCondition(() -> assertTableListenersCount(primaryNode, zoneId, 0), 10_000L));
+
+        createTable(primaryNode, zoneName, "test_table");
+
+        assertTrue(waitForCondition(() -> assertTableListenersCount(primaryNode, zoneId, 1), 10_000L));
+
+        // Alter the zone (presumably to 3 replicas) so that, after the rebalance, every node must have the table listener.
+        alterZone(primaryNode.catalogManager, zoneName, 3);
+
+        assertTrue(waitForCondition(
+                () -> IntStream.range(0, 3).allMatch(nodeIdx -> assertTableListenersCount(getNode(nodeIdx), zoneId, 1)),
+                30_000L
+        ));
+    }
+
@Test
void testReplicaIsStartedOnNodeStart(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);
@@ -984,7 +1021,8 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
- KeyValueStorage keyValueStorage = new
SimpleInMemoryKeyValueStorage(name);
+ KeyValueStorage keyValueStorage =
+ new RocksDbKeyValueStorage(name, resolveDir(dir,
"metaStorageTestKeyValue"), failureProcessor);
var topologyAwareRaftGroupServiceFactory = new
TopologyAwareRaftGroupServiceFactory(
clusterService,
@@ -1352,4 +1390,30 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
private static boolean skipMetaStorageInvoke(Collection<Operation> ops,
String prefix) {
return ops.stream().anyMatch(op -> new String(toByteArray(op.key()),
StandardCharsets.UTF_8).startsWith(prefix));
}
+
+    /**
+     * Resolves {@code dirName} against {@code workDir} and makes sure the resulting directory exists.
+     *
+     * @param workDir Base directory.
+     * @param dirName Name of the sub-directory.
+     * @return Path of the (now existing) sub-directory.
+     */
+    private static Path resolveDir(Path workDir, String dirName) {
+        try {
+            // Files.createDirectories does not fail if the directory already exists.
+            return Files.createDirectories(workDir.resolve(dirName));
+        } catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
+     * Checks whether the zone replica of partition 0 of the given zone on the given node holds exactly {@code count}
+     * table replica listeners.
+     *
+     * <p>Meant to be polled by {@code waitForCondition}: while the replica is absent or still starting it returns
+     * {@code false} instead of throwing, so the caller keeps polling until its own timeout.
+     *
+     * @param node Node to inspect.
+     * @param zoneId Zone id.
+     * @param count Expected number of table replica listeners.
+     * @return {@code true} if the replica exists and has exactly {@code count} table listeners.
+     */
+    private boolean assertTableListenersCount(Node node, int zoneId, int count) {
+        CompletableFuture<Replica> replicaFut = node.replicaManager.replica(new ZonePartitionId(zoneId, 0));
+
+        if (replicaFut == null) {
+            return false;
+        }
+
+        Replica replica;
+
+        try {
+            replica = replicaFut.get(1, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            // The replica is still starting: report "not yet" and let the caller poll again.
+            return false;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        return replica != null
+                && ((ZonePartitionReplicaListener) replica.listener()).tableReplicaListeners().size() == count;
+    }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
new file mode 100644
index 0000000000..933f528032
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.event.Event;
+
+/**
+ * Local events that {@link PartitionReplicaLifecycleManager} fires about zone partition replicas.
+ */
+public enum LocalPartitionReplicaEvent implements Event {
+    /** Fired after a zone partition replica has been started. */
+    AFTER_REPLICA_STARTED;
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
new file mode 100644
index 0000000000..762c015b67
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+
+/**
+ * Event parameters carried by the zone partition replica events that {@link PartitionReplicaLifecycleManager} fires.
+ */
+public class PartitionReplicaEventParameters implements EventParameters {
+    /** Identifier of the zone partition the event relates to. */
+    private final ZonePartitionId zonePartitionId;
+
+    /**
+     * Creates event parameters for the given zone partition.
+     *
+     * @param zonePartitionId Zone partition id.
+     */
+    public PartitionReplicaEventParameters(ZonePartitionId zonePartitionId) {
+        this.zonePartitionId = zonePartitionId;
+    }
+
+    /**
+     * Gets the identifier of the zone partition the event relates to.
+     *
+     * @return Zone partition id.
+     */
+    public ZonePartitionId zonePartitionId() {
+        return this.zonePartitionId;
+    }
+}
+
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index bb3dfca2d3..713f23a24d 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -69,7 +69,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.StampedLock;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
@@ -86,6 +90,7 @@ import
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
@@ -110,12 +115,15 @@ import
org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -130,7 +138,8 @@ import org.jetbrains.annotations.Nullable;
* - Stop the same nodes on the zone removing.
* - Support the rebalance mechanism and start the new replication nodes when
the rebalance triggers occurred.
*/
-public class PartitionReplicaLifecycleManager implements IgniteComponent {
+public class PartitionReplicaLifecycleManager extends
+ AbstractEventProducer<LocalPartitionReplicaEvent,
PartitionReplicaEventParameters> implements IgniteComponent {
public static final String FEATURE_FLAG_NAME =
"IGNITE_ZONE_BASED_REPLICATION";
/* Feature flag for zone based collocation track */
// TODO IGNITE-22115 remove it
@@ -162,6 +171,9 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
private final Set<ReplicationGroupId> replicationGroupIds =
ConcurrentHashMap.newKeySet();
+ /** (zoneId -> lock) map to provide concurrent access to the zone replicas
list. */
+ private final Map<Integer, StampedLock> zonePartitionsLocks = new
ConcurrentHashMap<>();
+
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -450,18 +462,43 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
try {
- replicationGroupIds.add(replicaGrpId);
+ AtomicReference<Long> stamp = new AtomicReference<>(null);
return replicaMgr.startReplica(
- replicaGrpId,
- (raftClient) -> new ZonePartitionReplicaListener(
- new
ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor)),
- new FailFastSnapshotStorageFactory(),
- stablePeersAndLearners,
- raftGroupListener,
- raftGroupEventsListener,
- busyLock
- ).thenApply(ignored -> true);
+ replicaGrpId,
+ (raftClient) -> new
ZonePartitionReplicaListener(
+ new
ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor)),
+ new FailFastSnapshotStorageFactory(),
+ stablePeersAndLearners,
+ raftGroupListener,
+ raftGroupEventsListener,
+ busyLock
+ ).thenCompose(replica -> {
+ zonePartitionsLocks.compute(zoneId, (id, lock) -> {
+ if (lock == null) {
+ lock = new StampedLock();
+ }
+
+ stamp.set(lock.writeLock());
+
+ return lock;
+ });
+
+ replicationGroupIds.add(replicaGrpId);
+
+ return fireEvent(
+
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
+ new PartitionReplicaEventParameters(
+ new
ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
+ )
+ );
+ })
+ .whenComplete((unused, throwable) -> {
+ if (stamp.get() != null) {
+
zonePartitionsLocks.get(zoneId).unlockWrite(stamp.get());
+ }
+ })
+ .thenApply(unused -> false);
} catch (NodeStoppingException e) {
return failedFuture(e);
}
@@ -669,11 +706,15 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
/**
* Check if the current node has local replica for this {@link ZonePartitionId}.
*
+     * <p>Important: this method must always be invoked under the corresponding zone lock: either the read lock taken via
+     * {@link #lockZoneForRead(int)} or the zone write lock held while a zone replica is being started.
+     *
* @param zonePartitionId Zone partition id.
* @return true if local replica exists, false otherwise.
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this method by the replicas await process.
public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
+        StampedLock zoneLock = zonePartitionsLocks.get(zonePartitionId.zoneId());
+
+        // Must be side-effect free: a tryWriteLock()-based check would acquire (and never release) the write lock
+        // whenever the zone is not locked, and would throw NPE if no lock was ever created for the zone.
+        assert zoneLock != null && (zoneLock.isReadLocked() || zoneLock.isWriteLocked())
+                : "Zone lock is not held [zonePartitionId=" + zonePartitionId + ']';
+
return replicationGroupIds.contains(zonePartitionId);
}
@@ -1271,4 +1312,54 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
LOG.error("Unable to clean zones resources", e);
}
}
+
+    /**
+     * Locks the zone's replica list against changes. {@link #hasLocalPartition(ZonePartitionId)} must always be executed
+     * under this lock.
+     *
+     * <p>The read lock is acquired outside of the map's remapping function. Blocking inside
+     * {@code ConcurrentHashMap.compute} (for example while a replica start holds the zone write lock) may also block other
+     * threads' update operations on the map for the whole wait.
+     *
+     * @param zoneId Zone id.
+     * @return Stamp, which must be used for further unlock.
+     */
+    public long lockZoneForRead(int zoneId) {
+        // NOTE(review): relies on lock entries never being removed from zonePartitionsLocks (no removal is visible here).
+        StampedLock zoneLock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> new StampedLock());
+
+        return zoneLock.readLock();
+    }
+
+    /**
+     * Unlocks the zone's replica list previously locked by {@link #lockZoneForRead(int)}.
+     *
+     * @param zoneId Zone id.
+     * @param stamp Stamp produced by the corresponding {@link #lockZoneForRead(int)} call.
+     */
+    public void unlockZoneForRead(int zoneId, long stamp) {
+        zonePartitionsLocks.get(zoneId).unlockRead(stamp);
+    }
+
+    /**
+     * Loads a new table partition listener into the zone replica.
+     *
+     * <p>Important: this method must be called only with the guarantee that the replica exists at the current moment
+     * (its start future is already completed).
+     *
+     * @param zonePartitionId Zone partition id.
+     * @param tablePartitionId Table partition id.
+     * @param createListener Lazy replica listener from RAFT command runner builder.
+     */
+    public void loadTableListenerToZoneReplica(
+            ZonePartitionId zonePartitionId,
+            TablePartitionId tablePartitionId,
+            Function<RaftCommandRunner, ReplicaListener> createListener
+    ) {
+        CompletableFuture<Replica> replicaFut = replicaMgr.replica(zonePartitionId);
+
+        assert replicaFut != null && replicaFut.isDone()
+                : "Zone replica is not started [zonePartitionId=" + zonePartitionId + ", tablePartitionId=" + tablePartitionId + ']';
+
+        ZonePartitionReplicaListener zoneListener = (ZonePartitionReplicaListener) replicaFut.join().listener();
+
+        zoneListener.addTableReplicaListener(tablePartitionId, createListener);
+    }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 45c30025fd..4fa12d845d 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TableAware;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* Zone partition replica listener.
@@ -120,4 +121,14 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
public void addTableReplicaListener(TablePartitionId partitionId,
Function<RaftCommandRunner, ReplicaListener> replicaListener) {
replicas.put(partitionId, replicaListener.apply(raftClient));
}
+
+    /**
+     * Gets the table replica listeners registered in this zone replica, keyed by table partition id.
+     *
+     * <p>The internal map itself is returned, not a copy.
+     *
+     * @return Table replicas listeners.
+     */
+    @VisibleForTesting
+    public Map<TablePartitionId, ReplicaListener> tableReplicaListeners() {
+        return this.replicas;
+    }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a1872c6482..b4f00604df 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -57,6 +57,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.emptyListComple
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -68,6 +69,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -137,8 +139,9 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaEventParameters;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
-import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -212,7 +215,6 @@ import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedS
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
@@ -416,6 +418,10 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
@Nullable
private StreamerReceiverRunner streamerReceiverRunner;
+ private final CompletableFuture<Void> readyToProcessTableStarts = new
CompletableFuture<>();
+
+ private final Map<Integer, Set<TableImpl>> tablesPerZone = new
ConcurrentHashMap<>();
+
/**
* Creates a new table manager.
*
@@ -579,6 +585,11 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
);
fullStateTransferIndexChooser = new
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
+
+ partitionReplicaLifecycleManager.listen(
+ LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
+ this::onZoneReplicaCreated
+ );
}
@Override
@@ -598,6 +609,12 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
cleanUpResourcesForDroppedTablesOnRecoveryBusy();
+ sharedTxStateStorage.start();
+
+ // This future unblocks the process of tables start.
+ // All needed storages, like the TxStateRocksDbSharedStorage must
be started already.
+ readyToProcessTableStarts.complete(null);
+
startTables(recoveryRevision, lowWatermark.getLowWatermark());
processAssignmentsOnRecovery(recoveryRevision);
@@ -638,6 +655,39 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return
executorInclinedSchemaSyncService.waitForMetadataCompleteness(HybridTimestamp.hybridTimestamp(ts));
}
+    /**
+     * Handles the {@link LocalPartitionReplicaEvent#AFTER_REPLICA_STARTED} event. Once the manager is ready to process
+     * table starts, it prepares the storages of every table of the zone for the started partition, registers their indexes
+     * and loads the table replica listeners into the zone replica.
+     *
+     * @param parameters Event parameters.
+     * @return Future that completes with {@code false} after all table listeners of the zone have been loaded.
+     */
+    private CompletableFuture<Boolean> onZoneReplicaCreated(PartitionReplicaEventParameters parameters) {
+        if (!PartitionReplicaLifecycleManager.ENABLED) {
+            return completedFuture(false);
+        }
+
+        return inBusyLockAsync(busyLock, () -> readyToProcessTableStarts
+                // thenCompose, not thenRun: the per-table futures must be built and awaited in the same stage. Otherwise,
+                // while readyToProcessTableStarts is incomplete, allOf() would be taken over a still-empty list and the
+                // event would complete before any table listener is loaded.
+                .thenCompose(unused -> {
+                    int zoneId = parameters.zonePartitionId().zoneId();
+                    int partitionId = parameters.zonePartitionId().partitionId();
+
+                    PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
+
+                    List<CompletableFuture<?>> futs = new ArrayList<>();
+
+                    for (TableImpl tbl : zoneTables(zoneId)) {
+                        futs.add(inBusyLockAsync(busyLock,
+                                () -> supplyAsync(() -> getOrCreatePartitionStorages(tbl, singlePartitionIdSet), ioExecutor))
+                                .thenCompose(identity())
+                                .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                                    lowWatermark.getLowWatermarkSafe(lwm ->
+                                            registerIndexesToTable(tbl, catalogService, singlePartitionIdSet, tbl.schemaView(), lwm)
+                                    );
+
+                                    preparePartitionResourcesAndLoadToZoneReplica(tbl, partitionId, zoneId);
+                                }), ioExecutor)
+                        );
+                    }
+
+                    return allOf(futs.toArray(CompletableFuture[]::new));
+                })
+                .thenApply(unused -> false)
+        );
+    }
+
private CompletableFuture<Boolean>
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters
parameters) {
if (!PartitionReplicaLifecycleManager.ENABLED) {
return completedFuture(false);
@@ -647,19 +697,32 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+ return prepareTableResourcesAndLoadToZoneReplica(causalityToken,
zoneDescriptor, tableDescriptor, false);
+ }
+
+ private CompletableFuture<Boolean>
prepareTableResourcesAndLoadToZoneReplica(
+ long causalityToken,
+ CatalogZoneDescriptor zoneDescriptor,
+ CatalogTableDescriptor tableDescriptor,
+ boolean onNodeRecovery
+ ) {
TableImpl table = createTableImpl(causalityToken, tableDescriptor,
zoneDescriptor);
+ int tableId = tableDescriptor.id();
+
tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
if (e != null) {
return failedFuture(e);
}
- return schemaManager.schemaRegistry(causalityToken,
parameters.tableId()).thenAccept(table::schemaView);
+ return schemaManager.schemaRegistry(causalityToken,
tableId).thenAccept(table::schemaView);
}));
+ long stamp =
partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
+
// NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
- (ignore, throwable) -> inBusyLock(busyLock, () ->
nullCompletedFuture().thenComposeAsync((ignored) -> {
+ (ignore, throwable) -> inBusyLock(busyLock, () ->
supplyAsync(() -> {
PartitionSet parts = new BitSetPartitionSet();
for (int i = 0; i < zoneDescriptor.partitions(); i++) {
@@ -668,8 +731,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
}
- return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
- }, ioExecutor)));
+ return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
+ }, ioExecutor).thenCompose(identity())));
CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
@@ -678,29 +741,42 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return failedFuture(e);
}
- return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
- var startPartsFut = new
ArrayList<CompletableFuture<?>>();
+ return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenRunAsync(() -> inBusyLock(busyLock, () -> {
+ if (onNodeRecovery) {
+ SchemaRegistry schemaRegistry = table.schemaView();
+ PartitionSet partitionSet =
localPartsByTableId.get(tableId);
+ // LWM starts updating only after the node is
restored.
+ HybridTimestamp lwm =
lowWatermark.getLowWatermark();
+
+ registerIndexesToTable(table, catalogService,
partitionSet, schemaRegistry, lwm);
+ }
for (int i = 0; i < zoneDescriptor.partitions(); i++) {
if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
-
startPartsFut.add(preparePartitionResourcesAndLoadToZoneReplica(
- table,
- i,
- zoneDescriptor.id()));
+
preparePartitionResourcesAndLoadToZoneReplica(table, i, zoneDescriptor.id());
}
}
-
- return
allOf(startPartsFut.toArray(CompletableFuture[]::new));
}
+
), ioExecutor);
});
- tables.put(parameters.tableId(), table);
+ tables.put(tableId, table);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
- return createPartsFut.thenAccept(ignore ->
startedTables.put(parameters.tableId(), table))
- .thenApply(unused -> false);
+ return createPartsFut.thenAccept(ignore -> startedTables.put(tableId,
table))
+ .handle((v, th) -> {
+
partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp);
+
+ if (th != null) {
+ sneakyThrow(th);
+ }
+ addTableToZone(zoneDescriptor.id(), table);
+
+ return v;
+ })
+ .thenApply(unused -> false);
}
/**
@@ -709,9 +785,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
* @param table Table.
* @param partId Partition id.
* @param zoneId Zone id.
- * @return Future, which will complete when the table processor loaded to
the zone replica.
*/
- private CompletableFuture<Void>
preparePartitionResourcesAndLoadToZoneReplica(
+ private void preparePartitionResourcesAndLoadToZoneReplica(
TableImpl table,
int partId,
int zoneId
@@ -722,7 +797,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
- return inBusyLockAsync(busyLock, () -> {
+ inBusyLock(busyLock, () -> {
var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp,
Void>(HybridTimestamp.MIN_VALUE);
var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
@@ -753,14 +828,13 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
partitionStorages.getMvPartitionStorage(),
partitionStorages.getTxStateStorage(),
partitionUpdateHandlers,
- raftClient);
+ raftClient
+ );
- return replicaMgr.replica(new ZonePartitionId(zoneId, partId))
- .thenAcceptAsync(zoneReplica ->
- ((ZonePartitionReplicaListener)
zoneReplica.listener()).addTableReplicaListener(
- new TablePartitionId(tableId, partId),
createListener
- ), ioExecutor
- );
+ partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(
+ new ZonePartitionId(zoneId, partId),
+ new TablePartitionId(tableId, partId), createListener
+ );
});
}
@@ -873,7 +947,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
Assignments.assignmentListToString(newAssignments)
);
- throw ExceptionUtils.sneakyThrow(e);
+ throw sneakyThrow(e);
}
return invokeResult;
@@ -924,7 +998,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
if (e != null) {
LOG.error("Couldn't get assignments from metastore
for table [tableId={}].", e, tableId);
- throw ExceptionUtils.sneakyThrow(e);
+ throw sneakyThrow(e);
}
return realAssignments;
@@ -2676,8 +2750,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private void startTables(long recoveryRevision, @Nullable HybridTimestamp
lwm) {
- sharedTxStateStorage.start();
-
int earliestCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
int latestCatalogVersion = catalogService.latestCatalogVersion();
@@ -2688,7 +2760,24 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
int ver0 = ver;
catalogService.tables(ver).stream()
.filter(tbl -> startedTables.add(tbl.id()))
- .forEach(tableDescriptor ->
startTableFutures.add(createTableLocally(recoveryRevision, ver0,
tableDescriptor, true)));
+ .forEach(tableDescriptor -> {
+ if (PartitionReplicaLifecycleManager.ENABLED) {
+ CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, ver0);
+
+ startTableFutures.add(
+ prepareTableResourcesAndLoadToZoneReplica(
+ recoveryRevision,
+ zoneDescriptor,
+ tableDescriptor,
+ true
+ )
+ );
+ } else {
+ startTableFutures.add(
+ createTableLocally(recoveryRevision, ver0,
tableDescriptor, true)
+ );
+ }
+ });
}
// Forces you to wait until recovery is complete before the metastore
watches is deployed to avoid races with catalog listeners.
@@ -2819,4 +2908,26 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
this.streamerReceiverRunner = runner;
}
+
+    /**
+     * Returns the set of tables registered for the given zone via {@code addTableToZone}, creating an empty set on first
+     * access.
+     *
+     * <p>The set is concurrent because it is iterated in {@code onZoneReplicaCreated} while {@code addTableToZone} may add
+     * tables from another thread. A plain {@link HashSet} supports neither concurrent mutation nor iteration while being
+     * mutated.
+     *
+     * @param zoneId Zone id.
+     * @return Live, thread-safe set of the zone's tables.
+     */
+    private Set<TableImpl> zoneTables(int zoneId) {
+        return tablesPerZone.computeIfAbsent(zoneId, id -> ConcurrentHashMap.newKeySet());
+    }
+
+    /**
+     * Registers a table in the set of tables of its zone.
+     *
+     * @param zoneId Zone id.
+     * @param table Table to register.
+     */
+    private void addTableToZone(int zoneId, TableImpl table) {
+        zoneTables(zoneId).add(table);
+    }
}