This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2c3ce15a0d IGNITE-22233 Add zone replica listener (#3931)
2c3ce15a0d is described below
commit 2c3ce15a0d99087c220e386b5e496b5dff9f755f
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Jul 23 16:04:30 2024 +0300
IGNITE-22233 Add zone replica listener (#3931)
---
.../replicator/ItReplicaLifecycleTest.java | 94 ++++++--
.../PartitionReplicaLifecycleManager.java | 36 ++-
.../replicator/ZonePartitionRaftListener.java | 48 +++-
.../replicator/ZonePartitionReplicaListener.java | 94 +++++++-
.../ReadOnlyDirectMultiRowReplicaRequest.java | 3 +-
.../ReadOnlyDirectSingleRowReplicaRequest.java | 3 +-
.../ReadOnlyMultiRowPkReplicaRequest.java | 3 +-
.../ReadOnlyScanRetrieveBatchReplicaRequest.java | 3 +-
.../ReadOnlySingleRowPkReplicaRequest.java | 3 +-
.../ReadWriteMultiRowPkReplicaRequest.java | 3 +-
.../ReadWriteMultiRowReplicaRequest.java | 3 +-
.../ReadWriteScanRetrieveBatchReplicaRequest.java | 3 +-
.../ReadWriteSingleRowPkReplicaRequest.java | 3 +-
.../ReadWriteSingleRowReplicaRequest.java | 3 +-
.../ReadWriteSwapRowReplicaRequest.java | 3 +-
.../ItPlacementDriverReplicaSideTest.java | 2 +-
.../apache/ignite/internal/replicator/Replica.java | 9 +
.../ignite/internal/replicator/ReplicaImpl.java | 5 +
.../ignite/internal/replicator/ReplicaManager.java | 20 +-
.../replicator/ZonePartitionReplicaImpl.java | 8 +-
.../internal/replicator/message/TableAware.java} | 13 +-
.../internal/replicator/ReplicaManagerTest.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 14 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 6 +-
.../sql/engine/exec/UpdatableTableImpl.java | 3 +
.../ignite/distributed/ReplicaUnavailableTest.java | 13 +-
.../rebalance/ItRebalanceDistributedTest.java | 16 +-
.../internal/table/distributed/TableManager.java | 250 +++++++++++++++++----
.../distributed/storage/InternalTableImpl.java | 19 ++
.../distributed/TableManagerRecoveryTest.java | 15 +-
.../table/distributed/TableManagerTest.java | 10 +-
.../PartitionReplicaListenerIndexLockingTest.java | 4 +
.../replication/PartitionReplicaListenerTest.java | 35 +++
.../ignite/internal/tx/impl/TxMessageSender.java | 6 +
.../tx/message/TxFinishReplicaRequest.java | 7 +
35 files changed, 634 insertions(+), 128 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 6b683ea4f0..3524d04942 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -63,7 +63,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
@@ -137,6 +139,7 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
@@ -164,17 +167,21 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -338,7 +345,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@Test
- public void testEmptyReplicaListener(TestInfo testInfo) throws Exception {
+ public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);
Assignment replicaAssignment = (Assignment)
AffinityUtils.calculateAssignmentForPartition(
@@ -351,17 +358,54 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
createZone(node, "test_zone", 1, 1);
int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager,
"test_zone", node.hybridClock.nowLong());
- createTable(node, "test_zone", "test_table");
- int tableId = TableTestUtils.getTableId(node.catalogManager,
"test_table", node.hybridClock.nowLong());
+ long key = 1;
- node.converter.put(new TablePartitionId(tableId, 0), new
ZonePartitionId(zoneId, 0));
+ {
+ createTable(node, "test_zone", "test_table");
+ int tableId = TableTestUtils.getTableId(node.catalogManager,
"test_table", node.hybridClock.nowLong());
- KeyValueView<Long, Integer> keyValueView =
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+ prepareTableIdToZoneIdConverter(
+ node,
+ new TablePartitionId(tableId, 0),
+ new ZonePartitionId(zoneId, 0)
+ );
+
+ KeyValueView<Long, Integer> keyValueView =
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+ int val = 100;
+
+ node.transactions().runInTransaction(tx -> {
+ assertDoesNotThrow(() -> keyValueView.put(tx, key, val));
+
+ assertEquals(val, keyValueView.get(tx, key));
+ });
+
+ node.transactions().runInTransaction(tx -> {
+ // Check the replica read inside the another transaction
+ assertEquals(val, keyValueView.get(tx, key));
+ });
+ }
+
+ {
+ createTable(node, "test_zone", "test_table1");
+ int tableId = TableTestUtils.getTableId(node.catalogManager,
"test_table1", node.hybridClock.nowLong());
+
+ prepareTableIdToZoneIdConverter(
+ node,
+ new TablePartitionId(tableId, 0),
+ new ZonePartitionId(zoneId, 0)
+ );
+
+ KeyValueView<Long, Integer> keyValueView =
node.tableManager.table(tableId).keyValueView(Long.class, Integer.class);
+
+ int val = 200;
- assertDoesNotThrow(() -> keyValueView.put(null, 1L, 1));
+ node.transactions().runInTransaction(tx -> {
+ assertDoesNotThrow(() -> keyValueView.put(tx, key, val));
- // Actually we are testing not the fair put value, but the hardcoded
one from temporary noop replica listener
- assertEquals(-1, keyValueView.get(null, 1L));
+ assertEquals(val, keyValueView.get(tx, key));
+ });
+ }
}
@Test
@@ -682,6 +726,18 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(() ->
getNode(0).replicaManager.isReplicaStarted(partId), 10_000L));
}
+ private void prepareTableIdToZoneIdConverter(Node node, TablePartitionId
tablePartitionId, ZonePartitionId zonePartitionId) {
+ node.converter.set(request -> {
+ if
(request.groupId().asReplicationGroupId().equals(tablePartitionId)
+ && !(request instanceof WriteIntentSwitchReplicaRequest)) {
+ return zonePartitionId;
+ } else {
+ return request.groupId().asReplicationGroupId();
+ }
+ });
+
+ }
+
private Node getNode(int nodeIndex) {
return nodes.get(nodeIndex);
}
@@ -790,12 +846,15 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
private final ScheduledExecutorService rebalanceScheduler;
- private final Map<ReplicationGroupId, ReplicationGroupId> converter =
new ConcurrentHashMap<>();
+ private AtomicReference<Function<ReplicaRequest, ReplicationGroupId>>
converter =
+ new AtomicReference<>(request ->
request.groupId().asReplicationGroupId());
private final LogStorageFactory logStorageFactory;
private final IndexMetaStorage indexMetaStorage;
+ private final HybridTimestampTracker observableTimestampTracker = new
HybridTimestampTracker();
+
/**
* Constructor that simply creates a subset of components of this node.
*/
@@ -1026,7 +1085,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
raftManager,
view -> new LocalLogStorageFactory(),
ForkJoinPool.commonPool(),
- t -> (converter.get(t) != null) ? converter.get(t) : t
+ t -> converter.get().apply(t)
);
LongSupplier delayDurationMsSupplier = () -> 10L;
@@ -1064,7 +1123,8 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
clusterService.topologyService(),
lowWatermark,
threadPoolsManager.tableIoExecutor(),
- rebalanceScheduler
+ rebalanceScheduler,
+ threadPoolsManager.partitionOperationsExecutor()
);
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
@@ -1081,7 +1141,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
clusterService.topologyService(),
clusterService.serializationRegistry(),
replicaManager,
- mock(LockManager.class),
+ lockManager,
replicaSvc,
txManager,
dataStorageMgr,
@@ -1097,14 +1157,15 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
distributionZoneManager,
schemaSyncService,
catalogManager,
- new HybridTimestampTracker(),
+ observableTimestampTracker,
placementDriver,
() -> mock(IgniteSql.class),
resourcesRegistry,
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer
+ logSyncer,
+ partitionReplicaLifecycleManager
);
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
@@ -1119,6 +1180,11 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
);
}
+ private IgniteTransactions transactions() {
+ IgniteTransactionsImpl transactions = new
IgniteTransactionsImpl(txManager, observableTimestampTracker);
+ return new PublicApiThreadingIgniteTransactions(transactions,
ForkJoinPool.commonPool());
+ }
+
private void waitForMetadataCompletenessAtNow() {
assertThat(schemaSyncService.waitForMetadataCompleteness(hybridClock.now()),
willCompleteSuccessfully());
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 1b7e7db986..e1b36dfdf8 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -61,6 +61,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
+import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
@@ -162,6 +164,11 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
/** Executor for scheduling rebalance routine. */
private final ScheduledExecutorService rebalanceScheduler;
+ /**
+ * Executes partition operations (that might cause I/O and/or be blocked
on locks).
+ */
+ private final Executor partitionOperationsExecutor;
+
/**
* The constructor.
*
@@ -170,7 +177,9 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
* @param distributionZoneMgr Distribution zone manager.
* @param metaStorageMgr Metastorage manager.
* @param topologyService Topology service.
- * @param ioExecutor Separate executor for IO operations.
+ * @param rebalanceScheduler Executor for scheduling rebalance routine.
+ * @param partitionOperationsExecutor Striped executor on which partition
operations (potentially requiring I/O with storages)
+ * will be executed.
*/
public PartitionReplicaLifecycleManager(
CatalogManager catalogMgr,
@@ -180,7 +189,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
TopologyService topologyService,
LowWatermark lowWatermark,
ExecutorService ioExecutor,
- ScheduledExecutorService rebalanceScheduler
+ ScheduledExecutorService rebalanceScheduler,
+ Executor partitionOperationsExecutor
) {
this.catalogMgr = catalogMgr;
this.replicaMgr = replicaMgr;
@@ -190,6 +200,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
this.lowWatermark = lowWatermark;
this.ioExecutor = ioExecutor;
this.rebalanceScheduler = rebalanceScheduler;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
pendingAssignmentsRebalanceListener =
createPendingAssignmentsRebalanceListener();
stableAssignmentsRebalanceListener =
createStableAssignmentsRebalanceListener();
@@ -404,7 +415,8 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
try {
return replicaMgr.startReplica(
replicaGrpId,
- new ZonePartitionReplicaListener(),
+ (raftClient) -> new ZonePartitionReplicaListener(
+ new ExecutorInclinedRaftCommandRunner(raftClient,
partitionOperationsExecutor)),
new FailFastSnapshotStorageFactory(),
stablePeersAndLearners,
raftGroupListener,
@@ -427,13 +439,6 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
});
}
- private boolean shouldStartLocally(Assignments assignments) {
- return assignments
- .nodes()
- .stream()
- .anyMatch(a -> a.consistentId().equals(localNode().name()));
- }
-
private ClusterNode localNode() {
return topologyService.localMember();
}
@@ -579,6 +584,17 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
return assignmentsFuture;
}
+ /**
+ * Check if the current node has local replica for this {@link
ZonePartitionId}.
+ *
+ * @param zonePartitionId Zone partition id.
+ * @return true if local replica exists, false otherwise.
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this
method by the replicas await process.
+ public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
+ return replicationGroupIds.contains(zonePartitionId);
+ }
+
/**
* Creates meta storage listener for pending assignments updates.
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
index 47523b102b..2895a0b3c0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java
@@ -17,27 +17,71 @@
package org.apache.ignite.internal.partition.replicator;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+
+import java.io.Serializable;
import java.nio.file.Path;
import java.util.Iterator;
+import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
+import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.tx.TransactionResult;
/**
* RAFT listener for the zone partition.
*/
public class ZonePartitionRaftListener implements RaftGroupListener {
+ private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionRaftListener.class);
@Override
public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
- // No-op
+ iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo)
-> {
+ Command command = clo.command();
+
+ assert false : "No read commands expected, [cmd=" + command + ']';
+ });
}
@Override
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
- // No-op
+ iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo)
-> {
+ Command command = clo.command();
+
+ Serializable result = null;
+
+ try {
+ if (command instanceof FinishTxCommand) {
+ FinishTxCommand cmd = (FinishTxCommand) command;
+
+ result = new TransactionResult(cmd.commit() ? COMMITTED :
ABORTED, cmd.commitTimestamp());
+ } else {
+ LOG.debug("Message type " + command.getClass() + " is not
supported by the zone partition RAFT listener yet");
+ }
+ } catch (IgniteInternalException e) {
+ result = e;
+ } catch (CompletionException e) {
+ result = e.getCause();
+ } catch (Throwable t) {
+ LOG.error(
+ "Unknown error while processing command
[commandIndex={}, commandTerm={}, command={}]",
+ t,
+ clo.index(), clo.index(), command
+ );
+
+ throw t;
+ }
+
+ clo.result(result);
+ });
}
@Override
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 49142b0a29..45c30025fd 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -17,31 +17,107 @@
package org.apache.ignite.internal.partition.replicator;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaResult;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
/**
* Zone partition replica listener.
*/
public class ZonePartitionReplicaListener implements ReplicaListener {
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ private static final IgniteLogger LOG =
Loggers.forClass(ZonePartitionReplicaListener.class);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the
table replica listener if needed.
+ private final Map<TablePartitionId, ReplicaListener> replicas = new
ConcurrentHashMap<>();
+
+ private final RaftCommandRunner raftClient;
+
+ /**
+ * The constructor.
+ *
+ * @param raftClient Raft client.
+ */
+ public ZonePartitionReplicaListener(RaftCommandRunner raftClient) {
+ this.raftClient = raftClient;
+ }
@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request,
String senderId) {
- var res = new BinaryRowImpl(
- 1,
- new BinaryTupleBuilder(2).appendLong(1).appendInt(-1).build());
- return CompletableFuture.completedFuture(new ReplicaResult(
- res,
- CompletableFuture.completedFuture(res)));
+ if (!(request instanceof TableAware)) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22620
implement ReplicaSafeTimeSyncRequest processing.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22621
implement zone-based transaction storage
+ // and txn messages processing
+ if (request instanceof TxFinishReplicaRequest) {
+ TxFinishReplicaRequest txFinishReplicaRequest =
(TxFinishReplicaRequest) request;
+
+ TxFinishReplicaRequest requestForTableListener =
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
+ .txId(txFinishReplicaRequest.txId())
+
.commitPartitionId(txFinishReplicaRequest.commitPartitionId())
+ .timestamp(txFinishReplicaRequest.timestamp())
+ .groupId(txFinishReplicaRequest.commitPartitionId())
+ .groups(txFinishReplicaRequest.groups())
+ .commit(txFinishReplicaRequest.commit())
+
.commitTimestamp(txFinishReplicaRequest.commitTimestamp())
+
.enlistmentConsistencyToken(txFinishReplicaRequest.enlistmentConsistencyToken())
+ .build();
+
+ return replicas
+
.get(txFinishReplicaRequest.commitPartitionId().asTablePartitionId())
+ .invoke(requestForTableListener, senderId);
+ } else {
+ LOG.debug("Non table request is not supported by the zone
partition yet " + request);
+ }
+
+ return nullCompletedFuture();
+ } else {
+ int partitionId;
+
+ ReplicationGroupId replicationGroupId =
request.groupId().asReplicationGroupId();
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine
this code when the zone based replication will done.
+ if (replicationGroupId instanceof TablePartitionId) {
+ partitionId = ((TablePartitionId)
replicationGroupId).partitionId();
+ } else if (replicationGroupId instanceof ZonePartitionId) {
+ partitionId = ((ZonePartitionId)
replicationGroupId).partitionId();
+ } else {
+ throw new IllegalArgumentException("Requests with replication
group type "
+ + request.groupId().getClass() + " is not supported");
+ }
+
+ return replicas.get(new TablePartitionId(((TableAware)
request).tableId(), partitionId))
+ .invoke(request, senderId);
+ }
}
@Override
public RaftCommandRunner raftClient() {
- throw new UnsupportedOperationException("Raft client is not defined in
ZoneReplicaListener");
+ return raftClient;
+ }
+
+ /**
+ * Add table partition listener to the current zone replica listener.
+ *
+ * @param partitionId Table partition id.
+ * @param replicaListener Table replica listener.
+ */
+ public void addTableReplicaListener(TablePartitionId partitionId,
Function<RaftCommandRunner, ReplicaListener> replicaListener) {
+ replicas.put(partitionId, replicaListener.apply(raftClient));
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
index 7aa1c16aab..7480f17363 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectMultiRowReplicaRequest.java
@@ -20,10 +20,11 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read only direct multi row replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RO_DIRECT_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadOnlyDirectMultiRowReplicaRequest extends
MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+public interface ReadOnlyDirectMultiRowReplicaRequest extends
MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
index 52a73c9cb4..51309a7fcc 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyDirectSingleRowReplicaRequest.java
@@ -20,11 +20,12 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read only direct node single row replica request.
* The type of RO request never waits and is executed at the current node
timestamp.
*/
@Transferable(PartitionReplicationMessageGroup.RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadOnlyDirectSingleRowReplicaRequest extends
SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+public interface ReadOnlyDirectSingleRowReplicaRequest extends
SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
index f10384436e..4a8e6354b0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyMultiRowPkReplicaRequest.java
@@ -19,10 +19,11 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read only multi row replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RO_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadOnlyMultiRowPkReplicaRequest extends
MultipleRowPkReplicaRequest, ReadOnlyReplicaRequest {
+public interface ReadOnlyMultiRowPkReplicaRequest extends
MultipleRowPkReplicaRequest, ReadOnlyReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
index b4d65621bf..bd17616633 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
@@ -20,12 +20,13 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import java.util.UUID;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Scan retrieve batch replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
-public interface ReadOnlyScanRetrieveBatchReplicaRequest extends
ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest {
+public interface ReadOnlyScanRetrieveBatchReplicaRequest extends
ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest, TableAware {
UUID transactionId();
/**
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
index c841fc0c05..82bd1f6fc5 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlySingleRowPkReplicaRequest.java
@@ -19,10 +19,11 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read only single row replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RO_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadOnlySingleRowPkReplicaRequest extends
SingleRowPkReplicaRequest, ReadOnlyReplicaRequest {
+public interface ReadOnlySingleRowPkReplicaRequest extends
SingleRowPkReplicaRequest, ReadOnlyReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
index 1be4c532eb..919e93dff0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowPkReplicaRequest.java
@@ -19,12 +19,13 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read-write multi-row replica request involving table's Primary Keys.
*/
@Transferable(PartitionReplicationMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST)
-public interface ReadWriteMultiRowPkReplicaRequest extends
MultipleRowPkReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteMultiRowPkReplicaRequest extends
MultipleRowPkReplicaRequest, ReadWriteReplicaRequest, TableAware {
/**
* Disable delayed ack optimization.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
index 0e1e26066b..c8d85755d2 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteMultiRowReplicaRequest.java
@@ -20,13 +20,14 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import java.util.BitSet;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
import org.jetbrains.annotations.Nullable;
/**
* Read-write multi-row replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadWriteMultiRowReplicaRequest extends
MultipleRowReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteMultiRowReplicaRequest extends
MultipleRowReplicaRequest, ReadWriteReplicaRequest, TableAware {
/**
* Disable delayed ack optimization.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
index 2d0cbb02ea..be2f0f24ff 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteScanRetrieveBatchReplicaRequest.java
@@ -19,10 +19,11 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Scan retrieve batch replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
-public interface ReadWriteScanRetrieveBatchReplicaRequest extends
ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteScanRetrieveBatchReplicaRequest extends
ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
index 2f27d6b42d..e0c13a3aec 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowPkReplicaRequest.java
@@ -19,10 +19,11 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read-write single-row replica request involving a table's Primary Key..
*/
@Transferable(PartitionReplicationMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST)
-public interface ReadWriteSingleRowPkReplicaRequest extends
SingleRowPkReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteSingleRowPkReplicaRequest extends
SingleRowPkReplicaRequest, ReadWriteReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
index a8d5e266b7..4f33af949a 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSingleRowReplicaRequest.java
@@ -19,10 +19,11 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read-write single-row replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadWriteSingleRowReplicaRequest extends
SingleRowReplicaRequest, ReadWriteReplicaRequest {
+public interface ReadWriteSingleRowReplicaRequest extends
SingleRowReplicaRequest, ReadWriteReplicaRequest, TableAware {
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
index 42322a70ef..65ee84ac86 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
@@ -19,11 +19,12 @@ package
org.apache.ignite.internal.partition.replicator.network.replication;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.TableAware;
/**
* Read-write dual row replica request.
*/
@Transferable(PartitionReplicationMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
-public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest,
ReadWriteReplicaRequest {
+public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest,
ReadWriteReplicaRequest, TableAware {
}
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 6ebb2e6292..51f54bd185 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -505,7 +505,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
);
serviceFutures.add(raftClientFut);
- CompletableFuture<Boolean> replicaFuture =
raftClientFut.thenCompose(raftClient -> {
+ CompletableFuture<Replica> replicaFuture =
raftClientFut.thenCompose(raftClient -> {
try {
ReplicaListener listener = new ReplicaListener() {
@Override
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 3974c139fa..b9141c3f98 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.NetworkMessage;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
/**
@@ -36,6 +37,14 @@ public interface Replica {
@Deprecated(forRemoval = true)
TopologyAwareRaftGroupService raftClient();
+ /**
+ * Returns replica's listener.
+ *
+ * @return Replica's listener.
+ */
+ @Deprecated(forRemoval = true)
+ ReplicaListener listener();
+
/**
* Processes a replication request on the replica.
*
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
index 86dc454979..ef1c673869 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java
@@ -133,6 +133,11 @@ public class ReplicaImpl implements Replica {
raftClient.subscribeLeader(this::onLeaderElected);
}
+ @Override
+ public ReplicaListener listener() {
+ return listener;
+ }
+
@Override
public final TopologyAwareRaftGroupService raftClient() {
return (TopologyAwareRaftGroupService) listener.raftClient();
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 06f87a580a..344b966318 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -208,7 +208,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/* Temporary converter to support the zone based partitions in tests. **/
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this
code
- private Function<ReplicationGroupId, ReplicationGroupId> groupIdConverter
= Function.identity();
+ private Function<ReplicaRequest, ReplicationGroupId> groupIdConverter = r
-> r.groupId().asReplicationGroupId();
/**
* Constructor for a replica service.
@@ -246,7 +246,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
RaftManager raftManager,
LogStorageFactoryCreator volatileLogStorageFactoryCreator,
Executor replicaStartStopExecutor,
- Function<ReplicationGroupId, ReplicationGroupId> groupIdConverter
+ Function<ReplicaRequest, ReplicationGroupId> groupIdConverter
) {
this(
nodeName,
@@ -392,7 +392,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return;
}
- ReplicationGroupId groupId =
groupIdConverter.apply(request.groupId().asReplicationGroupId());
+ ReplicationGroupId groupId = groupIdConverter.apply(request);
String senderConsistentId = sender.name();
@@ -573,7 +573,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
});
}
- private CompletableFuture<Boolean> startReplicaInternal(
+ private CompletableFuture<Replica> startReplicaInternal(
RaftGroupEventsListener raftGroupEventsListener,
RaftGroupListener raftGroupListener,
boolean isVolatileStorage,
@@ -628,7 +628,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*
* @return Future that promises ready new replica when done.
*/
- public CompletableFuture<Boolean> startReplica(
+ public CompletableFuture<Replica> startReplica(
RaftGroupEventsListener raftGroupEventsListener,
RaftGroupListener raftGroupListener,
boolean isVolatileStorage,
@@ -663,7 +663,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* Starts a replica. If a replica with the same partition id already
exists, the method throws an exception.
*
* @param replicaGrpId Replication group id.
- * @param listener Replica listener.
* @param snapshotStorageFactory Snapshot storage factory for raft group
option's parameterization.
* @param newConfiguration A configuration for new raft group.
* @param raftGroupListener Raft group listener for raft group starting.
@@ -674,7 +673,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*/
public CompletableFuture<Replica> startReplica(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener,
+ Function<RaftGroupService, ReplicaListener> listener,
SnapshotStorageFactory snapshotStorageFactory,
PeersAndLearners newConfiguration,
RaftGroupListener raftGroupListener,
@@ -706,7 +705,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
Replica newReplica = new ZonePartitionReplicaImpl(
replicaGrpId,
- listener,
+ listener.apply(raftClient),
raftClient
);
@@ -756,7 +755,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*/
@VisibleForTesting
@Deprecated
- public CompletableFuture<Boolean> startReplica(
+ public CompletableFuture<Replica> startReplica(
ReplicationGroupId replicaGrpId,
PeersAndLearners newConfiguration,
Consumer<RaftGroupService> updateTableRaftService,
@@ -772,8 +771,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
updateTableRaftService.accept(raftClient);
return createListener.apply(raftClient);
}, replicasCreationExecutor)
- .thenCompose(replicaListener -> startReplica(replicaGrpId,
storageIndexTracker, completedFuture(replicaListener)))
- .thenApply(r -> true);
+ .thenCompose(replicaListener -> startReplica(replicaGrpId,
storageIndexTracker, completedFuture(replicaListener)));
}
/**
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
index 5cd05a8a5e..993219518f 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
@@ -30,12 +30,11 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
* Replica for the zone based partitions.
*/
public class ZonePartitionReplicaImpl implements Replica {
-
private final ReplicationGroupId replicaGrpId;
private final ReplicaListener listener;
- TopologyAwareRaftGroupService raftClient;
+ private final TopologyAwareRaftGroupService raftClient;
/**
* Constructor.
@@ -54,6 +53,11 @@ public class ZonePartitionReplicaImpl implements Replica {
this.raftClient = raftClient;
}
+ @Override
+ public ReplicaListener listener() {
+ return listener;
+ }
+
@Override
public TopologyAwareRaftGroupService raftClient() {
return raftClient;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TableAware.java
similarity index 63%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
copy to
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TableAware.java
index 42322a70ef..91525c3abf 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadWriteSwapRowReplicaRequest.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TableAware.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator.network.replication;
+package org.apache.ignite.internal.replicator.message;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.network.NetworkMessage;
/**
- * Read-write dual row replica request.
+ * Generic interface for all messages that relate to a concrete table.
*/
-@Transferable(PartitionReplicationMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
-public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest,
ReadWriteReplicaRequest {
-
+public interface TableAware extends NetworkMessage {
+ /** Returns the ID of the table this message relates to. */
+ int tableId();
}
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index fcd13cc344..c7999187c5 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -180,7 +180,7 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
String nodeName = testNodeName(testInfo, 0);
PeersAndLearners newConfiguration =
PeersAndLearners.fromConsistentIds(Set.of(nodeName));
- CompletableFuture<Boolean> startReplicaFuture =
replicaManager.startReplica(
+ CompletableFuture<Replica> startReplicaFuture =
replicaManager.startReplica(
groupId,
newConfiguration,
(unused) -> { },
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index ad99548e69..e2d643b558 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -145,6 +145,7 @@ import
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStaleIds;
import
org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceFactory;
import
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAfterSend;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
@@ -654,7 +655,18 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer
+ logSyncer,
+ new PartitionReplicaLifecycleManager(
+ catalogManager,
+ replicaMgr,
+ distributionZoneManager,
+ metaStorageMgr,
+ clusterSvc.topologyService(),
+ lowWatermark,
+ threadPoolsManager.tableIoExecutor(),
+ rebalanceScheduler,
+ threadPoolsManager.partitionOperationsExecutor()
+ )
);
var indexManager = new IndexManager(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3413714268..b836ebb361 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -751,7 +751,8 @@ public class IgniteImpl implements Ignite {
clusterSvc.topologyService(),
lowWatermark,
threadPoolsManager.tableIoExecutor(),
- rebalanceScheduler
+ rebalanceScheduler,
+ threadPoolsManager.partitionOperationsExecutor()
);
TransactionConfiguration txConfig =
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
@@ -829,7 +830,8 @@ public class IgniteImpl implements Ignite {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer
+ logSyncer,
+ partitionReplicaLifecycleManager
);
disasterRecoveryManager = new DisasterRecoveryManager(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 5689c1e6e0..05fbd824d9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -148,6 +148,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
ReplicaRequest request =
PARTITION_REPLICATION_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(serializeTablePartitionId(partGroupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
.binaryTuples(binaryRowsToBuffers(partToRows.getValue()))
@@ -253,6 +254,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
ReadWriteMultiRowReplicaRequest request =
PARTITION_REPLICATION_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(serializeTablePartitionId(partGroupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
.schemaVersion(rowBatch.requestedRows.get(0).schemaVersion())
.binaryTuples(binaryRowsToBuffers(rowBatch.requestedRows))
@@ -322,6 +324,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
ReplicaRequest request =
PARTITION_REPLICATION_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(serializeTablePartitionId(partGroupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(commitPartitionId))
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
.primaryKeys(serializePrimaryKeys(partToRows.getValue()))
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index cf81093dec..51c6fa87b4 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -113,6 +113,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(ConfigurationExtension.class)
public class ReplicaUnavailableTest extends IgniteAbstractTest {
+ private static final int TABLE_ID = 1;
+
private static final String NODE_NAME = "client";
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
@@ -222,7 +224,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
public void testWithReplicaStartedAfterRequestSending() throws Exception {
ClusterNode clusterNode =
clusterService.topologyService().localMember();
- TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+ TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
@@ -266,6 +268,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
return tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(toTablePartitionIdMessage(replicaMessageFactory,
tablePartitionId))
+ .tableId(TABLE_ID)
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId())
.timestamp(clock.now())
@@ -281,7 +284,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
public void testStopReplicaException() {
ClusterNode clusterNode =
clusterService.topologyService().localMember();
- TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+ TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
@@ -316,7 +319,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
public void testWithNotStartedReplica() {
ClusterNode clusterNode =
clusterService.topologyService().localMember();
- TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+ TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
ReadWriteSingleRowReplicaRequest request =
getRequest(tablePartitionId);
@@ -346,7 +349,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
public void testWithNotReadyReplica() {
ClusterNode clusterNode =
clusterService.topologyService().localMember();
- TablePartitionId tablePartitionId = new TablePartitionId(1, 1);
+ TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 1);
PeersAndLearners newConfiguration =
PeersAndLearners.fromConsistentIds(Set.of(clusterNode.name()));
@@ -397,7 +400,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
private TablePartitionIdMessage tablePartitionId() {
return replicaMessageFactory.tablePartitionIdMessage()
- .tableId(1)
+ .tableId(TABLE_ID)
.partitionId(1)
.build();
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9dae96eafd..2eecfa4732 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -154,6 +154,7 @@ import
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
import
org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryProfileConfigurationSchema;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
@@ -625,7 +626,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
/**
- * Test checks rebances from [A,B,C] to [A,B] and then again to [A,B,C].
+ * Test checks rebalances from [A,B,C] to [A,B] and then again to [A,B,C].
* In this case the raft group node and {@link Replica} are started only
once on each node.
*
* <p>1. We have an in-progress rebalance and current metastore keys:
@@ -1351,7 +1352,18 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
lowWatermark,
transactionInflights,
indexMetaStorage,
- logSyncer
+ logSyncer,
+ new PartitionReplicaLifecycleManager(
+ catalogManager,
+ replicaManager,
+ distributionZoneManager,
+ metaStorageManager,
+ clusterService.topologyService(),
+ lowWatermark,
+ threadPoolsManager.tableIoExecutor(),
+ rebalanceScheduler,
+ threadPoolsManager.partitionOperationsExecutor()
+ )
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 392ef322af..e2aa8d3e5e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -148,6 +148,8 @@ import
org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -156,6 +158,7 @@ import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
@@ -165,6 +168,7 @@ import
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReaso
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -408,6 +412,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final String nodeName;
+ private final PartitionReplicaLifecycleManager
partitionReplicaLifecycleManager;
+
private long implicitTransactionTimeout;
private int attemptsObtainLock;
@@ -446,6 +452,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
* @param lowWatermark Low watermark.
* @param transactionInflights Transaction inflights.
* @param indexMetaStorage Index meta storage.
+ * @param partitionReplicaLifecycleManager Partition replica lifecycle
manager.
*/
public TableManager(
String nodeName,
@@ -480,7 +487,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
LowWatermark lowWatermark,
TransactionInflights transactionInflights,
IndexMetaStorage indexMetaStorage,
- LogSyncer logSyncer
+ LogSyncer logSyncer,
+ PartitionReplicaLifecycleManager partitionReplicaLifecycleManager
) {
this.topologyService = topologyService;
this.replicaMgr = replicaMgr;
@@ -507,6 +515,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.txCfg = txCfg;
this.nodeName = nodeName;
this.indexMetaStorage = indexMetaStorage;
+ this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -610,6 +619,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
catalogService.listen(CatalogEvent.TABLE_CREATE, parameters ->
onTableCreate((CreateTableEventParameters) parameters));
+ catalogService.listen(CatalogEvent.TABLE_CREATE, parameters ->
+
prepareTableResourcesAndLoadToZoneReplica((CreateTableEventParameters)
parameters));
catalogService.listen(CatalogEvent.TABLE_DROP,
fromConsumer(this::onTableDrop));
catalogService.listen(CatalogEvent.TABLE_ALTER, parameters -> {
if (parameters instanceof RenameTableEventParameters) {
@@ -635,6 +646,133 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
});
}
+ private CompletableFuture<Boolean>
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters
parameters) {
+ if (!PartitionReplicaLifecycleManager.ENABLED) {
+ return completedFuture(false);
+ }
+
+ long causalityToken = parameters.causalityToken();
+ CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
+ CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+
+ TableImpl table = createTableImpl(causalityToken, tableDescriptor,
zoneDescriptor);
+
+ tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return schemaManager.schemaRegistry(causalityToken,
parameters.tableId()).thenAccept(table::schemaView);
+ }));
+
+ // NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
+ CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
+ (ignore, throwable) -> inBusyLock(busyLock, () ->
nullCompletedFuture().thenComposeAsync((ignored) -> {
+ PartitionSet parts = new BitSetPartitionSet();
+
+ for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+ if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
+ parts.set(i);
+ }
+ }
+
+ return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
+ }, ioExecutor)));
+
+ CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+
+ CompletableFuture<?> createPartsFut =
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+ var startPartsFut = new
ArrayList<CompletableFuture<?>>();
+
+ for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+ if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
+
startPartsFut.add(preparePartitionResourcesAndLoadToZoneReplica(
+ table,
+ i,
+ zoneDescriptor.id()));
+ }
+ }
+
+ return
allOf(startPartsFut.toArray(CompletableFuture[]::new));
+ }
+ ), ioExecutor);
+ });
+
+ tables.put(parameters.tableId(), table);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
+ return createPartsFut.thenAccept(ignore ->
startedTables.put(parameters.tableId(), table))
+ .thenApply(unused -> false);
+
+ }
+
+ /**
+ * Prepares the table partition resources and loads them into the zone-based
replica.
+ *
+ * @param table Table.
+ * @param partId Partition id.
+ * @param zoneId Zone id.
+ * @return Future that completes once the table partition replica listener has been added to
the zone replica.
+ */
+ private CompletableFuture<Void>
preparePartitionResourcesAndLoadToZoneReplica(
+ TableImpl table,
+ int partId,
+ int zoneId
+ ) {
+ int tableId = table.tableId();
+
+ var internalTbl = (InternalTableImpl) table.internalTable();
+
+ TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
+
+ return inBusyLockAsync(busyLock, () -> {
+ var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp,
Void>(HybridTimestamp.MIN_VALUE);
+
+ var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
+
+ PartitionStorages partitionStorages = getPartitionStorages(table,
partId);
+
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+ internalTbl, partId);
+
+
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+
+ PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ partId,
+ partitionDataStorage,
+ table,
+ safeTimeTracker,
+ storageUpdateConfig
+ );
+
+ internalTbl.updatePartitionTrackers(partId, safeTimeTracker,
storageIndexTracker);
+
+ mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
+
+ Function<RaftCommandRunner, ReplicaListener> createListener =
(raftClient) -> createReplicaListener(
+ replicaGrpId,
+ table,
+ safeTimeTracker,
+ partitionStorages.getMvPartitionStorage(),
+ partitionStorages.getTxStateStorage(),
+ partitionUpdateHandlers,
+ raftClient);
+
+ return replicaMgr.replica(new ZonePartitionId(zoneId, partId))
+ .thenAcceptAsync(zoneReplica ->
+ ((ZonePartitionReplicaListener)
zoneReplica.listener()).addTableReplicaListener(
+ new TablePartitionId(tableId, partId),
createListener
+ ), ioExecutor
+ );
+ });
+ }
+
+
private CompletableFuture<Boolean>
onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
if
(topologyService.localMember().id().equals(parameters.leaseholderId())) {
TablePartitionId groupId = (TablePartitionId) parameters.groupId();
@@ -998,7 +1136,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
createListener,
storageIndexTracker,
replicaGrpId,
- stablePeersAndLearners);
+ stablePeersAndLearners).thenApply(ignored ->
true);
} catch (NodeStoppingException e) {
throw new AssertionError("Loza was stopped before
Table manager", e);
}
@@ -1054,7 +1192,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
MvPartitionStorage mvPartitionStorage,
TxStateStorage txStatePartitionStorage,
PartitionUpdateHandlers partitionUpdateHandlers,
- RaftGroupService raftClient
+ RaftCommandRunner raftClient
) {
int tableId = tablePartitionId.tableId();
int partId = tablePartitionId.partitionId();
@@ -1199,6 +1337,66 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
+ /**
+ * Creates the table storages and the table instance, including its internal table.
+ *
+ * @param causalityToken Causality token.
+ * @param tableDescriptor Catalog table descriptor.
+ * @param zoneDescriptor Catalog distribution zone descriptor.
+ * @return Table instance.
+ */
+ private TableImpl createTableImpl(
+ long causalityToken,
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor
+ ) {
+ String tableName = tableDescriptor.name();
+
+ LOG.trace("Creating local table: name={}, id={}, token={}",
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
+
+ MvTableStorage tableStorage = createTableStorage(tableDescriptor,
zoneDescriptor);
+ TxStateTableStorage txStateStorage =
createTxStateTableStorage(tableDescriptor, zoneDescriptor);
+
+ int partitions = zoneDescriptor.partitions();
+
+ TableRaftServiceImpl tableRaftService = new TableRaftServiceImpl(
+ tableName,
+ partitions,
+ new Int2ObjectOpenHashMap<>(partitions),
+ topologyService
+ );
+
+ InternalTableImpl internalTable = new InternalTableImpl(
+ tableName,
+ tableDescriptor.id(),
+ partitions,
+ topologyService,
+ txManager,
+ tableStorage,
+ txStateStorage,
+ replicaSvc,
+ clock,
+ observableTimestampTracker,
+ executorInclinedPlacementDriver,
+ tableRaftService,
+ transactionInflights,
+ implicitTransactionTimeout,
+ attemptsObtainLock,
+ this::streamerFlushExecutor,
+ Objects.requireNonNull(streamerReceiverRunner)
+ );
+
+ return new TableImpl(
+ internalTable,
+ lockMgr,
+ schemaVersions,
+ marshallers,
+ sql.get(),
+ tableDescriptor.primaryKeyIndexId()
+ );
+ }
+
+
/**
* Creates local structures for a table.
*
@@ -1257,51 +1455,9 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CompletableFuture<List<Assignments>> assignmentsFuture,
boolean onNodeRecovery
) {
- String tableName = tableDescriptor.name();
- int tableId = tableDescriptor.id();
+ TableImpl table = createTableImpl(causalityToken, tableDescriptor,
zoneDescriptor);
- LOG.trace("Creating local table: name={}, id={}, token={}",
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
-
- MvTableStorage tableStorage = createTableStorage(tableDescriptor,
zoneDescriptor);
- TxStateTableStorage txStateStorage =
createTxStateTableStorage(tableDescriptor, zoneDescriptor);
-
- int partitions = zoneDescriptor.partitions();
-
- TableRaftServiceImpl tableRaftService = new TableRaftServiceImpl(
- tableName,
- partitions,
- new Int2ObjectOpenHashMap<>(partitions),
- topologyService
- );
-
- InternalTableImpl internalTable = new InternalTableImpl(
- tableName,
- tableId,
- partitions,
- topologyService,
- txManager,
- tableStorage,
- txStateStorage,
- replicaSvc,
- clock,
- observableTimestampTracker,
- executorInclinedPlacementDriver,
- tableRaftService,
- transactionInflights,
- implicitTransactionTimeout,
- attemptsObtainLock,
- this::streamerFlushExecutor,
- Objects.requireNonNull(streamerReceiverRunner)
- );
-
- var table = new TableImpl(
- internalTable,
- lockMgr,
- schemaVersions,
- marshallers,
- sql.get(),
- tableDescriptor.primaryKeyIndexId()
- );
+ int tableId = tableDescriptor.id();
tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
if (e != null) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 49b0df8f9a..fe5e7559cf 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -561,6 +561,7 @@ public class InternalTableImpl implements InternalTable {
Function<Long, ReplicaRequest> mapFunc =
(enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(serializeTablePartitionId(partGroupId))
+ .tableId(tableId)
.timestamp(clock.now())
.transactionId(tx.id())
.scanId(scanId)
@@ -870,6 +871,7 @@ public class InternalTableImpl implements InternalTable {
keyRow,
(groupId, consistencyToken) ->
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.enlistmentConsistencyToken(consistencyToken)
.schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
@@ -888,6 +890,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
@@ -913,6 +916,7 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode,
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(serializeTablePartitionId(tablePartitionId))
+ .tableId(tableId)
.schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.requestTypeInt(RO_GET.ordinal())
@@ -955,6 +959,7 @@ public class InternalTableImpl implements InternalTable {
keyRows,
(groupId, consistencyToken) ->
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.enlistmentConsistencyToken(consistencyToken)
.schemaVersion(keyRows.iterator().next().schemaVersion())
.primaryKeys(serializeBinaryTuples(keyRows))
@@ -994,6 +999,7 @@ public class InternalTableImpl implements InternalTable {
ReadOnlyMultiRowPkReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
.groupId(serializeTablePartitionId(tablePartitionId))
+ .tableId(tableId)
.schemaVersion(partitionRowBatch.getValue().requestedRows.get(0).schemaVersion())
.primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows))
.requestTypeInt(RO_GET_ALL.ordinal())
@@ -1018,6 +1024,7 @@ public class InternalTableImpl implements InternalTable {
return TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
.schemaVersion(rows.iterator().next().schemaVersion())
.primaryKeys(serializeBinaryTuples(rows))
@@ -1086,6 +1093,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
@@ -1170,6 +1178,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
@@ -1192,6 +1201,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
@@ -1248,6 +1258,7 @@ public class InternalTableImpl implements InternalTable {
return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
.schemaVersion(rows.iterator().next().schemaVersion())
.binaryTuples(serializeBinaryTuples(rows))
@@ -1269,6 +1280,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
@@ -1294,6 +1306,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(oldRow.schemaVersion())
.oldBinaryTuple(oldRow.tupleSlice())
@@ -1317,6 +1330,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
@@ -1339,6 +1353,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
@@ -1361,6 +1376,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(oldRow.schemaVersion())
.binaryTuple(oldRow.tupleSlice())
@@ -1383,6 +1399,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(txo, groupId, enlistmentConsistencyToken) ->
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(serializeTablePartitionId(groupId))
+ .tableId(tableId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.primaryKey(row.tupleSlice())
@@ -1582,6 +1599,7 @@ public class InternalTableImpl implements InternalTable {
(scanId, batchSize) -> {
ReadOnlyScanRetrieveBatchReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(serializeTablePartitionId(tablePartitionId))
+ .tableId(tableId)
.readTimestamp(readTimestamp)
.transactionId(txId)
.scanId(scanId)
@@ -1691,6 +1709,7 @@ public class InternalTableImpl implements InternalTable {
(scanId, batchSize) -> {
ReadWriteScanRetrieveBatchReplicaRequest request =
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(serializeTablePartitionId(tablePartitionId))
+ .tableId(tableId)
.timestamp(clock.now())
.transactionId(txId)
.scanId(scanId)
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index cb53d8ee1a..c401bf50d8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -55,6 +55,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.LongFunction;
@@ -86,6 +87,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
@@ -351,7 +353,18 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
lowWatermark,
new TransactionInflights(placementDriver, clockService),
indexMetaStorage,
- logSyncer
+ logSyncer,
+ new PartitionReplicaLifecycleManager(
+ catalogManager,
+ replicaMgr,
+ distributionZoneManager,
+ metaStorageManager,
+ topologyService,
+ lowWatermark,
+ ForkJoinPool.commonPool(),
+ mock(ScheduledExecutorService.class),
+ partitionOperationsExecutor
+ )
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 3d6492e005..7c291b4296 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -104,6 +104,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
@@ -263,6 +264,10 @@ public class TableManagerTest extends IgniteAbstractTest {
private IndexMetaStorage indexMetaStorage;
+ /** Partition replica lifecycle manager. */
+ @Mock
+ private PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
+
@BeforeEach
void before() throws NodeStoppingException {
lowWatermark = new TestLowWatermark();
@@ -290,7 +295,7 @@ public class TableManagerTest extends IgniteAbstractTest {
when(distributionZoneManager.dataNodes(anyLong(), anyInt(),
anyInt())).thenReturn(emptySetCompletedFuture());
when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(),
any(), any(), any(), any()))
- .thenReturn(trueCompletedFuture());
+ .thenReturn(nullCompletedFuture());
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv
-> {
Supplier<CompletableFuture<Void>> startOperation =
inv.getArgument(1);
@@ -832,7 +837,8 @@ public class TableManagerTest extends IgniteAbstractTest {
lowWatermark,
mock(TransactionInflights.class),
indexMetaStorage,
- logSyncer
+ logSyncer,
+ partitionReplicaLifecycleManager
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d344ceac12..8855f160cb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -337,6 +337,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
case RW_GET_AND_DELETE:
request =
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(tablePartitionId(PARTITION_ID))
+ .tableId(TABLE_ID)
.enlistmentConsistencyToken(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
@@ -357,6 +358,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
case RW_GET_AND_UPSERT:
request =
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionId(PARTITION_ID))
+ .tableId(TABLE_ID)
.enlistmentConsistencyToken(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
@@ -425,6 +427,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
case RW_DELETE_ALL:
request =
TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(tablePartitionId(PARTITION_ID))
+ .tableId(TABLE_ID)
.enlistmentConsistencyToken(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
@@ -442,6 +445,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
case RW_UPSERT_ALL:
request =
TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(tablePartitionId(PARTITION_ID))
+ .tableId(TABLE_ID)
.enlistmentConsistencyToken(1L)
.commitPartitionId(tablePartitionId(PARTITION_ID))
.transactionId(TRANSACTION_ID)
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 89ea604cd5..83738b6489 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -801,6 +801,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk,
HybridTimestamp readTimestamp) {
ReadOnlySingleRowPkReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.readTimestamp(readTimestamp)
.schemaVersion(pk.schemaVersion())
.primaryKey(pk.tupleSlice())
@@ -813,6 +814,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<ReplicaResult>
doReadOnlyDirectSingleGet(BinaryRow pk) {
ReadOnlyDirectSingleRowReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.schemaVersion(pk.schemaVersion())
.primaryKey(pk.tupleSlice())
.requestTypeInt(RO_GET.ordinal())
@@ -921,6 +923,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(scanTxId)
.timestamp(clock.now())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -940,6 +943,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Request second batch
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(scanTxId)
.timestamp(clock.now())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -959,6 +963,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Request bounded.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.timestamp(clock.now())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -981,6 +986,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Empty result.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.timestamp(clock.now())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -1001,6 +1007,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Lookup.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.timestamp(clock.now())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -1044,6 +1051,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(scanTxId)
.readTimestamp(clock.now())
.scanId(1L)
@@ -1060,6 +1068,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Request second batch
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(scanTxId)
.readTimestamp(clock.now())
.scanId(1L)
@@ -1076,6 +1085,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Request bounded.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.readTimestamp(clock.now())
.scanId(2L)
@@ -1095,6 +1105,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Empty result.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.readTimestamp(clock.now())
.scanId(2L)
@@ -1112,6 +1123,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Lookup.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.readTimestamp(clock.now())
.scanId(2L)
@@ -1152,6 +1164,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(scanTxId)
.readTimestamp(clock.now())
.scanId(1L)
@@ -1169,6 +1182,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Request second batch
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(scanTxId)
.readTimestamp(clock.now())
.scanId(1L)
@@ -1186,6 +1200,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Empty result.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.readTimestamp(clock.now())
.scanId(2L)
@@ -1203,6 +1218,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Lookup.
fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(newTxId())
.readTimestamp(clock.now())
.scanId(2L)
@@ -1329,6 +1345,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow
binaryRow, RequestType requestType, boolean full) {
return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(txId)
.requestTypeInt(requestType.ordinal())
.schemaVersion(binaryRow.schemaVersion())
@@ -1350,6 +1367,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow
binaryRow, RequestType requestType, boolean full) {
return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(txId)
.requestTypeInt(requestType.ordinal())
.schemaVersion(binaryRow.schemaVersion())
@@ -1382,6 +1400,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doMultiRowRequest(UUID txId,
Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(txId)
.requestTypeInt(requestType.ordinal())
.schemaVersion(binaryRows.iterator().next().schemaVersion())
@@ -1407,6 +1426,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doMultiRowPkRequest(UUID txId,
Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(txId)
.requestTypeInt(requestType.ordinal())
.schemaVersion(binaryRows.iterator().next().schemaVersion())
@@ -1433,6 +1453,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
return
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(txId)
.requestTypeInt(RW_INSERT.ordinal())
.schemaVersion(binaryRow.schemaVersion())
@@ -1463,6 +1484,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
return
TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(txId)
.requestTypeInt(RW_UPSERT_ALL.ordinal())
.schemaVersion(binaryRow0.schemaVersion())
@@ -1709,6 +1731,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
TxFinishReplicaRequest commitRequest =
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .commitPartitionId(tablePartitionIdMessage(grpId))
.txId(txId)
.groups(Map.of(tablePartitionIdMessage(grpId),
localNode.name()))
.commit(false)
@@ -1773,6 +1796,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
TxFinishReplicaRequest commitRequest =
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .commitPartitionId(tablePartitionIdMessage(grpId))
.txId(txId)
.groups(Map.of(tablePartitionIdMessage(grpId),
localNode.name()))
.commit(true)
@@ -1951,6 +1975,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow
oldRow, BinaryRow newRow, boolean full) {
return
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(targetTxId)
.requestTypeInt(RW_REPLACE.ordinal())
.schemaVersion(oldRow.schemaVersion())
@@ -1972,6 +1997,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
(targetTxId, key) -> partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(targetTxId)
.indexToUse(sortedIndexStorage.id())
.exactKey(toIndexKey(FUTURE_SCHEMA_ROW_INDEXED_VALUE))
@@ -1993,6 +2019,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
(targetTxId, key) -> partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(targetTxId)
.indexToUse(sortedIndexStorage.id())
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -2018,6 +2045,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
return partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(targetTxId)
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.scanId(1)
@@ -2046,6 +2074,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
return partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.transactionId(targetTxId)
.scanId(1)
.batchSize(100)
@@ -2519,6 +2548,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> future = partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.groupId(tablePartitionIdMessage(commitPartitionId))
+
.commitPartitionId(tablePartitionIdMessage(commitPartitionId))
.groups(groups.entrySet().stream().collect(toMap(e ->
tablePartitionIdMessage(e.getKey()), Map.Entry::getValue)))
.txId(txId)
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -2668,6 +2698,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow
row, boolean full) {
ReadWriteSingleRowReplicaRequest message =
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.requestTypeInt(RW_UPSERT.ordinal())
.transactionId(txId)
.schemaVersion(row.schemaVersion())
@@ -2685,6 +2716,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private void delete(UUID txId, BinaryRow row) {
ReadWriteSingleRowPkReplicaRequest message =
TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.requestTypeInt(RW_DELETE.ordinal())
.transactionId(txId)
.schemaVersion(row.schemaVersion())
@@ -2709,6 +2741,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row,
HybridTimestamp readTimestamp) {
ReadOnlySingleRowPkReplicaRequest message =
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.requestTypeInt(RO_GET.ordinal())
.readTimestamp(readTimestamp)
.schemaVersion(row.schemaVersion())
@@ -2729,6 +2762,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<ReplicaResult>
doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
ReadOnlyMultiRowPkReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.requestTypeInt(RO_GET_ALL.ordinal())
.readTimestamp(readTimestamp)
.schemaVersion(rows.iterator().next().schemaVersion())
@@ -2741,6 +2775,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<ReplicaResult>
doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
ReadOnlyDirectMultiRowReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
.requestTypeInt(RO_GET_ALL.ordinal())
.schemaVersion(rows.iterator().next().schemaVersion())
.primaryKeys(binaryRowsToBuffers(rows))
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index edee7fbdf1..b1985a6a36 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -161,10 +161,16 @@ public class TxMessageSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
+ TablePartitionIdMessage commitPartitionIdMessage =
REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
+ .partitionId(commitPartition.partitionId())
+ .tableId(commitPartition.tableId())
+ .build();
+
return replicaService.invoke(
primaryConsistentId,
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.txId(txId)
+ .commitPartitionId(commitPartitionIdMessage)
.timestamp(clockService.now())
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartition))
.groups(toTablePartitionIdMessages(replicationGroupIds))
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index ae39416d2e..d73c81eb56 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -44,6 +44,13 @@ public interface TxFinishReplicaRequest extends
PrimaryReplicaRequest, Timestamp
*/
UUID txId();
+ /**
+ * Returns the identifier of the transaction's commit partition.
+ *
+ * <p>In {@code TxMessageSender} this is built from the same commit partition that is also used as the
+ * request's group id, but it is carried as a separate field. NOTE(review): presumably so the receiving
+ * replica can identify the commit table partition even when the group id no longer identifies a table
+ * partition (e.g. zone-level replication groups); confirm.
+ *
+ * @return Commit partition id message (table id and partition id).
+ */
+ TablePartitionIdMessage commitPartitionId();
+
/**
* Returns {@code True} if a commit request.
*