This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 118a33b522 IGNITE-18527 Inline writeIntents into data storage on
primary replica side (#1551)
118a33b522 is described below
commit 118a33b522ab90ea5ebb4aebe1da32f7b056d5e1
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Jan 26 14:56:34 2023 +0200
IGNITE-18527 Inline writeIntents into data storage on primary replica side
(#1551)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 2 +-
.../app/ItIgniteInMemoryNodeRestartTest.java | 6 +-
.../storage/AbstractMvPartitionStorageTest.java | 12 +
.../ignite/distributed/ItTablePersistenceTest.java | 10 +-
.../distributed/ItTxDistributedTestSingleNode.java | 48 ++--
...butedTestThreeNodesThreeReplicasCollocated.java | 10 +-
.../ignite/internal/table/ItColocationTest.java | 8 +-
.../table/distributed/StorageUpdateHandler.java | 144 ++++++++++
.../internal/table/distributed/TableManager.java | 42 ++-
.../table/distributed/raft/PartitionListener.java | 94 ++-----
.../replicator/PartitionReplicaListener.java | 153 +++++++++--
.../distributed/replicator/TablePartitionId.java | 8 +-
.../distributed/storage/InternalTableImpl.java | 8 +-
.../apache/ignite/internal/table/TxLocalTest.java | 5 +
.../raft/PartitionCommandListenerTest.java | 32 ++-
.../PartitionReplicaListenerIndexLockingTest.java | 7 +
.../replication/PartitionReplicaListenerTest.java | 291 +++++++++++++++++++--
.../ignite/internal/table/TxAbstractTest.java | 13 +-
.../table/impl/DummyInternalTableImpl.java | 25 +-
19 files changed, 728 insertions(+), 190 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d455731179..dcdb73e759 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -281,7 +281,7 @@ public class ErrorGroups {
public static final int TX_ROLLBACK_ERR =
TX_ERR_GROUP.registerErrorCode(8);
/** Failed to enlist read-write operation into read-only transaction.
*/
- public static final int TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR =
TX_ERR_GROUP.registerErrorCode(9);
+ public static final int TX_FAILED_READ_WRITE_OPERATION_ERR =
TX_ERR_GROUP.registerErrorCode(9);
/** The error happens when the replica is not ready to handle a
request. */
public static final int TX_REPLICA_UNAVAILABLE_ERR =
TX_ERR_GROUP.registerErrorCode(10);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index fb32865c60..f690e07052 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -210,7 +210,7 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
assertTrue(IgniteTestUtils.waitForCondition(
() -> loza.localNodes().stream().anyMatch(nodeId -> {
if (nodeId.groupId() instanceof TablePartitionId) {
- return ((TablePartitionId)
nodeId.groupId()).getTableId().equals(tableId);
+ return ((TablePartitionId)
nodeId.groupId()).tableId().equals(tableId);
}
return true;
@@ -251,7 +251,7 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
assertTrue(IgniteTestUtils.waitForCondition(
() -> loza.localNodes().stream().anyMatch(nodeId -> {
if (nodeId.groupId() instanceof TablePartitionId) {
- return ((TablePartitionId)
nodeId.groupId()).getTableId().equals(tableId);
+ return ((TablePartitionId)
nodeId.groupId()).tableId().equals(tableId);
}
return true;
@@ -294,7 +294,7 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
assertTrue(IgniteTestUtils.waitForCondition(
() -> loza.localNodes().stream().anyMatch(nodeId -> {
if (nodeId.groupId() instanceof TablePartitionId) {
- return ((TablePartitionId)
nodeId.groupId()).getTableId().equals(tableId);
+ return ((TablePartitionId)
nodeId.groupId()).tableId().equals(tableId);
}
return true;
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 35ff2865b7..8d2c9e7d65 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -97,6 +97,18 @@ public abstract class AbstractMvPartitionStorageTest extends
BaseMvPartitionStor
// Read with timestamp returns write-intent.
assertRowMatches(read(rowId, clock.now()), tableRow);
+
+ // Remove write.
+ addWrite(rowId, null, txId);
+
+ // Removed row can't be read.
+ assertNull(read(rowId, HybridTimestamp.MAX_VALUE));
+
+ // Remove write once again.
+ addWrite(rowId, null, txId);
+
+ // Still can't be read.
+ assertNull(read(rowId, HybridTimestamp.MAX_VALUE));
}
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d1f6067e07..ff896f324d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.tx.TxManager;
@@ -275,12 +277,14 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
var testMpPartStorage = new TestMvPartitionStorage(0);
+ PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMpPartStorage);
+
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(0, partitionDataStorage, Map::of);
+
PartitionListener listener = new PartitionListener(
new TestPartitionDataStorage(testMpPartStorage),
+ storageUpdateHandler,
new TestTxStateStorage(),
- txManager,
- Map::of,
- 0,
new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0))
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 10d188c446..280c021d30 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
@@ -76,8 +77,10 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TxAbstractTest;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -90,6 +93,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -146,6 +150,8 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
protected Int2ObjectOpenHashMap<RaftGroupService> custRaftClients;
+ protected Map<String, TxStateStorage> txStateStorages;
+
protected final List<ClusterService> cluster = new
CopyOnWriteArrayList<>();
private ScheduledThreadPoolExecutor executor;
@@ -250,6 +256,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
+ txStateStorages = new HashMap<>(nodes);
executor = new ScheduledThreadPoolExecutor(20,
new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
@@ -296,6 +303,8 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
txMgr.start();
txManagers.put(node.name(), txMgr);
+
+ txStateStorages.put(node.name(), new TestTxStateStorage());
}
log.info("Raft servers have been started");
@@ -313,18 +322,16 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
String localNodeName =
accRaftClients.get(0).clusterService().topologyService().localMember().name();
- TxManager txMgr;
-
if (startClient()) {
- txMgr = new TxManagerImpl(clientReplicaSvc, new HeapLockManager(),
clientClock);
+ clientTxManager = new TxManagerImpl(clientReplicaSvc, new
HeapLockManager(), clientClock);
} else {
// Collocated mode.
- txMgr = txManagers.get(localNodeName);
+ clientTxManager = txManagers.get(localNodeName);
}
- assertNotNull(txMgr);
+ assertNotNull(clientTxManager);
- igniteTransactions = new IgniteTransactionsImpl(txMgr);
+ igniteTransactions = new IgniteTransactionsImpl(clientTxManager);
this.accounts = new TableImpl(new InternalTableImpl(
accountsName,
@@ -332,12 +339,12 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
accRaftClients,
1,
consistentIdToNode,
- txMgr,
+ clientTxManager,
Mockito.mock(MvTableStorage.class),
Mockito.mock(TxStateTableStorage.class),
startClient() ? clientReplicaSvc :
replicaServices.get(localNodeName),
startClient() ? clientClock : clocks.get(localNodeName)
- ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), txMgr.lockManager());
+ ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA),
clientTxManager.lockManager());
this.customers = new TableImpl(new InternalTableImpl(
customersName,
@@ -345,12 +352,12 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
custRaftClients,
1,
consistentIdToNode,
- txMgr,
+ clientTxManager,
Mockito.mock(MvTableStorage.class),
Mockito.mock(TxStateTableStorage.class),
startClient() ? clientReplicaSvc :
replicaServices.get(localNodeName),
startClient() ? clientClock : clocks.get(localNodeName)
- ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), txMgr.lockManager());
+ ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA),
clientTxManager.lockManager());
log.info("Tables have been started");
}
@@ -388,7 +395,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
for (String assignment : partAssignments) {
var testMpPartStorage = new TestMvPartitionStorage(0);
- var txStateStorage = new TestTxStateStorage();
+ var txStateStorage = txStateStorages.get(assignment);
var placementDriver = new
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
for (int part = 0; part < assignments.size(); part++) {
@@ -421,15 +428,17 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
PendingComparableValuesTracker<HybridTimestamp> safeTime =
new
PendingComparableValuesTracker<>(clocks.get(assignment).now());
+ PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMpPartStorage);
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = ()
-> Map.of(pkStorage.get().id(), pkStorage.get());
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(partId, partitionDataStorage, indexes);
+
CompletableFuture<Void> partitionReadyFuture =
raftServers.get(assignment).startRaftGroupNode(
new RaftNodeId(grpId, configuration.peer(assignment)),
configuration,
new PartitionListener(
- new
TestPartitionDataStorage(testMpPartStorage),
- new TestTxStateStorage(),
- txManagers.get(assignment),
- () -> Map.of(pkStorage.get().id(),
pkStorage.get()),
- partId,
+ partitionDataStorage,
+ storageUpdateHandler,
+ txStateStorage,
safeTime
),
RaftGroupEventsListener.noopLsnr
@@ -454,6 +463,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
safeTime,
txStateStorage,
placementDriver,
+ storageUpdateHandler,
peer ->
assignment.equals(peer.consistentId()),
CompletableFuture.completedFuture(schemaManager)
)
@@ -585,6 +595,12 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
return network;
}
+ /** {@inheritDoc} */
+ @Override
+ protected TxManager clientTxManager() {
+ return clientTxManager;
+ }
+
/** {@inheritDoc} */
@Override
protected TxManager txManager(Table t) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
index db77fd739d..1eced21290 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicasCollocated.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.UUID;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.junit.jupiter.api.BeforeEach;
@@ -66,11 +65,10 @@ public class
ItTxDistributedTestThreeNodesThreeReplicasCollocated extends ItTxDi
tx.commit();
assertTrue(waitForCondition(
- () -> txManagers.values().stream()
- .filter(txManager -> txManager.state(txId) != null &&
txManager.state(txId)
- .equals(TxState.COMMITED))
- .collect(Collectors.toList())
- .size() >= 2,
+ () -> txStateStorages.values().stream()
+ .map(txStateStorage -> txStateStorage.get(txId))
+ .filter(txMeta -> txMeta != null && txMeta.txState()
== TxState.COMMITED)
+ .count() >= 2,
5_000));
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index ec3ac28631..3b54c3bc77 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -196,8 +196,8 @@ public class ItColocationTest {
return r.run(MSG_FACTORY.updateAllCommand()
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
- .tableId(commitPartId.getTableId())
- .partitionId(commitPartId.getPartId())
+ .tableId(commitPartId.tableId())
+
.partitionId(commitPartId.partitionId())
.build()
)
.rowsToUpdate(rows)
@@ -209,8 +209,8 @@ public class ItColocationTest {
return r.run(MSG_FACTORY.updateCommand()
.tablePartitionId(
MSG_FACTORY.tablePartitionIdMessage()
- .tableId(commitPartId.getTableId())
- .partitionId(commitPartId.getPartId())
+ .tableId(commitPartId.tableId())
+
.partitionId(commitPartId.partitionId())
.build()
)
.rowUuid(Timestamp.nextVersion().toUuid())
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
new file mode 100644
index 0000000000..dae55ead81
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handler for storage updates that can be performed on processing of primary replica requests and partition replication requests.
+ *
+ * <p>Every update is applied inside {@code storage.runConsistently(...)}: the row is written with {@code storage.addWrite(...)}
+ * for the given transaction and, when the row is not {@code null}, it is also put into every index returned by the indexes supplier.
+ * NOTE(review): the consistency guarantees of {@code runConsistently} are defined by {@link PartitionDataStorage}
+ * and are not visible here - confirm there.
+ */
+public class StorageUpdateHandler {
+    /** Partition id; used to build the {@link RowId} of every row that is written. */
+    private final int partitionId;
+
+    /** Partition storage with access to MV data of a partition. */
+    private final PartitionDataStorage storage;
+
+    private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes; // Queried anew for each non-null row that is indexed.
+
+    /**
+     * The constructor.
+     *
+     * @param partitionId Partition id.
+     * @param storage Partition data storage.
+     * @param indexes Indexes supplier; it is not called here, only when a row is added to the indexes.
+     */
+    public StorageUpdateHandler(int partitionId, PartitionDataStorage storage,
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes) {
+        this.partitionId = partitionId;
+        this.storage = storage;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Handle single update.
+     *
+     * <p>Inside {@code storage.runConsistently(...)} this method writes the row with {@code storage.addWrite(...)}, then invokes
+     * {@code onReplication} (if it is provided) and only after that puts the row into the indexes.
+     *
+     * @param txId Transaction id.
+     * @param rowUuid Row UUID; together with the partition id it forms the {@link RowId} of the written row.
+     * @param commitPartitionId Commit partition id; its table id and partition id are passed to the storage along with the write.
+     * @param rowBuffer Row buffer; may be {@code null}, in which case a {@code null} row is written and the indexes are not touched.
+     * @param onReplication Callback on replication; receives the id of the written row, may be {@code null}.
+     */
+    public void handleUpdate(
+            UUID txId,
+            UUID rowUuid,
+            TablePartitionId commitPartitionId,
+            ByteBuffer rowBuffer,
+            @Nullable Consumer<RowId> onReplication
+    ) {
+        storage.runConsistently(() -> {
+            TableRow row = rowBuffer != null ? new TableRow(rowBuffer) : null;
+            RowId rowId = new RowId(partitionId, rowUuid);
+            UUID commitTblId = commitPartitionId.tableId();
+            int commitPartId = commitPartitionId.partitionId();
+
+            storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+
+            if (onReplication != null) {
+                onReplication.accept(rowId); // Runs after the write but before the index update below.
+            }
+
+            addToIndexes(row, rowId);
+
+            return null;
+        });
+    }
+
+    /**
+     * Handle multiple updates.
+     *
+     * <p>Inside {@code storage.runConsistently(...)} every entry of {@code rowsToUpdate} is written with {@code storage.addWrite(...)}
+     * and put into the indexes; {@code onReplication} (if it is provided) is invoked once, after all the entries are processed.
+     * When {@code rowsToUpdate} does not pass the {@code nullOrEmpty} check, nothing is written and the callback is not invoked.
+     *
+     * @param txId Transaction id.
+     * @param rowsToUpdate Collection of rows to update: row UUID mapped to the row buffer; a {@code null} buffer results in a
+     *      {@code null} row being written without touching the indexes.
+     * @param commitPartitionId Commit partition id; its table id and partition id are passed to the storage along with each write.
+     * @param onReplication On replication callback; receives the ids of all written rows, may be {@code null}.
+     */
+    public void handleUpdateAll(
+            UUID txId,
+            Map<UUID, ByteBuffer> rowsToUpdate,
+            TablePartitionId commitPartitionId,
+            @Nullable Consumer<Collection<RowId>> onReplication
+    ) {
+        storage.runConsistently(() -> {
+            UUID commitTblId = commitPartitionId.tableId();
+            int commitPartId = commitPartitionId.partitionId();
+
+            if (!nullOrEmpty(rowsToUpdate)) {
+                List<RowId> rowIds = new ArrayList<>();
+
+                for (Map.Entry<UUID, ByteBuffer> entry :
rowsToUpdate.entrySet()) {
+                    RowId rowId = new RowId(partitionId, entry.getKey());
+                    TableRow row = entry.getValue() != null ? new
TableRow(entry.getValue()) : null;
+
+                    storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
+
+                    rowIds.add(rowId);
+                    addToIndexes(row, rowId);
+                }
+
+                if (onReplication != null) {
+                    onReplication.accept(rowIds); // Unlike handleUpdate, runs after both the writes and the index updates.
+                }
+            }
+
+            return null;
+        });
+    }
+
+    private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) {
+        if (tableRow == null) { // A null row is a remove: there is nothing to put into the indexes.
+            return;
+        }
+
+        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
+            index.put(tableRow, rowId);
+        }
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index bb02f0016a..7a379d8b3c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -699,11 +699,19 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(clock.now());
+ CompletableFuture<MvPartitionStorage> mvPartitionStorageFut =
getOrCreateMvPartition(internalTbl.storage(), partId);
+
+ CompletableFuture<PartitionDataStorage>
partitionDataStorageFut = mvPartitionStorageFut
+ .thenApply(mvPartitionStorage ->
partitionDataStorage(mvPartitionStorage, internalTbl, partId));
+
+ CompletableFuture<StorageUpdateHandler>
storageUpdateHandlerFut = partitionDataStorageFut
+ .thenApply(storage -> new StorageUpdateHandler(partId,
storage, table.indexStorageAdapters(partId)));
+
CompletableFuture<Void> startGroupFut;
// start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
if (oldPartAssignment.isEmpty() && localMemberAssignment !=
null) {
- startGroupFut =
getOrCreateMvPartition(internalTbl.storage(),
partId).thenComposeAsync(mvPartitionStorage -> {
+ startGroupFut =
mvPartitionStorageFut.thenComposeAsync(mvPartitionStorage -> {
boolean hasData =
mvPartitionStorage.lastAppliedIndex() > 0;
CompletableFuture<Boolean> fut;
@@ -742,7 +750,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return completedFuture(null);
}
- return
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId)
+ return partitionDataStorageFut
+ .thenCompose(s -> storageUpdateHandlerFut)
+ .thenCompose(s ->
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId))
.thenAcceptAsync(txStatePartitionStorage
-> {
RaftGroupOptions groupOptions =
groupOptionsForPartition(
internalTbl.storage(),
@@ -755,17 +765,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
var raftNodeId = new
RaftNodeId(replicaGrpId, serverPeer);
+ PartitionDataStorage
partitionDataStorage = partitionDataStorageFut.join();
+ StorageUpdateHandler
storageUpdateHandler = storageUpdateHandlerFut.join();
+
try {
// TODO: use RaftManager
interface, see https://issues.apache.org/jira/browse/IGNITE-18273
((Loza)
raftMgr).startRaftGroupNode(
raftNodeId,
newConfiguration,
new PartitionListener(
-
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
+
partitionDataStorage,
+
storageUpdateHandler,
txStatePartitionStorage,
- txManager,
-
table.indexStorageAdapters(partId),
- partId,
safeTime
),
new
RebalanceRaftGroupEventsListener(
@@ -791,6 +802,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
startGroupFut
+ .thenCompose(v -> storageUpdateHandlerFut)
.thenComposeAsync(v -> {
try {
return
raftMgr.startRaftGroupService(replicaGrpId, newConfiguration);
@@ -805,13 +817,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return completedFuture(null);
}
- CompletableFuture<MvPartitionStorage>
partitionStorageFuture =
-
getOrCreateMvPartition(internalTbl.storage(), partId);
-
CompletableFuture<TxStateStorage>
txStateStorageFuture =
getOrCreateTxStateStorageAsync(internalTbl.txStateStorage(), partId);
- return
partitionStorageFuture.thenAcceptBoth(txStateStorageFuture, (partitionStorage,
txStateStorage) -> {
+ StorageUpdateHandler storageUpdateHandler =
storageUpdateHandlerFut.join();
+
+ return
mvPartitionStorageFut.thenAcceptBoth(txStateStorageFuture, (partitionStorage,
txStateStorage) -> {
try {
replicaMgr.startReplica(replicaGrpId,
new PartitionReplicaListener(
@@ -829,6 +840,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
safeTime,
txStateStorage,
placementDriver,
+ storageUpdateHandler,
this::isLocalPeer,
schemaManager.schemaRegistry(causalityToken, tblId)
)
@@ -1831,6 +1843,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
if (shouldStartLocalServices) {
MvPartitionStorage mvPartitionStorage =
getOrCreateMvPartition(internalTable.storage(), partId).join();
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(mvPartitionStorage, internalTable, partId);
+ StorageUpdateHandler storageUpdateHandler =
+ new StorageUpdateHandler(partId,
partitionDataStorage, tbl.indexStorageAdapters(partId));
TxStateStorage txStatePartitionStorage =
getOrCreateTxStateStorage(internalTable.txStateStorage(), partId);
@@ -1842,11 +1857,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
- partitionDataStorage(mvPartitionStorage,
internalTable, partId),
+ partitionDataStorage,
+ storageUpdateHandler,
txStatePartitionStorage,
- txManager,
- tbl.indexStorageAdapters(partId),
- partId,
safeTime
);
@@ -1891,6 +1904,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
safeTime,
txStatePartitionStorage,
placementDriver,
+ storageUpdateHandler,
TableManager.this::isLocalPeer,
completedFuture(schemaManager.schemaRegistry(tblId))
)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index dc39737492..37fe4cc51b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,22 +19,18 @@ package org.apache.ignite.internal.table.distributed.raft;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_UNEXPECTED_STATE_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
-import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -47,25 +43,22 @@ import
org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
-import org.apache.ignite.internal.schema.TableRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
-import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
-import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -78,17 +71,12 @@ public class PartitionListener implements RaftGroupListener
{
/** Partition storage with access to MV data of a partition. */
private final PartitionDataStorage storage;
+ /** Handler that processes storage updates. */
+ private final StorageUpdateHandler storageUpdateHandler;
+
/** Storage of transaction metadata. */
private final TxStateStorage txStateStorage;
- /** Transaction manager. */
- private final TxManager txManager;
-
- private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
-
- /** Partition ID. */
- private final int partitionId;
-
/** Rows that were inserted, updated or removed. */
private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
@@ -99,23 +87,17 @@ public class PartitionListener implements RaftGroupListener
{
* The constructor.
*
* @param partitionDataStorage The storage.
- * @param txManager Transaction manager.
- * @param partitionId Partition ID this listener serves.
* @param safeTime Safe time tracker.
*/
public PartitionListener(
PartitionDataStorage partitionDataStorage,
+ StorageUpdateHandler storageUpdateHandler,
TxStateStorage txStateStorage,
- TxManager txManager,
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
- int partitionId,
PendingComparableValuesTracker<HybridTimestamp> safeTime
) {
this.storage = partitionDataStorage;
+ this.storageUpdateHandler = storageUpdateHandler;
this.txStateStorage = txStateStorage;
- this.txManager = txManager;
- this.indexes = indexes;
- this.partitionId = partitionId;
this.safeTime = safeTime;
// TODO: IGNITE-18502 Implement a pending update storage
@@ -214,24 +196,20 @@ public class PartitionListener implements
RaftGroupListener {
return;
}
- storage.runConsistently(() -> {
- TableRow row = cmd.rowBuffer() != null ? new
TableRow(cmd.rowBuffer()) : null;
- UUID rowUuid = cmd.rowUuid();
- RowId rowId = new RowId(partitionId, rowUuid);
- UUID txId = cmd.txId();
- UUID commitTblId = cmd.tablePartitionId().tableId();
- int commitPartId = cmd.tablePartitionId().partitionId();
-
- storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
-
- txsPendingRowIds.computeIfAbsent(txId, entry -> new
HashSet<>()).add(rowId);
+ TxMeta txMeta = txStateStorage.get(cmd.txId());
+ if (txMeta != null && (txMeta.txState() == COMMITED ||
txMeta.txState() == ABORTED)) {
+ storage.lastApplied(commandIndex, commandTerm);
- addToIndexes(row, rowId);
+ return;
+ }
- storage.lastApplied(commandIndex, commandTerm);
+ storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(),
+ rowId -> {
+ txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new
HashSet<>()).add(rowId);
- return null;
- });
+ storage.lastApplied(commandIndex, commandTerm);
+ }
+ );
}
/**
@@ -247,28 +225,19 @@ public class PartitionListener implements
RaftGroupListener {
return;
}
- storage.runConsistently(() -> {
- UUID txId = cmd.txId();
- Map<UUID, ByteBuffer> rowsToUpdate = cmd.rowsToUpdate();
- UUID commitTblId = cmd.tablePartitionId().tableId();
- int commitPartId = cmd.tablePartitionId().partitionId();
-
- if (!nullOrEmpty(rowsToUpdate)) {
- for (Map.Entry<UUID, ByteBuffer> entry :
rowsToUpdate.entrySet()) {
- RowId rowId = new RowId(partitionId, entry.getKey());
- TableRow row = entry.getValue() != null ? new
TableRow(entry.getValue()) : null;
-
- storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
+ TxMeta txMeta = txStateStorage.get(cmd.txId());
+ if (txMeta != null && (txMeta.txState() == COMMITED ||
txMeta.txState() == ABORTED)) {
+ storage.lastApplied(commandIndex, commandTerm);
- txsPendingRowIds.computeIfAbsent(txId, entry0 -> new
HashSet<>()).add(rowId);
+ return;
+ }
- addToIndexes(row, rowId);
- }
+ storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(), rowIds -> {
+ for (RowId rowId : rowIds) {
+ txsPendingRowIds.computeIfAbsent(cmd.txId(), entry0 -> new
HashSet<>()).add(rowId);
}
storage.lastApplied(commandIndex, commandTerm);
-
- return null;
});
}
@@ -357,9 +326,6 @@ public class PartitionListener implements RaftGroupListener
{
txsPendingRowIds.remove(txId);
- // TODO: IGNITE-17638 TestOnly code, let's consider using Txn
state map instead of states.
- txManager.changeState(txId, null, cmd.commit() ? COMMITED :
ABORTED);
-
storage.lastApplied(commandIndex, commandTerm);
return null;
@@ -458,16 +424,6 @@ public class PartitionListener implements
RaftGroupListener {
}
}
- private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) {
- if (tableRow == null) { // skip removes
- return;
- }
-
- for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
- index.put(tableRow, rowId);
- }
- }
-
/**
* Returns underlying storage.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index deb7211be4..9db426fca5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
@@ -79,6 +81,7 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
@@ -117,6 +120,7 @@ import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -158,6 +162,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Lock manager. */
private final LockManager lockManager;
+ /** Handler that processes updates writing them to storage. */
+ private final StorageUpdateHandler storageUpdateHandler;
+
/**
* Cursors map. The key of the map is internal Ignite uuid which consists
of a transaction id ({@link UUID}) and a cursor id
* ({@link Long}).
@@ -192,6 +199,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private final Function<Peer, Boolean> isLocalPeerChecker;
+ private final ConcurrentMap<UUID, TxCleanupReadyFutureList>
txCleanupReadyFutures = new ConcurrentHashMap<>();
+
private final CompletableFuture<SchemaRegistry> schemaFut;
/**
@@ -210,6 +219,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param safeTime Safe time clock.
* @param txStateStorage Transaction state storage.
* @param placementDriver Placement driver.
+ * @param storageUpdateHandler Handler that processes updates writing them
to storage.
* @param isLocalPeerChecker Function for checking that the given peer is
local.
* @param schemaFut Table schema.
*/
@@ -228,6 +238,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
PendingComparableValuesTracker<HybridTimestamp> safeTime,
TxStateStorage txStateStorage,
PlacementDriver placementDriver,
+ StorageUpdateHandler storageUpdateHandler,
Function<Peer, Boolean> isLocalPeerChecker,
CompletableFuture<SchemaRegistry> schemaFut
) {
@@ -246,6 +257,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.txStateStorage = txStateStorage;
this.placementDriver = placementDriver;
this.isLocalPeerChecker = isLocalPeerChecker;
+ this.storageUpdateHandler = storageUpdateHandler;
this.schemaFut = schemaFut;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -1040,18 +1052,44 @@ public class PartitionReplicaListener implements
ReplicaListener {
return failedFuture(e);
}
- HybridTimestampMessage timestampMsg =
hybridTimestamp(request.commitTimestamp());
+ List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
- TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
- .txId(request.txId())
- .commit(request.commit())
- .commitTimestamp(timestampMsg)
- .safeTime(hybridTimestamp(hybridClock.now()))
- .build();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18617
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18632
+ TxCleanupReadyFutureList futs =
txCleanupReadyFutures.computeIfAbsent(request.txId(), k -> new
TxCleanupReadyFutureList());
+
+ synchronized (futs) {
+ txUpdateFutures.addAll(futs.futures);
+
+ futs.futures.clear();
+
+ futs.finished = true;
+ }
+
+ if (txUpdateFutures.isEmpty()) {
+ releaseTxLocks(request.txId());
+
+ return completedFuture(null);
+ }
+
+ return allOf(txUpdateFutures.toArray(new
CompletableFuture<?>[txUpdateFutures.size()])).thenCompose(v -> {
+ HybridTimestampMessage timestampMsg =
hybridTimestamp(request.commitTimestamp());
+
+ TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
+ .txId(request.txId())
+ .commit(request.commit())
+ .commitTimestamp(timestampMsg)
+ .safeTime(hybridTimestamp(hybridClock.now()))
+ .build();
+
+ return raftClient
+ .run(txCleanupCmd)
+ .thenRun(() -> releaseTxLocks(request.txId()));
+ });
+ }
- return raftClient
- .run(txCleanupCmd)
- .thenRun(() ->
lockManager.locks(request.txId()).forEachRemaining(lockManager::release));
+ private void releaseTxLocks(UUID txId) {
+ lockManager.locks(txId).forEachRemaining(lockManager::release);
}
/**
@@ -1218,7 +1256,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(result);
}
- return
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId,
rowIdsToDelete, txId))
+ return
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete,
txId))
.thenApply(ignored -> result);
});
}
@@ -1254,7 +1292,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
CompletableFuture<Object> raftFut =
rowIdsToDelete.isEmpty() ? completedFuture(null)
- :
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId,
rowIdsToDelete, txId));
+ :
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete,
txId));
return raftFut.thenApply(ignored -> result);
});
@@ -1315,7 +1353,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return convertedMap;
})
- .thenCompose(convertedMap ->
applyCmdWithExceptionHandling(
+ .thenCompose(convertedMap -> applyUpdateAllCommand(
updateAllCommand(committedPartitionId,
convertedMap, txId)))
.thenApply(ignored -> {
// Release short term locks.
@@ -1361,7 +1399,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(null);
}
- return
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId,
rowsToUpdate, txId))
+ return
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate,
txId))
.thenApply(ignored -> {
// Release short term locks.
for (CompletableFuture<IgniteBiTuple<RowId,
Collection<Lock>>> rowIdFut : rowIdFuts) {
@@ -1404,6 +1442,59 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+ /**
+ * Executes an Update command.
+ *
+ * @param cmd Update command.
+ * @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
+ */
+ private CompletableFuture<Object> applyUpdateCommand(UpdateCommand cmd) {
+ return applyUpdatingCommand(cmd.txId(), () -> {
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowBuffer(),
+ null
+ );
+
+ return applyCmdWithExceptionHandling(cmd);
+ });
+ }
+
+ /**
+ * Executes an UpdateAll command.
+ *
+ * @param cmd UpdateAll command.
+ * @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
+ */
+ private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand
cmd) {
+ return applyUpdatingCommand(cmd.txId(), () -> {
+ storageUpdateHandler.handleUpdateAll(cmd.txId(),
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), null);
+
+ return applyCmdWithExceptionHandling(cmd);
+ });
+ }
+
+ private CompletableFuture<Object> applyUpdatingCommand(UUID txId,
Supplier<CompletableFuture<Object>> closure) {
+ CompletableFuture<Object> applyCmdFuture;
+
+ TxCleanupReadyFutureList futs =
txCleanupReadyFutures.computeIfAbsent(txId, k -> new
TxCleanupReadyFutureList());
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18632
+ synchronized (futs) {
+ if (futs.finished) {
+ throw new
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is
already finished.");
+ }
+
+ applyCmdFuture = closure.get();
+
+ futs.futures.add(applyCmdFuture);
+ }
+
+ return applyCmdFuture;
+ }
+
/**
* Precesses single request.
*
@@ -1436,7 +1527,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return takeLocksForDelete(row, rowId, txId)
- .thenCompose(ignored ->
applyCmdWithExceptionHandling(
+ .thenCompose(ignored -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId.uuid(), null, txId)))
.thenApply(ignored -> true);
});
@@ -1448,7 +1539,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return takeLocksForDelete(row, rowId, txId)
- .thenCompose(ignored ->
applyCmdWithExceptionHandling(
+ .thenCompose(ignored -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId.uuid(), null, txId)))
.thenApply(ignored -> row);
});
@@ -1465,7 +1556,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(false);
}
- return applyCmdWithExceptionHandling(
+ return applyUpdateCommand(
updateCommand(commitPartitionId,
validatedRowId.uuid(), null, txId))
.thenApply(ignored -> true);
});
@@ -1480,7 +1571,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId rowId0 = new RowId(partId);
return toTableRow(searchRow).thenCompose(tableRow ->
takeLocksForInsert(searchRow, rowId0, txId)
- .thenCompose(rowIdLock ->
applyCmdWithExceptionHandling(
+ .thenCompose(rowIdLock -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId0.uuid(), tableRow.byteBuffer(), txId))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
@@ -1502,7 +1593,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
: takeLocksForUpdate(searchRow, rowId0, txId);
return toTableRow(searchRow).thenCompose(tableRow ->
lockFut
- .thenCompose(rowIdLock ->
applyCmdWithExceptionHandling(
+ .thenCompose(rowIdLock -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId0.uuid(), tableRow.byteBuffer(), txId))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
@@ -1524,7 +1615,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
: takeLocksForUpdate(searchRow, rowId0, txId);
return toTableRow(searchRow).thenCompose(tableRow ->
lockFut
- .thenCompose(rowIdLock ->
applyCmdWithExceptionHandling(
+ .thenCompose(rowIdLock -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId0.uuid(), tableRow.byteBuffer(), txId))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
@@ -1542,7 +1633,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return toTableRow(searchRow).thenCompose(tableRow ->
takeLocksForUpdate(searchRow, rowId, txId)
- .thenCompose(rowIdLock ->
applyCmdWithExceptionHandling(
+ .thenCompose(rowIdLock -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId.uuid(), tableRow.byteBuffer(), txId))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
@@ -1560,7 +1651,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return toTableRow(searchRow).thenCompose(tableRow ->
takeLocksForUpdate(searchRow, rowId, txId)
- .thenCompose(rowLock ->
applyCmdWithExceptionHandling(
+ .thenCompose(rowLock -> applyUpdateCommand(
updateCommand(commitPartitionId,
rowId.uuid(), tableRow.byteBuffer(), txId))
.thenApply(ignored -> rowLock))
.thenApply(rowIdLock -> {
@@ -1722,7 +1813,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(false);
}
- return applyCmdWithExceptionHandling(
+ return applyUpdateCommand(
updateCommand(commitPartitionId,
validatedRowId.get1().uuid(), tableRow.byteBuffer(), txId))
.thenApply(ignored -> validatedRowId)
.thenApply(rowIdLock -> {
@@ -1987,8 +2078,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private TablePartitionIdMessage tablePartitionId(TablePartitionId
tablePartId) {
return msgFactory.tablePartitionIdMessage()
- .tableId(tablePartId.getTableId())
- .partitionId(tablePartId.getPartId())
+ .tableId(tablePartId.tableId())
+ .partitionId(tablePartId.partitionId())
.build();
}
@@ -2009,4 +2100,16 @@ public class PartitionReplicaListener implements
ReplicaListener {
private CompletableFuture<BinaryConverter> getConverterFuture(int
schemaVersion) {
return schemaFut.thenApply(schemaRegistry ->
BinaryConverter.forRow(schemaRegistry.schema(schemaVersion)));
}
+
+ /**
+ * Class that stores list of futures for updates that can block tx
cleanup, and finished flag.
+ */
+ private static class TxCleanupReadyFutureList {
+ final List<CompletableFuture<?>> futures = new ArrayList<>();
+
+ /**
+ * Whether the transaction is finished and therefore locked for
further updates by started cleanup process.
+ */
+ boolean finished;
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
index 01aeaf3e56..cac005f024 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
@@ -44,20 +44,20 @@ public class TablePartitionId implements ReplicationGroupId
{
}
/**
- * Gets a pration id.
+ * Get the partition id.
*
* @return Partition id.
*/
- public int getPartId() {
+ public int partitionId() {
return partId;
}
/**
- * Gets a table id.
+ * Get the table id.
*
* @return Table id.
*/
- public UUID getTableId() {
+ public UUID tableId() {
return tableId;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index b0d1108257..44b4a300ee 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,7 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
-import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -208,7 +208,7 @@ public class InternalTableImpl implements InternalTable {
if (tx != null && tx.isReadOnly()) {
return failedFuture(
new TransactionException(
- TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR,
+ TX_FAILED_READ_WRITE_OPERATION_ERR,
"Failed to enlist read-write operation into
read-only transaction txId={" + tx.id() + '}'
)
);
@@ -271,7 +271,7 @@ public class InternalTableImpl implements InternalTable {
if (tx != null && tx.isReadOnly()) {
return failedFuture(
new TransactionException(
- TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR,
+ TX_FAILED_READ_WRITE_OPERATION_ERR,
"Failed to enlist read-write operation into
read-only transaction txId={" + tx.id() + '}'
)
);
@@ -933,7 +933,7 @@ public class InternalTableImpl implements InternalTable {
if (tx != null && tx.isReadOnly()) {
throw new TransactionException(
new TransactionException(
- TX_INSUFFICIENT_READ_WRITE_OPERATION_ERR,
+ TX_FAILED_READ_WRITE_OPERATION_ERR,
"Failed to enlist read-write operation into
read-only transaction txId={" + tx.id() + '}'
)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index cefdb1bc27..dc57824eb4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -112,6 +112,11 @@ public class TxLocalTest extends TxAbstractTest {
// TODO asch IGNITE-15928 implement local scan
}
+ @Override
+ protected TxManager clientTxManager() {
+ return txManager;
+ }
+
@Override
protected TxManager txManager(Table t) {
return txManager;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index ac35be91fb..f148e6a4d9 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -83,6 +84,7 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
@@ -94,8 +96,6 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
-import org.apache.ignite.internal.tx.impl.HeapLockManager;
-import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.util.Cursor;
@@ -200,12 +200,14 @@ public class PartitionCommandListenerTest {
safeTimeTracker = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.id(), pkStorage);
+
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(0, partitionDataStorage, indexes);
+
commandListener = new PartitionListener(
partitionDataStorage,
+ storageUpdateHandler,
txStateStorage,
- new TxManagerImpl(replicaService, new HeapLockManager(),
hybridClock),
- () -> Map.of(pkStorage.id(), pkStorage),
- PARTITION_ID,
safeTimeTracker
);
}
@@ -290,12 +292,14 @@ public class PartitionCommandListenerTest {
TestPartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartitionStorage);
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.id(), pkStorage);
+
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes);
+
PartitionListener testCommandListener = new PartitionListener(
partitionDataStorage,
+ storageUpdateHandler,
txStateStorage,
- new TxManagerImpl(replicaService, new HeapLockManager(), new
HybridClockImpl()),
- () -> Map.of(pkStorage.id(), pkStorage),
- PARTITION_ID,
new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
);
@@ -577,8 +581,8 @@ public class PartitionCommandListenerTest {
invokeBatchedCommand(msgFactory.updateAllCommand()
.tablePartitionId(
msgFactory.tablePartitionIdMessage()
- .tableId(commitPartId.getTableId())
- .partitionId(commitPartId.getPartId())
+ .tableId(commitPartId.tableId())
+ .partitionId(commitPartId.partitionId())
.build())
.rowsToUpdate(rows)
.txId(txId)
@@ -613,8 +617,8 @@ public class PartitionCommandListenerTest {
invokeBatchedCommand(msgFactory.updateAllCommand()
.tablePartitionId(
msgFactory.tablePartitionIdMessage()
- .tableId(commitPartId.getTableId())
- .partitionId(commitPartId.getPartId())
+ .tableId(commitPartId.tableId())
+ .partitionId(commitPartId.partitionId())
.build())
.rowsToUpdate(rows)
.txId(txId)
@@ -647,8 +651,8 @@ public class PartitionCommandListenerTest {
invokeBatchedCommand(msgFactory.updateAllCommand()
.tablePartitionId(
msgFactory.tablePartitionIdMessage()
- .tableId(commitPartId.getTableId())
- .partitionId(commitPartId.getPartId())
+ .tableId(commitPartId.tableId())
+ .partitionId(commitPartId.partitionId())
.build())
.rowsToUpdate(keyRows)
.txId(txId)
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d3960dcd90..158be20534 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -60,6 +61,7 @@ import
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -189,6 +191,11 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
new PendingComparableValuesTracker<>(CLOCK.now()),
new TestTxStateStorage(),
mock(PlacementDriver.class),
+ new StorageUpdateHandler(
+ PART_ID,
+ new
TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE),
+ () -> Map.of(pkStorage.get().id(), pkStorage.get())
+ ),
peer -> true,
CompletableFuture.completedFuture(schemaManager)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 6131815184..171fbc4816 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -17,7 +17,13 @@
package org.apache.ignite.internal.table.distributed.replication;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,6 +33,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -35,8 +44,11 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -74,8 +86,11 @@ import
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -93,12 +108,14 @@ import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -106,6 +123,8 @@ import org.mockito.Mock;
/** There are tests for partition replica listener. */
public class PartitionReplicaListenerTest extends IgniteAbstractTest {
+ private static final Supplier<CompletableFuture<?>>
DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER = () -> completedFuture(null);
+
/** Tx messages factory. */
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
@@ -138,6 +157,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private static PlacementDriver placementDriver =
mock(PlacementDriver.class);
+ private static PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMvPartitionStorage);
+
@Mock
private static RaftGroupService mockRaftClient =
mock(RaftGroupService.class);
@@ -176,16 +197,22 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
/** Secondary hash index. */
private static TableSchemaAwareIndexStorage hashIndexStorage;
+ private static Supplier<CompletableFuture<?>> raftClientFutureSupplier =
DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+
+ private static LockManager lockManager = new HeapLockManager();
+
@BeforeAll
private static void beforeAll() {
when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock
-> {
if (!localLeader) {
- return CompletableFuture.completedFuture(new
LeaderWithTerm(new Peer(anotherNode.name()), 1L));
+ return completedFuture(new LeaderWithTerm(new
Peer(anotherNode.name()), 1L));
}
- return CompletableFuture.completedFuture(new LeaderWithTerm(new
Peer(localNode.name()), 1L));
+ return completedFuture(new LeaderWithTerm(new
Peer(localNode.name()), 1L));
});
+ when(mockRaftClient.run(any())).thenAnswer(invocationOnMock ->
raftClientFutureSupplier.get());
+
when(topologySrv.getByConsistentId(any())).thenAnswer(invocationOnMock
-> {
String consistentId = invocationOnMock.getArgument(0);
if (consistentId.equals(anotherNode.name())) {
@@ -213,11 +240,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txMeta = new TxMeta(TxState.ABORTED,
Collections.singletonList(grpId), txFixedTimestamp);
}
- return CompletableFuture.completedFuture(txMeta);
+ return completedFuture(txMeta);
});
PendingComparableValuesTracker safeTimeClock =
mock(PendingComparableValuesTracker.class);
-
when(safeTimeClock.waitFor(any())).thenReturn(CompletableFuture.completedFuture(null));
+ when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
UUID pkIndexId = UUID.randomUUID();
UUID sortedIndexId = UUID.randomUUID();
@@ -248,14 +275,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
rowConverter::toTuple
));
- sortedIndexStorage = new TableSchemaAwareIndexStorage(
- sortedIndexId,
- new TestSortedIndexStorage(new
SortedIndexDescriptor(sortedIndexId, List.of(
- new SortedIndexColumnDescriptor("intVal",
NativeTypes.INT32, false, true)
- ))),
- row -> null,
- row -> null
- );
+ SortedIndexStorage indexStorage = new TestSortedIndexStorage(new
SortedIndexDescriptor(sortedIndexId, List.of(
+ new SortedIndexColumnDescriptor("intVal", NativeTypes.INT32,
false, true)
+ )));
+
+ sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId,
indexStorage, row -> null, row -> null);
hashIndexStorage = new TableSchemaAwareIndexStorage(
hashIndexId,
@@ -266,10 +290,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
row -> null
);
- LockManager lockManager = new HeapLockManager();
-
IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true,
lockManager, row2tuple);
- IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId,
lockManager, null, row2tuple);
+ IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId,
lockManager, indexStorage, row2tuple);
IndexLocker hashIndexLocker = new HashIndexLocker(hashIndexId, false,
lockManager, row2tuple);
DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schemaDescriptor);
@@ -288,8 +310,13 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
safeTimeClock,
txStateStorage,
placementDriver,
+ new StorageUpdateHandler(
+ partId,
+ partitionDataStorage,
+ () -> Map.of(pkStorage.get().id(), pkStorage.get())
+ ),
peer -> localNode.name().equals(peer.consistentId()),
- CompletableFuture.completedFuture(schemaManager)
+ completedFuture(schemaManager)
);
marshallerFactory = new ReflectionMarshallerFactory();
@@ -306,6 +333,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
((TestHashIndexStorage) pkStorage.get().storage()).clear();
((TestHashIndexStorage) hashIndexStorage.storage()).clear();
((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
+ testMvPartitionStorage.clear();
}
@Test
@@ -761,6 +789,225 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertEquals(3, rows.size());
}
+    /**
+     * Runs a sequence of single-row read-write requests within one transaction and, after each request,
+     * checks through the primary key index whether the affected row can be read from the MV partition storage.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+
+        try {
+            doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
+            checkRowInMvStorage(binaryRow(0), true);
+
+            BinaryRow br = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v1"));
+            doSingleRowRequest(txId, br, RequestType.RW_UPSERT);
+            checkRowInMvStorage(br, true);
+
+            doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE);
+            checkRowInMvStorage(binaryRow(0), false);
+
+            doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
+            checkRowInMvStorage(binaryRow(0), true);
+
+            br = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v2"));
+            doSingleRowRequest(txId, br, RequestType.RW_GET_AND_REPLACE);
+            checkRowInMvStorage(br, true);
+
+            br = binaryRow(new TestKey(0, "k0"), new TestValue(1, "v3"));
+            doSingleRowRequest(txId, br, RequestType.RW_GET_AND_UPSERT);
+            checkRowInMvStorage(br, true);
+
+            doSingleRowRequest(txId, br, RequestType.RW_GET_AND_DELETE);
+            checkRowInMvStorage(br, false);
+
+            doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
+            checkRowInMvStorage(binaryRow(0), true);
+            doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE_EXACT);
+            checkRowInMvStorage(binaryRow(0), false);
+        } finally {
+            // The lock manager is a static field shared by the tests of this class, so the locks of this
+            // transaction are released even when one of the checks above fails.
+            lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        }
+    }
+
+    /**
+     * Runs a sequence of multi-row read-write requests within one transaction and, after each request,
+     * checks through the primary key index which of the rows can be read from the MV partition storage.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaMultiRowOps() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        BinaryRow row0 = binaryRow(0);
+        BinaryRow row1 = binaryRow(1);
+        Collection<BinaryRow> rows = asList(row0, row1);
+
+        try {
+            doMultiRowRequest(txId, rows, RequestType.RW_INSERT_ALL);
+            checkRowInMvStorage(row0, true);
+            checkRowInMvStorage(row1, true);
+
+            // Same keys as row0/row1, different values: the upsert must replace the previously written rows.
+            BinaryRow newRow0 = binaryRow(new TestKey(0, "k0"), new TestValue(2, "v2"));
+            BinaryRow newRow1 = binaryRow(new TestKey(1, "k1"), new TestValue(3, "v3"));
+            Collection<BinaryRow> newRows = asList(newRow0, newRow1);
+            doMultiRowRequest(txId, newRows, RequestType.RW_UPSERT_ALL);
+            checkRowInMvStorage(row0, false);
+            checkRowInMvStorage(row1, false);
+            checkRowInMvStorage(newRow0, true);
+            checkRowInMvStorage(newRow1, true);
+
+            doMultiRowRequest(txId, newRows, RequestType.RW_DELETE_ALL);
+            checkRowInMvStorage(row0, false);
+            checkRowInMvStorage(row1, false);
+            checkRowInMvStorage(newRow0, false);
+            checkRowInMvStorage(newRow1, false);
+
+            doMultiRowRequest(txId, rows, RequestType.RW_INSERT_ALL);
+            checkRowInMvStorage(row0, true);
+            checkRowInMvStorage(row1, true);
+            doMultiRowRequest(txId, rows, RequestType.RW_DELETE_EXACT_ALL);
+            checkRowInMvStorage(row0, false);
+            checkRowInMvStorage(row1, false);
+        } finally {
+            // The lock manager is a static field shared by the tests of this class, so the locks of this
+            // transaction are released even when one of the checks above fails.
+            lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        }
+    }
+
+    /**
+     * Builds a read-write single-row replica request and passes it to the partition replica listener.
+     * The future returned by the listener is not awaited.
+     *
+     * @param txId Transaction id.
+     * @param binaryRow Row to send in the request.
+     * @param requestType Type of the read-write operation.
+     */
+    private void doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
+        TablePartitionId commitPartition = new TablePartitionId(UUID.randomUUID(), partId);
+
+        ReadWriteReplicaRequest singleRowRequest = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                .transactionId(txId)
+                .requestType(requestType)
+                .binaryRow(binaryRow)
+                .term(1L)
+                .commitPartitionId(commitPartition)
+                .build();
+
+        partitionReplicaListener.invoke(singleRowRequest);
+    }
+
+    /**
+     * Builds a read-write multi-row replica request and passes it to the partition replica listener.
+     * The future returned by the listener is not awaited.
+     *
+     * @param txId Transaction id.
+     * @param binaryRows Rows to send in the request.
+     * @param requestType Type of the read-write operation.
+     */
+    private void doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
+        TablePartitionId commitPartition = new TablePartitionId(UUID.randomUUID(), partId);
+
+        ReadWriteReplicaRequest multiRowRequest = TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+                .transactionId(txId)
+                .requestType(requestType)
+                .binaryRows(binaryRows)
+                .term(1L)
+                .commitPartitionId(commitPartition)
+                .build();
+
+        partitionReplicaListener.invoke(multiRowRequest);
+    }
+
+    /**
+     * Runs the common write/cleanup scenario with single-row {@code RW_INSERT} requests.
+     * Every request produced by the supplier carries a row built from the next counter value.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaSingleUpdate() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        AtomicInteger counter = new AtomicInteger();
+
+        try {
+            testWriteIntentOnPrimaryReplica(
+                    txId,
+                    () -> {
+                        BinaryRow binaryRow = binaryRow(counter.getAndIncrement());
+
+                        return TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                                .transactionId(txId)
+                                .requestType(RequestType.RW_INSERT)
+                                .binaryRow(binaryRow)
+                                .term(1L)
+                                .commitPartitionId(new TablePartitionId(UUID.randomUUID(), partId))
+                                .build();
+                    },
+                    () -> checkRowInMvStorage(binaryRow(0), true)
+            );
+        } finally {
+            // The lock manager is a static field shared by the tests of this class, so the locks of this
+            // transaction are released even when the scenario above fails.
+            lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        }
+    }
+
+    /**
+     * Runs the common write/cleanup scenario with multi-row {@code RW_UPSERT_ALL} requests.
+     * Every request produced by the supplier carries two rows built from the next counter value.
+     */
+    @Test
+    public void testWriteIntentOnPrimaryReplicaUpdateAll() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        AtomicInteger counter = new AtomicInteger();
+
+        try {
+            testWriteIntentOnPrimaryReplica(
+                    txId,
+                    () -> {
+                        int cntr = counter.getAndIncrement();
+                        BinaryRow binaryRow0 = binaryRow(cntr * 2);
+                        BinaryRow binaryRow1 = binaryRow(cntr * 2 + 1);
+
+                        return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+                                .transactionId(txId)
+                                .requestType(RequestType.RW_UPSERT_ALL)
+                                .binaryRows(asList(binaryRow0, binaryRow1))
+                                .term(1L)
+                                .commitPartitionId(new TablePartitionId(UUID.randomUUID(), partId))
+                                .build();
+                    },
+                    () -> checkRowInMvStorage(binaryRow(0), true)
+            );
+        } finally {
+            // The lock manager is a static field shared by the tests of this class, so the locks of this
+            // transaction are released even when the scenario above fails.
+            lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        }
+    }
+
+    /**
+     * Asserts that the given row is, or is not, readable from the MV partition storage.
+     *
+     * <p>Row ids are looked up in the primary key index by the key of {@code binaryRow}; each of them is read
+     * at {@link HybridTimestamp#MAX_VALUE} and compared byte-wise with the expected row. All indexed row ids
+     * are inspected in both modes, so an empty lookup result simply means that the row is absent.
+     *
+     * @param binaryRow Row to look for.
+     * @param shouldBePresent {@code true} if the row is expected to be found, {@code false} if it must not be.
+     */
+    private void checkRowInMvStorage(BinaryRow binaryRow, boolean shouldBePresent) {
+        byte[] expectedBytes = tableRow(binaryRow).bytes();
+        Cursor<RowId> cursor = pkStorage.get().get(binaryRow);
+
+        boolean found = false;
+
+        // Several row ids may be indexed under the same key: there can be write intents for deletion.
+        while (cursor.hasNext()) {
+            RowId rowId = cursor.next();
+
+            TableRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).tableRow();
+
+            if (row != null && Arrays.equals(expectedBytes, row.bytes())) {
+                found = true;
+            }
+        }
+
+        assertEquals(shouldBePresent, found);
+    }
+
+ /**
+ * Common scenario for the write intent tests: performs a write, then checks that a cleanup request
+ * does not complete while another write of the same transaction is still waiting for its raft future,
+ * and finally checks that a write sent after the cleanup fails with {@link TransactionException}.
+ *
+ * @param txId Transaction id used for the tx state record and the cleanup request.
+ * @param updatingRequestSupplier Supplier of write requests; it is called three times.
+ * @param checkAfterFirstOperation Check executed right after the first write request is invoked.
+ */
+ private void testWriteIntentOnPrimaryReplica(
+ UUID txId,
+ Supplier<ReadWriteReplicaRequest> updatingRequestSupplier,
+ Runnable checkAfterFirstOperation
+ ) {
+ // First write: the mocked raft client still returns the default, already completed future.
+ partitionReplicaListener.invoke(updatingRequestSupplier.get());
+ checkAfterFirstOperation.run();
+
+ // Check that cleanup request processing awaits all write requests.
+ CompletableFuture<?> writeFut = new CompletableFuture<>();
+
+ // From now on the mocked raft client answers with a future this test completes manually.
+ raftClientFutureSupplier = () -> writeFut;
+
+ try {
+ CompletableFuture<?> replicaWriteFut =
partitionReplicaListener.invoke(updatingRequestSupplier.get());
+
+ assertFalse(replicaWriteFut.isDone());
+
+ // Only the write above should hang: later raft calls get the default completed future again.
+ raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+
+ HybridTimestamp now = clock.now();
+
+ // Imitation of tx commit.
+ txStateStorage.put(txId, new TxMeta(TxState.COMMITED, new
ArrayList<>(), now));
+
+ CompletableFuture<?> replicaCleanupFut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
+ .txId(txId)
+ .commit(true)
+ .commitTimestamp(now)
+ .term(1L)
+ .build()
+ );
+
+ // The cleanup must stay pending while the second write has not finished.
+ assertFalse(replicaCleanupFut.isDone());
+
+ // Let the pending write finish; the cleanup is expected to complete shortly after.
+ writeFut.complete(null);
+
+ assertThat(replicaCleanupFut, willSucceedFast());
+ } finally {
+ // Restore the default supplier even if an assertion above failed, since the field is static.
+ raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+ }
+
+ // Check that one more write after cleanup is discarded.
+ CompletableFuture<?> writeAfterCleanupFuture =
partitionReplicaListener.invoke(updatingRequestSupplier.get());
+ assertThat(writeAfterCleanupFuture,
willFailFast(TransactionException.class));
+ }
+
private static BinaryTuplePrefix toIndexBound(int val) {
ByteBuffer tuple = new BinaryTuplePrefixBuilder(1,
1).appendInt(val).build();
@@ -783,10 +1030,22 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
}
+    /**
+     * Converts a binary row into a table row using the test's row converter.
+     *
+     * @param binaryRow Binary row to convert.
+     * @return Table row produced by {@code TableRowConverter.fromBinaryRow}.
+     */
+    protected static TableRow tableRow(BinaryRow binaryRow) {
+        TableRow converted = TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+        return converted;
+    }
+
private static TableRow tableRow(TestKey key, TestValue value) {
return TableRowConverter.fromBinaryRow(binaryRow(key, value),
rowConverter);
}
+    /**
+     * Marshals a binary row whose key is {@code (i, "k" + i)} and whose value is {@code (i, "v" + i)}.
+     *
+     * @param i Number used to build both the key and the value.
+     * @return Marshalled binary row.
+     * @throws IgniteException If marshalling fails; the {@link MarshallerException} is kept as the cause.
+     */
+    protected static BinaryRow binaryRow(int i) {
+        TestKey key = new TestKey(i, "k" + i);
+        TestValue value = new TestValue(i, "v" + i);
+
+        try {
+            return kvMarshaller.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
private static BinaryRow binaryRow(TestKey key, TestValue value) {
try {
return kvMarshaller.marshal(key, value);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5a356045f7..a4f0a18045 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -115,6 +115,8 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
protected IgniteTransactions igniteTransactions;
+ protected TxManager clientTxManager;
+
/**
* Initialize the test state.
*/
@@ -290,7 +292,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(BALANCE_1 - DELTA, view.get(null,
makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, view.get(null,
makeKey(2)).doubleValue("balance"));
- assertEquals(5, txManager(accounts).finished());
+ assertEquals(5, clientTxManager().finished());
}
/**
@@ -314,7 +316,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(null,
makeKey(2)).doubleValue("balance"));
- assertEquals(5, txManager(accounts).finished());
+ assertEquals(5, clientTxManager().finished());
}
/**
@@ -1642,6 +1644,13 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
return Tuple.create().set("name", name);
}
+ /**
+ * Get a client tx manager.
+ *
+ * @return TX manager.
+ */
+ protected abstract TxManager clientTxManager();
+
/**
* Get a tx manager on a partition leader.
*
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 6e82ef55e6..3346feb172 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.function.Supplier;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -58,7 +59,9 @@ import
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -84,6 +87,8 @@ import org.jetbrains.annotations.Nullable;
public class DummyInternalTableImpl extends InternalTableImpl {
public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1",
2004);
+ private static final int PART_ID = 0;
+
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("key", NativeTypes.INT64, false)},
@@ -164,7 +169,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
super(
"test",
UUID.randomUUID(),
- Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
+ Int2ObjectMaps.singleton(PART_ID,
mock(RaftGroupService.class)),
1,
name -> mock(ClusterNode.class),
txManager == null ? new TxManagerImpl(replicaSvc, new
HeapLockManager(), new HybridClockImpl()) : txManager,
@@ -175,7 +180,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
);
RaftGroupService svc = partitionMap.get(0);
- groupId = crossTableUsage ? new TablePartitionId(tableId(), 0) :
crossTableGroupId;
+ groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) :
crossTableGroupId;
lenient().doReturn(groupId).when(svc).groupId();
Peer leaderPeer = new Peer(UUID.randomUUID().toString());
@@ -260,34 +265,36 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
HybridClock clock = new HybridClockImpl();
PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(clock.now());
+ PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartStorage);
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.get().id(), pkStorage.get());
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(PART_ID, partitionDataStorage, indexes);
DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schema);
replicaListener = new PartitionReplicaListener(
mvPartStorage,
- partitionMap.get(0),
+ partitionMap.get(PART_ID),
this.txManager,
this.txManager.lockManager(),
Runnable::run,
- 0,
+ PART_ID,
tableId,
() -> Map.of(pkLocker.id(), pkLocker),
pkStorage,
() -> Map.of(),
clock,
safeTime,
- txStateStorage().getOrCreateTxStateStorage(0),
+ txStateStorage().getOrCreateTxStateStorage(PART_ID),
placementDriver,
+ storageUpdateHandler,
peer -> true,
CompletableFuture.completedFuture(schemaManager)
);
partitionListener = new PartitionListener(
new TestPartitionDataStorage(mvPartStorage),
- txStateStorage().getOrCreateTxStateStorage(0),
- this.txManager,
- () -> Map.of(pkStorage.get().id(), pkStorage.get()),
- 0,
+ storageUpdateHandler,
+ txStateStorage().getOrCreateTxStateStorage(PART_ID),
safeTime
);
}