This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3f1ab7e5cec IGNITE-27005 Add tx labels to public api (#7349)
3f1ab7e5cec is described below
commit 3f1ab7e5cec353d5ad4988c0562387d6eb3820c2
Author: Anton Laletin <[email protected]>
AuthorDate: Fri Feb 20 16:22:25 2026 +0400
IGNITE-27005 Add tx labels to public api (#7349)
---
.../org/apache/ignite/tx/TransactionOptions.java | 32 +++++
.../internal/benchmark/LockManagerBenchmark.java | 1 -
.../systemviews/ItTransactionsSystemViewTest.java | 23 +---
.../apache/ignite/internal/table/TxContext.java | 51 +++++--
.../distributed/storage/InternalTableImpl.java | 51 ++++++-
.../table/distributed/SortedIndexLockerTest.java | 1 -
.../distributed/storage/InternalTableImplTest.java | 149 +++++++++++++++++++++
.../tx/impl/ItTransactionLabelLoggingTest.java | 118 ++++++++++++++++
.../tx/impl/ItTransactionLabelPropagationTest.java | 83 ++++++++++++
.../internal/tx/impl/IgniteTransactionsImpl.java | 1 +
.../tx/impl/TransactionExpirationRegistry.java | 5 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 4 +
12 files changed, 481 insertions(+), 38 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java
b/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java
index 61a10894bd0..1e06dfca5fc 100644
--- a/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java
+++ b/modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java
@@ -17,6 +17,8 @@
package org.apache.ignite.tx;
+import org.jetbrains.annotations.Nullable;
+
/**
* Ignite transaction options.
*/
@@ -27,6 +29,10 @@ public class TransactionOptions {
/** Read-only transaction. */
private boolean readOnly = false;
+ /** Transaction label. Used for identification in logs and system views. */
+ @Nullable
+ private String label = null;
+
/**
* Returns transaction timeout, in milliseconds. 0 means 'use default
timeout'.
*
@@ -80,4 +86,30 @@ public class TransactionOptions {
return this;
}
+
+ /**
+ * Returns transaction label. The label is included in diagnostic and
observability outputs, such as logs, system views, etc.
+ *
+ * @return Transaction label, or {@code null} if not set.
+ */
+ @Nullable
+ public String label() {
+ return label;
+ }
+
+ /**
+ * Sets the transaction label. The label is included in diagnostic and
observability outputs,
+ * such as logs, system views, etc.
+ *
+ * <p>Use labels to help identify and track transactions for debugging and
monitoring.
+ * Once set, the label remains unchanged for the lifetime of the
transaction.
+ *
+ * @param label Transaction label. Can be {@code null} to clear the label.
+ * @return {@code this} for chaining.
+ */
+ public TransactionOptions label(@Nullable String label) {
+ this.label = label;
+
+ return this;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java
index 08be0febf69..8328ed75572 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java
@@ -64,7 +64,6 @@ public class LockManagerBenchmark {
@Setup
public void setUp() {
VolatileTxStateMetaStorage txStateVolatileStorage =
VolatileTxStateMetaStorage.createStarted();
-
lockManager = new HeapLockManager(DEFAULT_SLOTS,
txStateVolatileStorage);
lockManager.start(new WaitDieDeadlockPreventionPolicy());
generator = new TransactionIdGenerator(0);
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItTransactionsSystemViewTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItTransactionsSystemViewTest.java
index 247ad7b7ece..7975bb0ac15 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItTransactionsSystemViewTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItTransactionsSystemViewTest.java
@@ -34,14 +34,12 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TransactionIds;
-import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
-import org.apache.ignite.internal.tx.TxState;
-import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.Test;
/**
@@ -133,28 +131,13 @@ public class ItTransactionsSystemViewTest extends
AbstractSystemViewTest {
@Test
public void testTransactionLabel() {
- // TODO: IGNITE-27005 - Replace test-only IgniteImpl#txManager() with
regular API usage
- Transaction tx = CLUSTER.aliveNode().transactions().begin();
+ String customLabel = "TEST-CUSTOM-LABEL";
+ Transaction tx = CLUSTER.aliveNode().transactions().begin(new
TransactionOptions().label(customLabel));
InternalTransaction internalTx = (InternalTransaction) tx;
try {
- String customLabel = "TEST-CUSTOM-LABEL";
UUID txId = internalTx.id();
- // Get txManager using test-only method
- TxManager txManager =
unwrapIgniteImpl(CLUSTER.aliveNode()).txManager();
-
- // Update transaction state to set a custom label
- txManager.updateTxMeta(txId, oldMeta -> {
- if (oldMeta != null) {
- // Preserve all existing fields and only update the label
- return oldMeta.mutate().txLabel(customLabel).build();
- } else {
- // Create new meta with PENDING state and custom label
- return
TxStateMeta.builder(TxState.PENDING).txLabel(customLabel).build();
- }
- });
-
// Verify the label appears in the system view
assertQuery("SELECT TRANSACTION_LABEL FROM SYSTEM.TRANSACTIONS
WHERE TRANSACTION_ID = '" + txId + "'")
.returns(customLabel)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
index 23ebd45b717..a4d68e1e273 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -34,7 +35,12 @@ import org.jetbrains.annotations.TestOnly;
public abstract class TxContext {
/** Creates operation context from RO transaction. */
public static TxContext readOnly(UUID txId, UUID txCoordinatorId,
HybridTimestamp readTimestamp) {
- return new ReadOnly(txId, txCoordinatorId, readTimestamp);
+ return new ReadOnly(txId, txCoordinatorId, readTimestamp, null);
+ }
+
+ /** Creates operation context from RO transaction. */
+ public static TxContext readOnly(UUID txId, UUID txCoordinatorId,
HybridTimestamp readTimestamp, @Nullable String label) {
+ return new ReadOnly(txId, txCoordinatorId, readTimestamp, label);
}
/** Creates operation context from RO transaction. For test purposes only.
*/
@@ -46,7 +52,7 @@ public abstract class TxContext {
assert readTimestamp != null;
- return new ReadOnly(tx.id(), tx.coordinatorId(), readTimestamp);
+ return new ReadOnly(tx.id(), tx.coordinatorId(), readTimestamp, null);
}
/** Creates operation context from RW transaction. */
@@ -56,7 +62,18 @@ public abstract class TxContext {
ZonePartitionId commitPartition,
long enlistmentConsistencyToken
) {
- return new ReadWrite(txId, txCoordinatorId, commitPartition,
enlistmentConsistencyToken);
+ return new ReadWrite(txId, txCoordinatorId, commitPartition,
enlistmentConsistencyToken, null);
+ }
+
+ /** Creates operation context from RW transaction. */
+ public static TxContext readWrite(
+ UUID txId,
+ UUID txCoordinatorId,
+ ZonePartitionId commitPartition,
+ long enlistmentConsistencyToken,
+ @Nullable String label
+ ) {
+ return new ReadWrite(txId, txCoordinatorId, commitPartition,
enlistmentConsistencyToken, label);
}
/** Creates operation context from RW transaction. For test purposes only.
*/
@@ -64,18 +81,21 @@ public abstract class TxContext {
public static TxContext readWrite(InternalTransaction tx, long
enlistmentConsistencyToken) {
assert !tx.isReadOnly();
- return new ReadWrite(tx.id(), tx.coordinatorId(),
tx.commitPartition(), enlistmentConsistencyToken);
+ return new ReadWrite(tx.id(), tx.coordinatorId(),
tx.commitPartition(), enlistmentConsistencyToken, null);
}
protected final UUID txId;
protected final UUID coordinatorId;
+ @Nullable
+ protected final String label;
- protected TxContext(UUID txId, UUID coordinatorId) {
+ protected TxContext(UUID txId, UUID coordinatorId, @Nullable String label)
{
Objects.requireNonNull(txId, "Transaction id is mandatory");
Objects.requireNonNull(coordinatorId, "Transaction coordinator id is
mandatory");
this.txId = txId;
this.coordinatorId = coordinatorId;
+ this.label = label;
}
/** Returns {@code true} for read only transaction, {@code false}
otherwise. */
@@ -91,12 +111,18 @@ public abstract class TxContext {
return coordinatorId;
}
+ /** Returns transaction label. */
+ @Nullable
+ public String label() {
+ return label;
+ }
+
/** Read-only transaction context. */
public static class ReadOnly extends TxContext {
private final HybridTimestamp readTimestamp;
- private ReadOnly(UUID txId, UUID txCoordinatorId, HybridTimestamp
readTimestamp) {
- super(txId, txCoordinatorId);
+ private ReadOnly(UUID txId, UUID txCoordinatorId, HybridTimestamp
readTimestamp, @Nullable String label) {
+ super(txId, txCoordinatorId, label);
this.readTimestamp = readTimestamp;
}
@@ -120,7 +146,8 @@ public abstract class TxContext {
ReadOnly txCtx = (ReadOnly) o;
return Objects.equals(txId, txCtx.txId)
&& Objects.equals(coordinatorId, txCtx.coordinatorId)
- && Objects.equals(readTimestamp, txCtx.readTimestamp);
+ && Objects.equals(readTimestamp, txCtx.readTimestamp)
+ && Objects.equals(label, txCtx.label);
}
@Override
@@ -139,8 +166,9 @@ public abstract class TxContext {
private final ZonePartitionId commitPartition;
private final long enlistmentConsistencyToken;
- private ReadWrite(UUID txId, UUID txCoordinatorId, ZonePartitionId
commitPartition, long enlistmentConsistencyToken) {
- super(txId, txCoordinatorId);
+ private ReadWrite(UUID txId, UUID txCoordinatorId, ZonePartitionId
commitPartition, long enlistmentConsistencyToken,
+ @Nullable String label) {
+ super(txId, txCoordinatorId, label);
Objects.requireNonNull(commitPartition, "Commit partition is
mandatory for RW transaction");
@@ -166,7 +194,8 @@ public abstract class TxContext {
return enlistmentConsistencyToken ==
opCtx.enlistmentConsistencyToken
&& Objects.equals(txId, opCtx.txId)
&& Objects.equals(coordinatorId, opCtx.coordinatorId)
- && Objects.equals(commitPartition, opCtx.commitPartition);
+ && Objects.equals(commitPartition, opCtx.commitPartition)
+ && Objects.equals(label, opCtx.label);
}
/** Returns transaction commit partition. */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 700c06e6711..da23f76155d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -44,6 +44,7 @@ import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.
import static
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicable;
import static
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
import static
org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -129,6 +130,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -565,6 +567,7 @@ public class InternalTableImpl implements InternalTable {
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
.coordinatorId(tx.coordinatorId())
+ .txLabel(txLabel(tx))
.build();
if (enlistment != null) {
@@ -940,6 +943,7 @@ public class InternalTableImpl implements InternalTable {
.timestamp(txo.schemaTimestamp())
.full(false)
.coordinatorId(txo.coordinatorId())
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> false
);
@@ -1089,6 +1093,7 @@ public class InternalTableImpl implements InternalTable {
.full(full)
.coordinatorId(tx.coordinatorId())
.delayedAckProcessor(tx.remote() ? tx::processDelayedAck :
null)
+ .txLabel(txLabel(tx))
.build();
}
@@ -1155,6 +1160,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> false
);
@@ -1250,6 +1256,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> false
);
@@ -1274,6 +1281,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> !res
);
@@ -1333,6 +1341,7 @@ public class InternalTableImpl implements InternalTable {
.full(full)
.coordinatorId(tx.coordinatorId())
.delayedAckProcessor(tx.remote() ? tx::processDelayedAck :
null)
+ .txLabel(txLabel(tx))
.build();
}
@@ -1355,6 +1364,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> !res
);
@@ -1383,6 +1393,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> !res
);
@@ -1409,6 +1420,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> res == null
);
@@ -1433,6 +1445,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> !res
);
@@ -1457,6 +1470,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> !res
);
@@ -1483,6 +1497,7 @@ public class InternalTableImpl implements InternalTable {
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.delayedAckProcessor(txo.remote() ?
txo::processDelayedAck : null)
+ .txLabel(txLabel(txo))
.build(),
(res, req) -> res == null
);
@@ -1635,6 +1650,8 @@ public class InternalTableImpl implements InternalTable {
) {
assert opCtx.txContext().isReadOnly();
+ TxContext.ReadOnly txContext = (TxContext.ReadOnly) opCtx.txContext();
+
boolean rangeScan = criteria instanceof IndexScanCriteria.Range;
boolean lookup = criteria instanceof IndexScanCriteria.Lookup;
@@ -1643,8 +1660,6 @@ public class InternalTableImpl implements InternalTable {
BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range)
criteria).upperBound() : null;
int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() :
0;
- TxContext.ReadOnly txContext = (TxContext.ReadOnly) opCtx.txContext();
-
ZonePartitionId replicationGroupId = targetReplicationGroupId(partId);
return new PartitionScanPublisher<>(new
ReadOnlyInflightBatchRequestTracker(transactionInflights, txContext.txId(),
txManager)) {
@@ -1761,6 +1776,11 @@ public class InternalTableImpl implements InternalTable {
) {
assert !opCtx.txContext().isReadOnly();
+ TxContext.ReadWrite txContext = (TxContext.ReadWrite)
opCtx.txContext();
+
+ // Update transaction state meta with label if present.
+ updateLabel(txContext, txContext.commitPartition());
+
boolean rangeScan = criteria instanceof IndexScanCriteria.Range;
boolean lookup = criteria instanceof IndexScanCriteria.Lookup;
@@ -1769,8 +1789,6 @@ public class InternalTableImpl implements InternalTable {
BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range)
criteria).upperBound() : null;
int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() :
0;
- TxContext.ReadWrite txContext = (TxContext.ReadWrite)
opCtx.txContext();
-
ZonePartitionId replicationGroupId = targetReplicationGroupId(partId);
return new
PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
@@ -1792,6 +1810,7 @@ public class InternalTableImpl implements InternalTable {
.flags(flags)
.batchSize(batchSize)
.full(false) // Set explicitly.
+ .txLabel(txContext.label())
.build();
return replicaSvc.invoke(recipient, request);
@@ -1804,6 +1823,30 @@ public class InternalTableImpl implements InternalTable {
};
}
+ /**
+ * Updates transaction meta with the label and commit partition, if a
label is present.
+ *
+ * <p>This is used for tracking labeled transactions in the meta storage;
unlabeled transactions are ignored.
+ *
+ * @param txContext Transaction context.
+ * @param commitPartition Commit partition to record (may be {@code null}).
+ */
+ private void updateLabel(TxContext txContext, @Nullable ZonePartitionId
commitPartition) {
+ String label = txContext.label();
+ if (label != null) {
+ txManager.updateTxMeta(txContext.txId(), old ->
TxStateMeta.builder(old, PENDING)
+ .txCoordinatorId(txContext.coordinatorId())
+ .commitPartitionId(commitPartition)
+ .txLabel(label)
+ .build());
+ }
+ }
+
+ private @Nullable String txLabel(InternalTransaction tx) {
+ TxStateMeta meta = txManager.stateMeta(tx.id());
+ return meta == null ? null : meta.txLabel();
+ }
+
/**
* Returns message used to create {@code TransactionException} with {@code
ErrorGroups.TX_FAILED_READ_WRITE_OPERATION_ERR}.
*
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/SortedIndexLockerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/SortedIndexLockerTest.java
index 293f801f63a..5e109760d10 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/SortedIndexLockerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/SortedIndexLockerTest.java
@@ -82,7 +82,6 @@ class SortedIndexLockerTest extends BaseIgniteAbstractTest {
private LockManager lockManager() {
VolatileTxStateMetaStorage txStateVolatileStorage =
VolatileTxStateMetaStorage.createStarted();
-
HeapLockManager lockManager = new
HeapLockManager(systemLocalConfiguration, txStateVolatileStorage);
lockManager.start(new WaitDieDeadlockPreventionPolicy());
return lockManager;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 3143e3b2a29..aaf913d0dbd 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -48,8 +48,10 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -61,10 +63,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.hlc.ClockService;
@@ -99,7 +103,9 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.IndexScanCriteria.Range;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
@@ -848,4 +854,147 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
}
}
}
+
+ /**
+ * Tests for label propagation from OperationContext to TxStateMeta.
+ */
+ @Nested
+ class LabelPropagationTest {
+ @Test
+ void testReadWriteScanSavesLabelToTxStateMeta() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+
+ UUID txId = randomUUID();
+ UUID coordinatorId = randomUUID();
+ ZonePartitionId commitPartition = new ZonePartitionId(ZONE_ID, 0);
+ long enlistmentConsistencyToken = 1L;
+ String label = "TEST-RW-LABEL";
+
+ TxContext.ReadWrite txContext = (TxContext.ReadWrite)
TxContext.readWrite(
+ txId,
+ coordinatorId,
+ commitPartition,
+ enlistmentConsistencyToken,
+ label
+ );
+
+ OperationContext opCtx = OperationContext.create(txContext);
+
+ ArgumentCaptor<UUID> txIdCaptor =
ArgumentCaptor.forClass(UUID.class);
+ ArgumentCaptor<Function<TxStateMeta, TxStateMeta>> updaterCaptor =
+ ArgumentCaptor.forClass(Function.class);
+
+ internalTable.scan(0, clusterNode, opCtx);
+
+ verify(txManager).updateTxMeta(txIdCaptor.capture(),
updaterCaptor.capture());
+
+ assertThat(txIdCaptor.getValue(), is(txId));
+
+ TxStateMeta newMeta = updaterCaptor.getValue().apply(null);
+ assertThat(newMeta.txLabel(), is(label));
+ assertThat(newMeta.txCoordinatorId(), is(coordinatorId));
+ assertThat(newMeta.commitPartitionId(), is(commitPartition));
+ assertThat(newMeta.txState(), is(TxState.PENDING));
+ }
+
+ @Test
+ void testReadWriteScanWithIndexSavesLabelToTxStateMeta() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+
+ UUID txId = randomUUID();
+ UUID coordinatorId = randomUUID();
+ ZonePartitionId commitPartition = new ZonePartitionId(ZONE_ID, 0);
+ long enlistmentConsistencyToken = 1L;
+ String label = "TEST-RW-INDEX-LABEL";
+ int indexId = 1;
+
+ TxContext.ReadWrite txContext = (TxContext.ReadWrite)
TxContext.readWrite(
+ txId,
+ coordinatorId,
+ commitPartition,
+ enlistmentConsistencyToken,
+ label
+ );
+
+ OperationContext opCtx = OperationContext.create(txContext);
+ IndexScanCriteria criteria = IndexScanCriteria.unbounded();
+
+ ArgumentCaptor<UUID> txIdCaptor =
ArgumentCaptor.forClass(UUID.class);
+ ArgumentCaptor<Function<TxStateMeta, TxStateMeta>> updaterCaptor =
+ ArgumentCaptor.forClass(Function.class);
+
+ internalTable.scan(0, clusterNode, indexId, criteria, opCtx);
+
+ verify(txManager).updateTxMeta(txIdCaptor.capture(),
updaterCaptor.capture());
+
+ assertThat(txIdCaptor.getValue(), is(txId));
+
+ TxStateMeta newMeta = updaterCaptor.getValue().apply(null);
+ assertThat(newMeta.txLabel(), is(label));
+ assertThat(newMeta.commitPartitionId(), is(commitPartition));
+ }
+
+ @Test
+ void testScanWithoutLabelDoesNotUpdateTxStateMeta() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+
+ UUID txId = randomUUID();
+ UUID coordinatorId = randomUUID();
+ HybridTimestamp readTimestamp = clock.now();
+
+ TxContext.ReadOnly txContext = (TxContext.ReadOnly)
TxContext.readOnly(
+ txId,
+ coordinatorId,
+ readTimestamp,
+ null
+ );
+
+ OperationContext opCtx = OperationContext.create(txContext);
+
+ internalTable.scan(0, clusterNode, opCtx);
+
+ verify(txManager, never()).updateTxMeta(any(), any());
+ }
+
+ @Test
+ void testReadWriteScanSavesLabelToTxStateMetaWithStateMap() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+
+ UUID txId = randomUUID();
+ UUID coordinatorId = randomUUID();
+ ZonePartitionId commitPartition = new ZonePartitionId(ZONE_ID, 0);
+ long enlistmentConsistencyToken = 1L;
+ String label = "TEST-RW-LABEL-MAP";
+
+ TxContext.ReadWrite txContext = (TxContext.ReadWrite)
TxContext.readWrite(
+ txId,
+ coordinatorId,
+ commitPartition,
+ enlistmentConsistencyToken,
+ label
+ );
+
+ OperationContext opCtx = OperationContext.create(txContext);
+
+ Map<UUID, TxStateMeta> txStateMap = new ConcurrentHashMap<>();
+
+ doAnswer(invocation -> txStateMap.get(invocation.getArgument(0)))
+ .when(txManager).stateMeta(any());
+
+ doAnswer(invocation -> {
+ UUID txIdArg = invocation.getArgument(0);
+ Function<TxStateMeta, TxStateMeta> updater =
invocation.getArgument(1);
+ txStateMap.compute(txIdArg, (k, oldMeta) ->
updater.apply(oldMeta));
+ return null;
+ }).when(txManager).updateTxMeta(any(), any());
+
+ internalTable.scan(0, clusterNode, opCtx);
+
+ TxStateMeta txMeta = txManager.stateMeta(txId);
+ assertThat(txMeta, is(notNullValue()));
+ assertThat(txMeta.txLabel(), is(label));
+ assertThat(txMeta.txCoordinatorId(), is(coordinatorId));
+ assertThat(txMeta.commitPartitionId(), is(commitPartition));
+ }
+ }
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/impl/ItTransactionLabelLoggingTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/impl/ItTransactionLabelLoggingTest.java
new file mode 100644
index 00000000000..82b397d04f7
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/impl/ItTransactionLabelLoggingTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.testframework.log4j2.Log4jUtils;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.apache.ignite.tx.TransactionOptions;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test to verify that transaction labels appear in logs for various negative
scenarios.
+ */
+public class ItTransactionLabelLoggingTest extends
ClusterPerTestIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @Test
+ void testTransactionLabelInTimeoutLogs() {
+ AutoCloseable debugLogging =
enableDebugLogging(TransactionExpirationRegistry.class);
+ LogInspector timeoutLogInspector =
LogInspector.create(TransactionExpirationRegistry.class);
+ AtomicInteger timeoutLabelCount = new AtomicInteger(0);
+ String label = "TIMEOUT-TEST";
+ timeoutLogInspector.addHandler(
+ logEvent -> {
+ String message =
logEvent.getMessage().getFormattedMessage();
+ return message.contains("Transaction has aborted due to
timeout")
+ && message.contains("txLabel=" + label);
+ },
+ timeoutLabelCount::incrementAndGet
+ );
+ timeoutLogInspector.start();
+
+ try {
+ Ignite ignite = cluster.aliveNode();
+
+ // Create a transaction with a custom label and very short timeout.
+ ignite.transactions().begin(
+ new TransactionOptions()
+ .label(label)
+ .timeoutMillis(1) // 1ms timeout to trigger
expiration quickly.
+ );
+
+ await("Expected to find transaction label in timeout log message")
+
.atMost(Duration.ofSeconds(10L)).untilAtomic(timeoutLabelCount,
is(greaterThan(0)));
+ } finally {
+ timeoutLogInspector.stop();
+ closeQuietly(debugLogging);
+ }
+ }
+
+ private static AutoCloseable enableDebugLogging(Class<?> loggerClass) {
+ Log4jUtils.waitTillConfigured();
+
+ LoggerContext context = (LoggerContext) LogManager.getContext(false);
+ Configuration config = context.getConfiguration();
+ String loggerName = loggerClass.getName();
+
+ LoggerConfig existing = config.getLoggerConfig(loggerName);
+ boolean exactMatch = loggerName.equals(existing.getName());
+
+ if (exactMatch) {
+ Level previous = existing.getLevel();
+ existing.setLevel(Level.DEBUG);
+ context.updateLoggers();
+ return () -> {
+ existing.setLevel(previous);
+ context.updateLoggers();
+ };
+ }
+
+ LoggerConfig loggerConfig = new LoggerConfig(loggerName, Level.DEBUG,
true);
+ config.addLogger(loggerName, loggerConfig);
+ context.updateLoggers();
+
+ return () -> {
+ config.removeLogger(loggerName);
+ context.updateLoggers();
+ };
+ }
+
+ private static void closeQuietly(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/impl/ItTransactionLabelPropagationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/impl/ItTransactionLabelPropagationTest.java
new file mode 100644
index 00000000000..8d0fdcf732f
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/impl/ItTransactionLabelPropagationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+import java.util.List;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.NodeUtils;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Verifies that a transaction label set through {@code TransactionOptions.label(...)} on the node that starts the
+ * transaction becomes visible in the transaction state meta of the node reported as leaseholder of the partition.
+ */
+public class ItTransactionLabelPropagationTest extends
ClusterPerTestIntegrationTest {
+
+ // Two nodes are started so that a node other than the leaseholder can be picked to begin the transaction.
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ /**
+ * Begins a labeled transaction on a non-leaseholder node, performs a put, and waits until the leaseholder node's
+ * transaction manager reports state meta for this transaction carrying the same label.
+ */
+ @Test
+ void testTxLabelPropagatedToPrimaryReplica() {
+ String zoneName = "LABEL_ZONE";
+ String tableName = "LABEL_TABLE";
+ String label = "TX-LABEL";
+
+ // A zone with a single partition and two replicas, and a table placed in that zone.
+ executeSql("CREATE ZONE " + zoneName + " (PARTITIONS 1, REPLICAS 2)
STORAGE PROFILES ['"
+ + CatalogService.DEFAULT_STORAGE_PROFILE + "']");
+ executeSql("CREATE TABLE " + tableName + " (id INT PRIMARY KEY, val
VARCHAR) ZONE " + zoneName);
+
+ // Replication group of partition 0 (the only partition) in the table's zone.
+ TableImpl table = unwrapTableImpl(node(0).tables().table(tableName));
+ ZonePartitionId groupId = new ZonePartitionId(table.zoneId(), 0);
+
+ // Name of the current leaseholder for that group, as resolved through node 0.
+ List<IgniteImpl> nodes =
runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(toList());
+ String primaryName = NodeUtils.leaseholder(igniteImpl(0),
groupId).getLeaseholder();
+
+ // Split the running nodes by name: the leaseholder itself and any node that is not the leaseholder.
+ IgniteImpl primaryNode = nodes.stream().filter(n ->
n.name().equals(primaryName)).findFirst().orElseThrow();
+ IgniteImpl txCoordinatorFromOtherNode = nodes.stream().filter(n ->
!n.name().equals(primaryName)).findFirst().orElseThrow();
+
+ // Both the view and the labeled transaction are obtained from the non-leaseholder node.
+ KeyValueView<Integer, String> view =
txCoordinatorFromOtherNode.tables().table(tableName).keyValueView(Integer.class,
String.class);
+ InternalTransaction tx = (InternalTransaction)
txCoordinatorFromOtherNode.transactions()
+ .begin(new TransactionOptions().label(label));
+
+ // NOTE(review): presumably this write is what makes the leaseholder node learn about the transaction - confirm.
+ view.put(tx, 1, "val");
+
+ // Poll for up to 10 seconds: the meta may be absent at first, so a null meta is treated as "not yet".
+ await("Expected tx label to be visible on the primary replica node")
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ TxStateMeta meta =
primaryNode.txManager().stateMeta(tx.id());
+ return meta != null && label.equals(meta.txLabel());
+ });
+
+ // Reached only when the await above succeeded.
+ tx.commit();
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index d5f2e3f4983..93b061c6569 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -54,6 +54,7 @@ public class IgniteTransactionsImpl implements
IgniteTransactions {
? InternalTxOptions.defaults()
: InternalTxOptions.builder()
.timeoutMillis(options.timeoutMillis())
+ .txLabel(options.label())
.build();
return txManager.beginExplicit(observableTimestampTracker, options !=
null && options.readOnly(), internalTxOptions);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java
index ab33cfaac2a..a2dd3dc8cd4 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java
@@ -128,7 +128,10 @@ class TransactionExpirationRegistry {
private void abortTransaction(InternalTransaction tx) {
tx.rollbackTimeoutExceededAsync().whenComplete((res, ex) -> {
if (ex != null && !hasCause(ex, NodeStoppingException.class)) {
- LOG.error("Transaction has aborted due to timeout {}.", ex,
+ LOG.error("Transaction abortion has failed {}.", ex,
+ formatTxInfo(tx.id(), volatileTxStateMetaStorage));
+ } else {
+ LOG.debug("Transaction has aborted due to timeout {}.",
formatTxInfo(tx.id(), volatileTxStateMetaStorage));
}
});
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 0ea57518cdb..573f855f6ae 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -459,6 +459,10 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public InternalTransaction beginExplicit(HybridTimestampTracker
timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
+ if (readOnly && txOptions.txLabel() != null) {
+ throw new TransactionException(Common.ILLEGAL_ARGUMENT_ERR,
"Labels are not supported for read only transactions");
+ }
+
InternalTransaction tx;
if (readOnly) {