This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8bdbb88f97 IGNITE-20685 Implement ability to trigger transaction recovery (#2832)
8bdbb88f97 is described below
commit 8bdbb88f9751fe50ded7732ab3968ebb0bb219a1
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Nov 20 13:46:50 2023 +0300
IGNITE-20685 Implement ability to trigger transaction recovery (#2832)
---
.../ignite/client/fakes/FakeInternalTable.java | 3 +
.../apache/ignite/client/fakes/FakeTxManager.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../ignite/internal/table/ItTableScanTest.java | 14 +-
.../internal/table/ItTransactionConflictTest.java | 161 ++++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../sql/engine/exec/ScannableTableImpl.java | 14 +-
.../engine/exec/rel/ScannableTableSelfTest.java | 33 +++-
.../exec/rel/TableScanNodeExecutionTest.java | 25 ++-
.../ItInternalTableReadWriteScanTest.java | 2 +-
.../ignite/distributed/ItTablePersistenceTest.java | 16 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 10 +-
.../ignite/distributed/ItTxStateLocalMapTest.java | 5 +-
.../rebalance/ItRebalanceDistributedTest.java | 2 +-
.../ignite/internal/table/ItColocationTest.java | 15 +-
.../ignite/internal/table/InternalTable.java | 5 +
.../table/distributed/raft/PartitionListener.java | 3 +
.../replication/request/CommittableTxRequest.java | 8 +
.../request/ReadWriteMultiRowPkReplicaRequest.java | 8 -
.../request/ReadWriteMultiRowReplicaRequest.java | 8 -
.../ReadWriteSingleRowPkReplicaRequest.java | 7 -
.../request/ReadWriteSingleRowReplicaRequest.java | 7 -
.../request/ReadWriteSwapRowReplicaRequest.java | 8 +-
.../replicator/PartitionReplicaListener.java | 54 +++++-
.../distributed/storage/InternalTableImpl.java | 25 +--
.../replication/PartitionReplicaListenerTest.java | 20 ++-
.../apache/ignite/distributed/ItTxTestCluster.java | 15 +-
.../ignite/internal/table/TxAbstractTest.java | 7 +-
.../table/impl/DummyInternalTableImpl.java | 22 ++-
.../org/apache/ignite/internal/tx/LockManager.java | 1 -
.../org/apache/ignite/internal/tx/TxManager.java | 3 +-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 25 ++-
.../ignite/internal/tx/TxStateMetaFinishing.java | 6 +-
.../ignite/internal/tx/impl/OrphanDetector.java | 184 +++++++++++++++++++++
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 2 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 92 +++++------
.../ignite/internal/tx/message/TxMessageGroup.java | 5 +
.../internal/tx/message/TxRecoveryMessage.java} | 23 ++-
.../apache/ignite/internal/tx/TxManagerTest.java | 2 +-
39 files changed, 680 insertions(+), 168 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 02e382a279..0e580576cf 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -34,6 +34,7 @@ import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -358,6 +359,7 @@ public class FakeInternalTable implements InternalTable {
public Publisher<BinaryRow> scan(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@@ -394,6 +396,7 @@ public class FakeInternalTable implements InternalTable {
public Publisher<BinaryRow> lookup(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 42a9b58f06..ef00ab2d48 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -145,8 +145,8 @@ public class FakeTxManager implements TxManager {
}
@Override
- public void updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta>
updater) {
-
+ public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta,
TxStateMeta> updater) {
+ return null;
}
@Override
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index c5a535c5a0..e0d1d45d22 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -297,11 +297,11 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
ReplicaService replicaSvc = new
ReplicaService(clusterSvc.messagingService(), hybridClock);
var txManager = new TxManagerImpl(
+ clusterSvc,
replicaService,
lockManager,
hybridClock,
new TransactionIdGenerator(idx),
- () -> clusterSvc.topologyService().localMember().id(),
placementDriver,
partitionIdleSafeTimePropagationPeriodMsSupplier
);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 2b67249267..1b846c0084 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -139,7 +139,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx1,
- internalTable.scan(PART_ID, tx1.id(), recipient,
sortedIndexId, null, null, 0, null)
+ internalTable.scan(PART_ID, tx1.id(), tx1.commitPartition(),
recipient, sortedIndexId, null, null, 0, null)
);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -415,7 +415,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), recipient, sortedIndexId,
null, null, 0, null)
+ internalTable.scan(PART_ID, tx.id(), tx.commitPartition(),
recipient, sortedIndexId, null, null, 0, null)
);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -443,7 +443,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
Publisher<BinaryRow> publisher1 = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), recipient, sortedIndexId,
null, null, 0, null)
+ internalTable.scan(PART_ID, tx.id(), tx.commitPartition(),
recipient, sortedIndexId, null, null, 0, null)
);
assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
@@ -474,6 +474,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
internalTable.scan(
PART_ID,
tx.id(),
+ tx.commitPartition(),
recipient,
soredIndexId,
lowBound,
@@ -500,6 +501,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
internalTable.scan(
PART_ID,
tx.id(),
+ tx.commitPartition(),
recipient,
soredIndexId,
lowBound,
@@ -554,7 +556,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
Publisher<BinaryRow> publisher = new
RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), recipient,
sortedIndexId, null, null, 0, null)
+ internalTable.scan(PART_ID, tx.id(),
tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
);
// Non-thread-safe collection is fine, HB is guaranteed by
"Thread#join" inside of "runRace".
@@ -580,7 +582,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
Publisher<BinaryRow> publisher1 = new
RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), recipient,
sortedIndexId, null, null, 0, null)
+ internalTable.scan(PART_ID, tx.id(),
tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
);
assertEquals(scanAllRows(publisher1).size(),
scannedRows.size());
@@ -657,7 +659,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
publisher = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), recipient,
sortedIndexId, null, null, 0, null)
+ internalTable.scan(PART_ID, tx.id(),
tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
new file mode 100644
index 0000000000..9f25d1a8cd
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for recovery of abandoned (orphan) transactions.
+ *
+ * <p>A read-write transaction locks a key and then its coordinator node is stopped, leaving the transaction pending.
+ * A second transaction then conflicts with it on the same key.
+ */
+public class ItTransactionConflictTest extends ClusterPerTestIntegrationTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "test_table";
+
+    /**
+     * Creates a zone with a single partition and three replicas, and a test table in that zone.
+     *
+     * @param testInfo Test information.
+     * @throws Exception If failed.
+     */
+    @BeforeEach
+    @Override
+    public void setup(TestInfo testInfo) throws Exception {
+        super.setup(testInfo);
+
+        String zoneSql = "create zone test_zone with partitions=1, replicas=3";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+    }
+
+    /**
+     * Checks that a lock conflict with an orphan transaction makes a {@link TxRecoveryMessage} carrying the orphan
+     * transaction id pass through the node that hosts the primary replica of the table partition.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20773")
+    public void test() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        // The zone has a single partition, so partition 0 is the only one the transactions below can touch.
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
+                tblReplicationGrp,
+                node(0).clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+
+        IgniteImpl commitPartNode = IntStream.range(0, initialNodes())
+                .mapToObj(this::node)
+                .filter(n -> leaseholder.equals(n.name()))
+                .findFirst()
+                .orElseThrow();
+
+        log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
+
+        // Node 0 is skipped because it runs the conflicting transaction later; the leaseholder is skipped so that
+        // stopping the coordinator does not stop the node on which the recovery message is captured.
+        IgniteImpl txCrdNode = IntStream.range(1, initialNodes())
+                .mapToObj(this::node)
+                .filter(n -> !leaseholder.equals(n.name()))
+                .findFirst()
+                .orElseThrow();
+
+        log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
+
+        UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
+
+        CompletableFuture<UUID> recoveryTxMsgCaptureFut = new CompletableFuture<>();
+
+        // Only records the transaction id of the first recovery message seen; the predicate always returns false,
+        // i.e. it never asks for a message to be dropped.
+        commitPartNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                var recoveryTxMsg = (TxRecoveryMessage) msg;
+
+                recoveryTxMsgCaptureFut.complete(recoveryTxMsg.txId());
+            }
+
+            return false;
+        });
+
+        runConflictedTransaction(node(0));
+
+        assertThat(recoveryTxMsgCaptureFut, willCompleteSuccessfully());
+
+        assertEquals(orphanTxId, recoveryTxMsgCaptureFut.join());
+    }
+
+    /**
+     * Runs a transaction that is expected to fail with a lock conflict, because the same key is still locked by the
+     * orphan transaction. If an exception is thrown, its error code must be {@link Transactions#ACQUIRE_LOCK_ERR};
+     * if the upsert succeeds, this method does not fail the test by itself.
+     *
+     * @param node Transaction coordinator node.
+     */
+    private void runConflictedTransaction(IgniteImpl node) {
+        RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+        try {
+            Transaction rwTx2 = node.transactions().begin();
+
+            view.upsert(rwTx2, Tuple.create().set("key", 42).set("val", "val2"));
+        } catch (Exception e) {
+            assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
+
+            log.info("Expected lock conflict.", e);
+        }
+    }
+
+    /**
+     * Starts a transaction that takes a lock on a key, then stops the transaction coordinator.
+     * The stopped node leaves the transaction in the pending state.
+     *
+     * @param node Transaction coordinator node.
+     * @return Transaction id.
+     * @throws InterruptedException If interrupted.
+     */
+    private UUID startTransactionAndStopNode(IgniteImpl node) throws InterruptedException {
+        RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+        Transaction rwTx1 = node.transactions().begin();
+
+        view.upsert(rwTx1, Tuple.create().set("key", 42).set("val", "val1"));
+
+        // Captured before stopping, so the node id is taken from a running node.
+        String txCrdNodeId = node.id();
+
+        node.stop();
+
+        // Wait until node 0 no longer lists the stopped coordinator among the cluster nodes.
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().noneMatch(n -> txCrdNodeId.equals(n.id())),
+                10_000
+        ));
+
+        return ((InternalTransaction) rwTx1).id();
+    }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8c6d88a75b..b841b330f4 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -563,11 +563,11 @@ public class IgniteImpl implements Ignite {
// TODO: IGNITE-19344 - use nodeId that is validated on join (and
probably generated differently).
txManager = new TxManagerImpl(
+ clusterSvc,
replicaSvc,
lockMgr,
clock,
new TransactionIdGenerator(() ->
clusterSvc.nodeName().hashCode()),
- () -> clusterSvc.topologyService().localMember().id(),
placementDriverMgr.placementDriver(),
partitionIdleSafeTimePropagationPeriodMsSupplier
);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index e0477f4515..a68684eeec 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -67,7 +67,17 @@ public class ScannableTableImpl implements ScannableTable {
} else {
PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(),
partWithTerm.term());
- pub = internalTable.scan(partWithTerm.partId(), txAttributes.id(),
recipient, null, null, null, 0, null);
+ pub = internalTable.scan(
+ partWithTerm.partId(),
+ txAttributes.id(),
+ txAttributes.commitPartition(),
+ recipient,
+ null,
+ null,
+ null,
+ 0,
+ null
+ );
}
TableRowConverter rowConverter =
converterFactory.create(requiredColumns);
@@ -126,6 +136,7 @@ public class ScannableTableImpl implements ScannableTable {
pub = internalTable.scan(
partWithTerm.partId(),
txAttributes.id(),
+ txAttributes.commitPartition(),
new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
indexId,
lower,
@@ -177,6 +188,7 @@ public class ScannableTableImpl implements ScannableTable {
pub = internalTable.lookup(
partWithTerm.partId(),
txAttributes.id(),
+ txAttributes.commitPartition(),
new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
indexId,
keyTuple,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 83f9a928cf..330353eaa9 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -54,6 +54,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -128,7 +129,17 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
} else {
ClusterNode clusterNode = tx.clusterNode();
- verify(internalTable).scan(partitionId, tx.id(), new
PrimaryReplica(clusterNode, term), null, null, null, 0, null);
+ verify(internalTable).scan(
+ partitionId,
+ tx.id(),
+ tx.commitPartition(),
+ new PrimaryReplica(clusterNode, term),
+ null,
+ null,
+ null,
+ 0,
+ null
+ );
}
data.sendRows();
@@ -207,6 +218,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
verify(internalTable).scan(
eq(partitionId),
eq(tx.id()),
+ eq(tx.commitPartition()),
eq(primaryReplica),
eq(indexId),
condition.lowerValue != null ?
any(BinaryTuplePrefix.class) : isNull(),
@@ -279,6 +291,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
verify(internalTable).scan(
eq(partitionId),
eq(tx.id()),
+ eq(tx.commitPartition()),
eq(primaryReplica),
eq(indexId),
nullable(BinaryTuplePrefix.class),
@@ -396,6 +409,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
verify(internalTable).scan(
eq(partitionId),
eq(tx.id()),
+ eq(tx.commitPartition()),
eq(primaryReplica),
eq(indexId),
prefix.capture(),
@@ -447,6 +461,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
verify(internalTable).lookup(
eq(partitionId),
eq(tx.id()),
+ any(),
eq(primaryReplica),
eq(indexId),
any(BinaryTuple.class),
@@ -496,6 +511,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
verify(internalTable).lookup(
eq(partitionId),
eq(tx.id()),
+ any(),
eq(primaryReplica),
eq(indexId),
any(BinaryTuple.class),
@@ -568,8 +584,17 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
doAnswer(invocation -> input.publisher).when(internalTable)
.scan(anyInt(), any(HybridTimestamp.class),
any(ClusterNode.class));
} else {
- doAnswer(invocation -> input.publisher).when(internalTable)
- .scan(anyInt(), any(UUID.class),
any(PrimaryReplica.class), isNull(), isNull(), isNull(), eq(0), isNull());
+ doAnswer(invocation ->
input.publisher).when(internalTable).scan(
+ anyInt(),
+ any(UUID.class),
+ any(TablePartitionId.class),
+ any(PrimaryReplica.class),
+ isNull(),
+ isNull(),
+ isNull(),
+ eq(0),
+ isNull()
+ );
}
RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
@@ -600,6 +625,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
doAnswer(i -> input.publisher).when(internalTable).scan(
anyInt(),
any(UUID.class),
+ any(TablePartitionId.class),
any(PrimaryReplica.class),
any(Integer.class),
nullable(BinaryTuplePrefix.class),
@@ -637,6 +663,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
doAnswer(i -> input.publisher).when(internalTable).lookup(
anyInt(),
any(UUID.class),
+ any(TablePartitionId.class),
any(PrimaryReplica.class),
any(Integer.class),
nullable(BinaryTuple.class),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 36829d6fe8..699eb60281 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
+import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.util.BitSet;
@@ -65,6 +67,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -106,20 +113,30 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
HybridTimestampTracker timestampTracker = new HybridTimestampTracker();
+ String leaseholder = "local";
+
+ TopologyService topologyService = mock(TopologyService.class);
+ when(topologyService.localMember()).thenReturn(
+ new ClusterNodeImpl(leaseholder, leaseholder,
NetworkAddress.from("127.0.0.1:1111"))
+ );
+
+ ClusterService clusterService = mock(ClusterService.class);
+
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+ when(clusterService.topologyService()).thenReturn(topologyService);
+
for (int size : sizes) {
log.info("Check: size=" + size);
ReplicaService replicaSvc = mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
- String leaseholder = "local";
-
TxManagerImpl txManager = new TxManagerImpl(
+ clusterService,
replicaSvc,
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
- () -> leaseholder,
- new TestPlacementDriver(leaseholder, leaseholder)
+ new TestPlacementDriver(leaseholder, leaseholder),
+ () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
txManager.start();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 281926ef82..d355a632cf 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -48,7 +48,7 @@ public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableSca
return new RollbackTxOnErrorPublisher<>(
tx,
- internalTbl.scan(part, tx.id(), recipient, null, null, null,
0, null)
+ internalTbl.scan(part, tx.id(), tx.commitPartition(),
recipient, null, null, null, 0, null)
);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index c2f066a838..9359b32118 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.distributed;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
import static
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -209,12 +210,13 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
for (int i = 0; i < nodes(); i++) {
if (!txManagers.containsKey(i)) {
TxManager txManager = new TxManagerImpl(
+ service.clusterService(),
replicaService,
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(i),
- () -> NODE_ID,
- TEST_PLACEMENT_DRIVER
+ TEST_PLACEMENT_DRIVER,
+ () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
txManager.start();
@@ -225,12 +227,13 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
}
TxManager txManager = new TxManagerImpl(
+ service.clusterService(),
replicaService,
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(-1),
- () -> NODE_ID,
- TEST_PLACEMENT_DRIVER
+ TEST_PLACEMENT_DRIVER,
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
txManager.start();
@@ -480,12 +483,13 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
TxManager txManager = txManagers.computeIfAbsent(index, k
-> {
TxManager txMgr = new TxManagerImpl(
+ service,
replicaService,
new HeapLockManager(),
hybridClock,
new TransactionIdGenerator(index),
- () -> NODE_ID,
- TEST_PLACEMENT_DRIVER
+ TEST_PLACEMENT_DRIVER,
+ () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
txMgr.start();
closeables.add(txMgr::stop);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index cac9e1c778..8404846fa0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.distributed;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -58,6 +59,7 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
@@ -95,19 +97,21 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
) {
@Override
protected TxManagerImpl newTxManager(
+ ClusterService clusterService,
ReplicaService replicaSvc,
HybridClock clock,
TransactionIdGenerator generator,
ClusterNode node,
PlacementDriver placementDriver
) {
- return new TxManagerImpl(
+ return new TxManagerImpl(
+ clusterService,
replicaSvc,
new HeapLockManager(),
clock,
generator,
- node::id,
- placementDriver
+ placementDriver,
+ () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
) {
@Override
public CompletableFuture<Void>
executeCleanupAsync(Runnable runnable) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 7bb9e285f4..861189e381 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -134,13 +134,13 @@ public class ItTxStateLocalMapTest extends
IgniteAbstractTest {
ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl)
testCluster.igniteTransactions().begin();
- checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING,
coordinatorId, null), List.of(0));
+ checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING,
coordinatorId, tx.commitPartition(), null), List.of(0));
checkLocalTxStateOnNodes(tx.id(), null, IntStream.range(1,
NODES).boxed().collect(toList()));
touchOp.accept(tx);
if (checkAfterTouch) {
- checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING,
coordinatorId, null));
+ checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING,
coordinatorId, tx.commitPartition(), null));
}
if (commit) {
@@ -154,6 +154,7 @@ public class ItTxStateLocalMapTest extends
IgniteAbstractTest {
new TxStateMeta(
commit ? COMMITED : ABORTED,
coordinatorId,
+ tx.commitPartition(),
commit ? testCluster.clocks.get(coord.name()).now() :
null
)
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index ca7d7c3f7d..219fe93464 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -907,11 +907,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
);
txManager = new TxManagerImpl(
+ clusterService,
replicaSvc,
lockManager,
hybridClock,
new TransactionIdGenerator(addr.port()),
- () -> clusterService.topologyService().localMember().id(),
placementDriver,
partitionIdleSafeTimePropagationPeriodMsSupplier
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 3e061ffe71..06a8447183 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.schema.SchemaTestUtils.specToType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
@@ -28,6 +29,7 @@ import static
org.junit.jupiter.params.ParameterizedTest.ARGUMENTS_PLACEHOLDER;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
@@ -95,6 +97,7 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -137,20 +140,22 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
@BeforeAll
static void beforeAllTests() {
- ClusterService clusterService = Mockito.mock(ClusterService.class,
RETURNS_DEEP_STUBS);
-
when(clusterService.topologyService().localMember().address()).thenReturn(DummyInternalTableImpl.ADDR);
-
ClusterNode clusterNode = DummyInternalTableImpl.LOCAL_NODE;
+ ClusterService clusterService = Mockito.mock(ClusterService.class,
RETURNS_DEEP_STUBS);
+
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+
when(clusterService.topologyService().localMember()).thenReturn(clusterNode);
+
ReplicaService replicaService = Mockito.mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
txManager = new TxManagerImpl(
+ clusterService,
replicaService,
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
- clusterNode::id,
- new TestPlacementDriver(clusterNode)
+ new TestPlacementDriver(clusterNode),
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
) {
@Override
public CompletableFuture<Void> finish(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1b588578da..72b0942492 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -331,6 +332,7 @@ public interface InternalTable extends ManuallyCloseable {
*
* @param partId The partition.
* @param txId Transaction id.
+ * @param commitPartition Commit partition id.
* @param recipient Primary replica that will handle given get request.
* @param lowerBound Lower search bound.
* @param upperBound Upper search bound.
@@ -341,6 +343,7 @@ public interface InternalTable extends ManuallyCloseable {
Publisher<BinaryRow> scan(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@@ -397,6 +400,7 @@ public interface InternalTable extends ManuallyCloseable {
*
* @param partId The partition.
* @param txId Transaction id.
+ * @param commitPartition Commit partition id.
* @param recipient Primary replica that will handle given get request.
* @param indexId Index id.
* @param key Key to search.
@@ -406,6 +410,7 @@ public interface InternalTable extends ManuallyCloseable {
Publisher<BinaryRow> lookup(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 5c1710fa1e..38e23f85d9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -270,6 +270,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
}
+
replicaTouch(cmd.txId(), cmd.txCoordinatorId(), cmd.full() ?
cmd.safeTime() : null, cmd.full());
}
@@ -577,6 +578,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
txManager.updateTxMeta(txId, old -> new TxStateMeta(
full ? COMMITED : PENDING,
txCoordinatorId,
+ old == null ? null : old.commitPartitionId(),
full ? commitTimestamp : null
));
}
@@ -585,6 +587,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
txManager.updateTxMeta(txId, old -> new TxStateMeta(
commit ? COMMITED : ABORTED,
txCoordinatorId,
+ old == null ? null : old.commitPartitionId(),
commit ? commitTimestamp : null
));
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
index 9d163ca11c..c90badb72b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.table.distributed.replication.request;
import java.util.UUID;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
/**
* Transaction request that can contain full transaction (transaction that
contains full set of keys).
@@ -30,4 +31,11 @@ public interface CommittableTxRequest extends ReplicaRequest
{
* Return {@code true} if this is a full transaction.
*/
boolean full();
+
+ /**
+ * Gets a commit partition id.
+ *
+ * @return Table partition id.
+ */
+ TablePartitionIdMessage commitPartitionId();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
index e5fca9fc41..32a7bbf109 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,13 +25,6 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST)
public interface ReadWriteMultiRowPkReplicaRequest extends
MultipleRowPkReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
- /**
- * Gets a commit partition id.
- *
- * @return Table partition id.
- */
- TablePartitionIdMessage commitPartitionId();
-
/**
* Disable delayed ack optimization.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index 0789ea568d..de3773252f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,13 +25,6 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST)
public interface ReadWriteMultiRowReplicaRequest extends
MultipleRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
- /**
- * Gets a commit partition id.
- *
- * @return Table partition id.
- */
- TablePartitionIdMessage commitPartitionId();
-
/**
* Disable delayed ack optimization.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
index 6447819dd7..896244e56a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,10 +25,4 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST)
public interface ReadWriteSingleRowPkReplicaRequest extends
SingleRowPkReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
- /**
- * Gets a commit partition id.
- *
- * @return Table partition id.
- */
- TablePartitionIdMessage commitPartitionId();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
index ce1e6a1b0e..6f62a42b9b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,10 +25,4 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST)
public interface ReadWriteSingleRowReplicaRequest extends
SingleRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
- /**
- * Gets a commit partition id.
- *
- * @return Table partition id.
- */
- TablePartitionIdMessage commitPartitionId();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
index d276841d38..983505f75a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,10 +25,5 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest,
ReadWriteReplicaRequest, CommittableTxRequest {
- /**
- * Gets a commit partition id.
- *
- * @return Table partition id.
- */
- TablePartitionIdMessage commitPartitionId();
+
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 53e32760e8..2318f6081b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -158,6 +158,7 @@ import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.Cursor;
@@ -478,10 +479,19 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Saving state is not needed for full transactions.
if (!req.full()) {
- txManager.updateTxMeta(req.transactionId(), old -> new
TxStateMeta(PENDING, senderId, null));
+ txManager.updateTxMeta(req.transactionId(), old -> new
TxStateMeta(
+ PENDING,
+ senderId,
+ req.commitPartitionId().asTablePartitionId(),
+ null
+ ));
}
}
+ if (request instanceof TxRecoveryMessage) {
+ return processTxRecoveryAction((TxRecoveryMessage) request);
+ }
+
HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
return validateTableExistence(request, opTsIfDirectRo)
@@ -490,6 +500,30 @@ public class PartitionReplicaListener implements
ReplicaListener {
.thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
}
+ /**
+ * Processes transaction recovery request on a commit partition.
+ *
+ * @param request Tx recovery request.
+ * @return Already completed future: nothing is done for a finished tx, and abort of an unfinished one is not implemented yet (IGNITE-20735).
+ */
+ private CompletableFuture<Void> processTxRecoveryAction(TxRecoveryMessage
request) {
+ UUID txId = request.txId();
+
+ TxMeta txMeta = txStateStorage.get(txId);
+
+ // Nothing to recover if the tx state stored in this partition's tx state storage is already final.
+ boolean transactionAlreadyFinished = txMeta != null &&
isFinalState(txMeta.txState());
+
+ if (transactionAlreadyFinished) {
+ return completedFuture(null);
+ }
+
+ LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+ // TODO: IGNITE-20735 Implement initiate recovery handling logic.
+ return completedFuture(null);
+ }
+
/**
* Validates that the table exists at a timestamp corresponding to the
request operation.
*
@@ -635,7 +669,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
// We treat SCAN as 2pc and only switch to a 1pc mode if all table
rows fit in the bucket and the transaction is implicit.
// See `req.full() && (err != null || rows.size() <
req.batchSize())` condition.
// If they don't fit the bucket, the transaction is treated as 2pc.
- txManager.updateTxMeta(req.transactionId(), old -> new
TxStateMeta(PENDING, senderId, null));
+ txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(
+ PENDING,
+ senderId,
+ req.commitPartitionId().asTablePartitionId(),
+ null
+ ));
// Implicit RW scan can be committed locally on a last batch or
error.
return appendTxCommand(req.transactionId(), RequestType.RW_SCAN,
false, () -> processScanRetrieveBatchAction(req, senderId))
@@ -1526,6 +1565,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
) {
CompletableFuture<?>[] futures = enlistedPartitions.stream()
.map(partitionId -> changeStateFuture.thenCompose(ignored ->
+ // TODO: IGNITE-20874 Use the node cleanup procedure
instead of the replication group cleanup one.
cleanupWithRetry(commit, commitTimestamp, txId,
partitionId, attemptsToCleanupReplica)))
.toArray(size -> new CompletableFuture<?>[size]);
@@ -3313,6 +3353,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest
request) {
Long expectedTerm;
+ // TODO: IGNITE-20875 Add enlistment consistency token to
PrimaryReplicaTestRequest interface.
if (request instanceof ReadWriteReplicaRequest) {
expectedTerm = ((ReadWriteReplicaRequest) request).term();
@@ -3327,6 +3368,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
assert expectedTerm != null;
} else if (request instanceof BuildIndexReplicaRequest) {
expectedTerm = ((BuildIndexReplicaRequest)
request).enlistmentConsistencyToken();
+ } else if (request instanceof TxRecoveryMessage) {
+ expectedTerm = ((TxRecoveryMessage)
request).enlistmentConsistencyToken();
} else {
expectedTerm = null;
}
@@ -3687,7 +3730,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
txManager.updateTxMeta(txId, old -> old == null
? null
- : new TxStateMeta(txState, old.txCoordinatorId(), txState ==
COMMITED ? commitTimestamp : null));
+ : new TxStateMeta(
+ txState,
+ old.txCoordinatorId(),
+ old.commitPartitionId(),
+ txState == COMMITED ? commitTimestamp : null
+ ));
}
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code
below
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 5a1eae715b..0f71220cc7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -98,7 +98,6 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
-import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
import
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.SingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.SwapRowReplicaRequest;
@@ -407,7 +406,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<Collection<BinaryRow>> fut;
- ReadWriteScanRetrieveBatchReplicaRequestBuilder requestBuilder =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
+ Function<Long, ReplicaRequest> mapFunc = (term) ->
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
.timestampLong(clock.nowLong())
.transactionId(tx.id())
@@ -419,13 +418,15 @@ public class InternalTableImpl implements InternalTable {
.flags(flags)
.columnsToInclude(columnsToInclude)
.full(implicit) // Intent for one phase commit.
- .batchSize(batchSize);
+ .batchSize(batchSize)
+ .term(term)
+
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+ .build();
if (primaryReplicaAndTerm != null) {
- ReadWriteScanRetrieveBatchReplicaRequest request =
requestBuilder.term(primaryReplicaAndTerm.get2()).build();
- fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
+ fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(),
mapFunc.apply(primaryReplicaAndTerm.get2()));
} else {
- fut = enlistWithRetry(tx, partId, term ->
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION, false, null);
+ fut = enlistWithRetry(tx, partId, mapFunc,
ATTEMPTS_TO_ENLIST_PARTITION, false, null);
}
return postEnlist(fut, false, tx, false);
@@ -827,9 +828,7 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRows,
tx,
- (keyRows0, txo, groupId, term, full) -> {
- return readWriteMultiRowPkReplicaRequest(RW_GET_ALL,
keyRows0, txo, groupId, term, full);
- },
+ (keyRows0, txo, groupId, term, full) ->
readWriteMultiRowPkReplicaRequest(RW_GET_ALL, keyRows0, txo, groupId, term,
full),
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
(res, req) -> false
);
@@ -1265,12 +1264,13 @@ public class InternalTableImpl implements InternalTable
{
public Publisher<BinaryRow> lookup(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
@Nullable BitSet columnsToInclude
) {
- return scan(partId, txId, recipient, indexId, key, null, null, 0,
columnsToInclude);
+ return scan(partId, txId, commitPartition, recipient, indexId, key,
null, null, 0, columnsToInclude);
}
@Override
@@ -1388,6 +1388,7 @@ public class InternalTableImpl implements InternalTable {
public Publisher<BinaryRow> scan(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@@ -1395,12 +1396,13 @@ public class InternalTableImpl implements InternalTable
{
int flags,
@Nullable BitSet columnsToInclude
) {
- return scan(partId, txId, recipient, indexId, null, lowerBound,
upperBound, flags, columnsToInclude);
+ return scan(partId, txId, commitPartition, recipient, indexId, null,
lowerBound, upperBound, flags, columnsToInclude);
}
private Publisher<BinaryRow> scan(
int partId,
UUID txId,
+ TablePartitionId commitPartition,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuple exactKey,
@@ -1427,6 +1429,7 @@ public class InternalTableImpl implements InternalTable {
.batchSize(batchSize)
.term(recipient.term())
.full(false) // Set explicitly.
+
.commitPartitionId(serializeTablePartitionId(commitPartition))
.build();
return replicaSvc.invoke(recipient.node(), request);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dd830a0052..d0e38050d3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -220,6 +220,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private static final int TABLE_ID = 1;
+ private static final TablePartitionId commitPartitionId = new
TablePartitionId(TABLE_ID, PART_ID);
+
private static final int ANOTHER_TABLE_ID = 2;
private final Map<UUID, Set<RowId>> pendingRows = new
ConcurrentHashMap<>();
@@ -651,7 +653,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
- txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED,
localNode.id(), clock.now()));
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED,
localNode.id(), commitPartitionId, clock.now()));
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -669,7 +671,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
- txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING,
localNode.id(), null));
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING,
localNode.id(), commitPartitionId, null));
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -688,7 +690,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
- txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED,
localNode.id(), null));
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED,
localNode.id(), commitPartitionId, null));
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -728,6 +730,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
+ .commitPartitionId(commitPartitionId())
.build(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1,
TimeUnit.SECONDS).result();
@@ -744,6 +747,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.scanId(1L)
.indexToUse(sortedIndexId)
.batchSize(4)
+ .commitPartitionId(commitPartitionId())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -763,6 +767,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.upperBoundPrefix(toIndexBound(3))
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
+ .commitPartitionId(commitPartitionId())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -780,6 +785,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.indexToUse(sortedIndexId)
.lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
+ .commitPartitionId(commitPartitionId())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -797,6 +803,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.indexToUse(sortedIndexId)
.exactKey(toIndexKey(0))
.batchSize(5)
+ .commitPartitionId(commitPartitionId())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1302,7 +1309,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// Imitation of tx commit.
txStateStorage.put(txId, new TxMeta(TxState.COMMITED, new
ArrayList<>(), now));
- txManager.updateTxMeta(txId, old -> new
TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), now));
+ txManager.updateTxMeta(txId, old -> new
TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), commitPartitionId,
now));
CompletableFuture<?> replicaCleanupFut =
partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
@@ -1715,6 +1722,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.term(1L)
.scanId(1)
.batchSize(100)
+ .commitPartitionId(commitPartitionId())
.build(),
localNode.id()
)
@@ -1732,6 +1740,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.term(1L)
.scanId(1)
.batchSize(100)
+ .commitPartitionId(commitPartitionId())
.build(),
localNode.id()
)
@@ -1754,6 +1763,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.scanId(1)
.batchSize(100)
.full(false)
+ .commitPartitionId(commitPartitionId())
.build(),
localNode.id()
);
@@ -2443,7 +2453,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private void cleanup(UUID txId) {
HybridTimestamp commitTs = clock.now();
- txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED,
UUID.randomUUID().toString(), commitTs));
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED,
UUID.randomUUID().toString(), commitPartitionId, commitTs));
partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index cb1f5b3334..226f3c32c7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -351,6 +352,7 @@ public class ItTxTestCluster {
replicaServices.put(node.name(), replicaSvc);
TxManagerImpl txMgr = newTxManager(
+ cluster.get(i),
replicaSvc,
clock,
new TransactionIdGenerator(i),
@@ -384,6 +386,7 @@ public class ItTxTestCluster {
}
protected TxManagerImpl newTxManager(
+ ClusterService clusterService,
ReplicaService replicaSvc,
HybridClock clock,
TransactionIdGenerator generator,
@@ -391,12 +394,13 @@ public class ItTxTestCluster {
PlacementDriver placementDriver
) {
return new TxManagerImpl(
+ clusterService,
replicaSvc,
new HeapLockManager(),
clock,
generator,
- node::id,
- placementDriver
+ placementDriver,
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
}
@@ -822,15 +826,14 @@ public class ItTxTestCluster {
}
private void initializeClientTxComponents() {
- Supplier<String> localNodeIdSupplier = () ->
client.topologyService().localMember().id();
-
clientTxManager = new TxManagerImpl(
+ client,
clientReplicaSvc,
new HeapLockManager(),
clientClock,
new TransactionIdGenerator(-1),
- localNodeIdSupplier,
- placementDriver
+ placementDriver,
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
clientTxStateResolver = new TransactionStateResolver(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 52f8ea4a7a..4e2040f7b2 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -2004,7 +2004,12 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
UUID txId = ((ReadWriteTransactionImpl) tx).id();
for (TxManager txManager : txManagers()) {
- txManager.updateTxMeta(txId, old -> old == null ? null : new
TxStateMeta(old.txState(), "restarted", old.commitTimestamp()));
+ txManager.updateTxMeta(txId, old -> old == null ? null : new
TxStateMeta(
+ old.txState(),
+ "restarted",
+ old.commitPartitionId(),
+ old.commitTimestamp()
+ ));
}
// Read-only.
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index da2716615f..7c64a6eb5e 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -98,7 +99,10 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -433,14 +437,26 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
* @param replicaSvc Replica service to use.
*/
public static TxManagerImpl txManager(ReplicaService replicaSvc) {
- return new TxManagerImpl(
+ TopologyService topologyService = mock(TopologyService.class);
+ when(topologyService.localMember()).thenReturn(LOCAL_NODE);
+
+ ClusterService clusterService = mock(ClusterService.class);
+
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+ when(clusterService.topologyService()).thenReturn(topologyService);
+
+ var txManager = new TxManagerImpl(
+ clusterService,
replicaSvc,
new HeapLockManager(),
CLOCK,
new TransactionIdGenerator(0xdeadbeef),
- LOCAL_NODE::id,
- TEST_PLACEMENT_DRIVER
+ TEST_PLACEMENT_DRIVER,
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
+
+ txManager.start();
+
+ return txManager;
}
/** {@inheritDoc} */
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
index 3ab221d854..79f8f3d987 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
@@ -25,7 +25,6 @@ import org.jetbrains.annotations.TestOnly;
/** Lock manager allows to acquire locks and release locks and supports
deadlock prevention by transaction id ordering. */
public interface LockManager {
-
/**
* Attempts to acquire a lock for the specified {@code lockKey} in
specified {@code lockMode}.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index b81708ff5a..1b0faeb812 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -70,8 +70,9 @@ public interface TxManager extends IgniteComponent {
*
* @param txId Transaction id.
* @param updater Transaction meta updater.
+ * @return Updated transaction state.
*/
- void updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater);
+ TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta>
updater);
/**
* Returns lock manager.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index df1e5f79ce..b0241b32fd 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.tx;
+import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,6 +33,9 @@ public class TxStateMeta implements TransactionMeta {
private final String txCoordinatorId;
+ /** Identifier of the replication group that manages a transaction state.
*/
+ private final TablePartitionId commitPartitionId;
+
private final HybridTimestamp commitTimestamp;
/**
@@ -37,15 +43,18 @@ public class TxStateMeta implements TransactionMeta {
*
* @param txState Transaction state.
* @param txCoordinatorId Transaction coordinator id.
+ * @param commitPartitionId Commit partition replication group id.
* @param commitTimestamp Commit timestamp.
*/
public TxStateMeta(
TxState txState,
String txCoordinatorId,
+ @Nullable TablePartitionId commitPartitionId,
@Nullable HybridTimestamp commitTimestamp
) {
this.txState = txState;
this.txCoordinatorId = txCoordinatorId;
+ this.commitPartitionId = commitPartitionId;
this.commitTimestamp = commitTimestamp;
}
@@ -58,6 +67,10 @@ public class TxStateMeta implements TransactionMeta {
return txCoordinatorId;
}
+    /**
+     * Gets the identifier of the commit partition replication group of the transaction.
+     *
+     * @return Commit partition replication group id, or {@code null} if it is not set.
+     */
+    public @Nullable TablePartitionId commitPartitionId() {
+        return commitPartitionId;
+    }
+
@Override
public @Nullable HybridTimestamp commitTimestamp() {
return commitTimestamp;
@@ -80,19 +93,21 @@ public class TxStateMeta implements TransactionMeta {
if (txCoordinatorId != null ?
!txCoordinatorId.equals(that.txCoordinatorId) : that.txCoordinatorId != null) {
return false;
}
+
+ if (commitPartitionId != null ?
!commitPartitionId.equals(that.commitPartitionId) : that.commitPartitionId !=
null) {
+ return false;
+ }
+
return commitTimestamp != null ?
commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp == null;
}
@Override
public int hashCode() {
- int result = txState != null ? txState.hashCode() : 0;
- result = 31 * result + (txCoordinatorId != null ?
txCoordinatorId.hashCode() : 0);
- result = 31 * result + (commitTimestamp != null ?
commitTimestamp.hashCode() : 0);
- return result;
+ return Objects.hash(txState, txCoordinatorId, commitPartitionId,
commitTimestamp);
}
@Override
public String toString() {
- return "[txState=" + txState + ", txCoordinatorId=" + txCoordinatorId
+ ", commitTimestamp=" + commitTimestamp + ']';
+ return S.toString(TxStateMeta.class, this);
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
index 9b85925d7c..bc53742775 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.tx;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.jetbrains.annotations.Nullable;
/**
@@ -35,9 +36,10 @@ public class TxStateMetaFinishing extends TxStateMeta {
* Constructor.
*
* @param txCoordinatorId Transaction coordinator id.
+ * @param commitPartitionId Commit partition id.
*/
- public TxStateMetaFinishing(String txCoordinatorId) {
- super(TxState.FINISHING, txCoordinatorId, null);
+ public TxStateMetaFinishing(String txCoordinatorId, @Nullable
TablePartitionId commitPartitionId) {
+ super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null);
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
new file mode 100644
index 0000000000..bce40cba30
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The class detects transactions that are left without a coordinator but
still hold locks. For that orphan transaction, the recovery
+ * message is sent to the commit partition replication group.
+ */
+public class OrphanDetector {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(OrphanDetector.class);
+
+ /** Tx messages factory. */
+ private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+ private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10;
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Topology service. */
+ private final TopologyService topologyService;
+
+ /** Replica service. */
+ private final ReplicaService replicaService;
+
+ /** Placement driver. */
+ private final PlacementDriver placementDriver;
+
+ // TODO: IGNITE-20773 Uncomment this during implementation.
+ ///** Lock manager. */
+ //private final LockManager lockManager;
+
+ /** Hybrid clock. */
+ private final HybridClock clock;
+
+ /** Local transaction state storage. */
+ private Function<UUID, TxStateMeta> txLocalStateStorage;
+
+ /**
+ * The constructor.
+ *
+ * @param topologyService Topology service.
+ * @param replicaService Replica service.
+ * @param placementDriver Placement driver.
+ * @param clock Clock.
+ */
+ public OrphanDetector(
+ TopologyService topologyService,
+ ReplicaService replicaService,
+ PlacementDriver placementDriver,
+ //LockManager lockManager,
+ HybridClock clock) {
+ this.topologyService = topologyService;
+ this.replicaService = replicaService;
+ this.placementDriver = placementDriver;
+ //this.lockManager = lockManager;
+ this.clock = clock;
+ }
+
+ /**
+ * Starts the detector.
+ *
+ * @param txLocalStateStorage Local transaction state storage.
+ */
+ public void start(Function<UUID, TxStateMeta> txLocalStateStorage) {
+ this.txLocalStateStorage = txLocalStateStorage;
+ // TODO: IGNITE-20773 Subscribe to lock conflicts here.
+ }
+
+ /**
+ * Stops the detector.
+ */
+ public void stop() {
+ busyLock.block();
+ // TODO: IGNITE-20773 Unsubscribe from lock conflicts here.
+ }
+
+ /**
+ * Sends {@link TxRecoveryMessage} if the transaction is orphaned.
+ * TODO: IGNITE-20773 Invoke the method when the lock conflict is noted.
+ *
+ * @param txId Transaction id that holds a lock.
+ * @return Future to complete.
+ */
+ private CompletableFuture<Void> handleLockHolder(UUID txId) {
+ if (busyLock.enterBusy()) {
+ try {
+ return handleLockHolderInternal(txId);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ return failedFuture(new NodeStoppingException());
+ }
+
+ /**
+ * Sends {@link TxRecoveryMessage} if the transaction is orphaned.
+ *
+ * @param txId Transaction id that holds a lock.
+ * @return Future to complete.
+ */
+ private CompletableFuture<Void> handleLockHolderInternal(UUID txId) {
+ TxStateMeta txState = txLocalStateStorage.apply(txId);
+
+ assert txState != null : "The transaction is undefined in the local
node [txId=" + txId + "].";
+
+ if (txState.txState() == TxState.ABANDONED
+ || isFinalState(txState.txState())
+ || topologyService.getById(txState.txCoordinatorId()) != null)
{
+ return completedFuture(null);
+ }
+
+ LOG.info(
+ "Conflict was found, and the coordinator of the transaction
that holds a lock is not available "
+ + "[txId={}, txCrd={}].",
+ txId,
+ txState.txCoordinatorId()
+ );
+
+ return placementDriver.awaitPrimaryReplica(
+ txState.commitPartitionId(),
+ clock.now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC,
+ SECONDS
+ ).thenCompose(replicaMeta -> {
+ ClusterNode commitPartPrimaryNode =
topologyService.getByConsistentId(replicaMeta.getLeaseholder());
+
+ if (commitPartPrimaryNode == null) {
+ LOG.warn(
+ "The primary replica of the commit partition is not
available [commitPartGrp={}, tx={}]",
+ txState.commitPartitionId(),
+ txId
+ );
+
+ return completedFuture(null);
+ }
+
+ return replicaService.invoke(commitPartPrimaryNode,
FACTORY.txRecoveryMessage()
+ .groupId(txState.commitPartitionId())
+
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+ .txId(txId)
+ .build());
+ });
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 2846d7b43d..d367396ec6 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -116,7 +116,7 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
return ((TxManagerImpl)
txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()))
.thenRun(() -> txManager.updateTxMeta(
id(),
- old -> new TxStateMeta(COMMITED,
old.txCoordinatorId(), old.commitTimestamp())
+ old -> new TxStateMeta(COMMITED,
old.txCoordinatorId(), old.commitPartitionId(), old.commitTimestamp())
));
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 1d0c6f0b26..cbf750efdd 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -89,7 +88,7 @@ import
org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.tx.TransactionException;
@@ -146,8 +145,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
/** Lock to update and read the low watermark. */
private final ReadWriteLock lowWatermarkReadWriteLock = new
ReentrantReadWriteLock();
- private final Lazy<String> localNodeId;
-
private final PlacementDriver placementDriver;
private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
@@ -158,48 +155,32 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- /**
- * The constructor.
- *
- * @param replicaService Replica service.
- * @param lockManager Lock manager.
- * @param clock A hybrid logical clock.
- * @param transactionIdGenerator Used to generate transaction IDs.
- */
- public TxManagerImpl(
- ReplicaService replicaService,
- LockManager lockManager,
- HybridClock clock,
- TransactionIdGenerator transactionIdGenerator,
- Supplier<String> localNodeIdSupplier,
- PlacementDriver placementDriver
- ) {
- this(
- replicaService,
- lockManager,
- clock,
- transactionIdGenerator,
- localNodeIdSupplier,
- placementDriver,
- () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
- );
- }
+ /** Cluster service. */
+ private final ClusterService clusterService;
+
+ /** Detector of transactions that lost the coordinator. */
+ private final OrphanDetector orphanDetector;
+
+ /** Local node network identity. This id is available only after the
network has started. */
+ private String localNodeId;
/**
* The constructor.
*
+ * @param clusterService Cluster service.
* @param replicaService Replica service.
* @param lockManager Lock manager.
* @param clock A hybrid logical clock.
* @param transactionIdGenerator Used to generate transaction IDs.
+ * @param placementDriver Placement driver.
* @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe
time propagation period in ms.
*/
public TxManagerImpl(
+ ClusterService clusterService,
ReplicaService replicaService,
LockManager lockManager,
HybridClock clock,
TransactionIdGenerator transactionIdGenerator,
- Supplier<String> localNodeIdSupplier,
PlacementDriver placementDriver,
LongSupplier idleSafeTimePropagationPeriodMsSupplier
) {
@@ -207,7 +188,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
this.lockManager = lockManager;
this.clock = clock;
this.transactionIdGenerator = transactionIdGenerator;
- this.localNodeId = new Lazy<>(localNodeIdSupplier);
+ this.clusterService = clusterService;
this.placementDriver = placementDriver;
this.idleSafeTimePropagationPeriodMsSupplier =
idleSafeTimePropagationPeriodMsSupplier;
@@ -220,6 +201,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory("tx-async-cleanup", LOG));
+
+ orphanDetector = new OrphanDetector(clusterService.topologyService(),
replicaService, placementDriver, /*lockManager,*/ clock);
}
@Override
@@ -231,7 +214,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
public InternalTransaction begin(HybridTimestampTracker timestampTracker,
boolean readOnly) {
HybridTimestamp beginTimestamp = clock.now();
UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp);
- updateTxMeta(txId, old -> new TxStateMeta(PENDING, coordinatorId(),
null));
+ updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId, null,
null));
if (!readOnly) {
return new ReadWriteTransactionImpl(this, timestampTracker, txId);
@@ -289,8 +272,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
}
@Override
- public void updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta>
updater) {
- txStateMap.compute(txId, (k, oldMeta) -> {
+ public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta,
TxStateMeta> updater) {
+ return txStateMap.compute(txId, (k, oldMeta) -> {
TxStateMeta newMeta = updater.apply(oldMeta);
if (newMeta == null) {
@@ -315,17 +298,13 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
finalState = ABORTED;
}
- updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitTimestamp()));
+ updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitPartitionId(), old.commitTimestamp()));
}
private @Nullable HybridTimestamp commitTimestamp(boolean commit) {
return commit ? clock.now() : null;
}
- private String coordinatorId() {
- return localNodeId.get();
- }
-
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
@@ -338,7 +317,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
if (enlistedGroups.isEmpty()) {
// If there are no enlisted groups, just update local state - we
already marked the tx as finished.
- updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
commitTimestamp(commit)));
+ updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
commitPartition, commitTimestamp(commit)));
return completedFuture(null);
}
@@ -349,9 +328,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(coordinatorId());
-
- updateTxMeta(txId, old -> finishingStateMeta);
+ TxStateMetaFinishing finishingStateMeta = (TxStateMetaFinishing)
updateTxMeta(txId, old ->
+ new TxStateMetaFinishing(localNodeId, old == null ? null :
old.commitPartitionId()));
AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
@@ -455,7 +433,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
if (transactionException.code() ==
TX_WAS_ABORTED_ERR) {
updateTxMeta(txId, old -> {
- TxStateMeta txStateMeta = new
TxStateMeta(ABORTED, old.txCoordinatorId(), null);
+ TxStateMeta txStateMeta = new
TxStateMeta(ABORTED, old.txCoordinatorId(), commitPartition, null);
txFinishFuture.complete(txStateMeta);
@@ -526,7 +504,11 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
assert old instanceof TxStateMetaFinishing;
- TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
+ TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(
+ commit,
+ old.commitPartitionId(),
+ commitTimestamp
+ );
txFinishFuture.complete(finalTxStateMeta);
@@ -590,7 +572,14 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
@Override
public void start() {
-
replicaService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
this);
+ localNodeId = clusterService.topologyService().localMember().id();
+
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
this);
+ orphanDetector.start(txStateMap::get);
+ }
+
+ @Override
+ public void beforeNodeStop() {
+ orphanDetector.stop();
}
@Override
@@ -718,11 +707,16 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
* Creates final {@link TxStateMeta} for coordinator node.
*
* @param commit Commit flag.
+ * @param commitPartitionId Commit partition id.
* @param commitTimestamp Commit timestamp.
* @return Transaction meta.
*/
- private TxStateMeta coordinatorFinalTxStateMeta(boolean commit, @Nullable
HybridTimestamp commitTimestamp) {
- return new TxStateMeta(commit ? COMMITED : ABORTED, coordinatorId(),
commitTimestamp);
+ private TxStateMeta coordinatorFinalTxStateMeta(
+ boolean commit,
+ TablePartitionId commitPartitionId,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ return new TxStateMeta(commit ? COMMITED : ABORTED, localNodeId,
commitPartitionId, commitTimestamp);
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 00abd58a1b..2aed7d0c01 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -53,4 +53,9 @@ public class TxMessageGroup {
* Message type for {@link TxStateResponse}.
*/
public static final short TX_STATE_RESPONSE = 5;
+
+ /**
+ * Message type for {@link TxRecoveryMessage}.
+ */
+ public static final short TX_RECOVERY_MSG = 6;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxRecoveryMessage.java
similarity index 57%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxRecoveryMessage.java
index 9d163ca11c..6f36fd8d61 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxRecoveryMessage.java
@@ -15,19 +15,30 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.replication.request;
+package org.apache.ignite.internal.tx.message;
import java.util.UUID;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Transaction request that can contain full transaction (transaction that
contains full set of keys).
+ * Transaction recovery message.
*/
-public interface CommittableTxRequest extends ReplicaRequest {
- UUID transactionId();
+@Transferable(TxMessageGroup.TX_RECOVERY_MSG)
+public interface TxRecoveryMessage extends PrimaryReplicaRequest,
ReplicaRequest {
+ /**
+ * Gets a transaction id to resolve.
+ *
+ * @return Transaction id.
+ */
+ UUID txId();
/**
- * Return {@code true} if this is a full transaction.
+ * Gets an enlistment consistency token.
+ * The token is used to check that the lease is still actual while the
message goes to the replica.
+ *
+ * @return Enlistment consistency token.
*/
- boolean full();
+ long enlistmentConsistencyToken();
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 1b2dde012a..8f7b4ce5e1 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -114,11 +114,11 @@ public class TxManagerTest extends IgniteAbstractTest {
when(replicaService.invoke(anyString(),
any())).thenReturn(CompletableFuture.completedFuture(null));
txManager = new TxManagerImpl(
+ clusterService,
replicaService,
new HeapLockManager(),
clock,
new TransactionIdGenerator(0xdeadbeef),
- LOCAL_NODE::id,
placementDriver,
idleSafeTimePropagationPeriodMsSupplier
);