This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 22e08a23554 IGNITE-23488 Sql. Integrate cancellation of transaction with sql kill handler (#5207)
22e08a23554 is described below
commit 22e08a23554ee81940f8177b562d17a67effb137
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Feb 11 18:50:40 2025 +0300
IGNITE-23488 Sql. Integrate cancellation of transaction with sql kill handler (#5207)
---
.../ignite/internal/compute/IgniteComputeImpl.java | 25 -------
.../ignite/internal/lowwatermark/LowWatermark.java | 8 ---
.../internal/lowwatermark/LowWatermarkImpl.java | 7 --
.../internal/lowwatermark/TestLowWatermark.java | 7 --
.../org/apache/ignite/internal/app/IgniteImpl.java | 50 ++++++++++++-
.../sql/engine/kill/ItSqlKillCommandTest.java | 7 --
.../ignite/internal/tx/impl/TxManagerImpl.java | 2 +-
.../tx/views/TransactionsViewProvider.java | 81 +++++++++-------------
8 files changed, 84 insertions(+), 103 deletions(-)
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 953b60158eb..77b120d01a1 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -67,8 +67,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
-import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableViewInternal;
@@ -616,29 +614,6 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
});
}
- /** Returns a {@link OperationKillHandler kill handler} for the compute job. */
- public OperationKillHandler killHandler() {
- return new OperationKillHandler() {
- @Override
- public CompletableFuture<Boolean> cancelAsync(String operationId) {
- UUID jobId = UUID.fromString(operationId);
-
- return IgniteComputeImpl.this.cancelAsync(jobId)
- .thenApply(res -> res != null ? res : Boolean.FALSE);
- }
-
- @Override
- public boolean local() {
- return false;
- }
-
- @Override
- public CancellableOperationType type() {
- return CancellableOperationType.COMPUTE;
- }
- };
- }
-
@TestOnly
ComputeComponent computeComponent() {
return computeComponent;
diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 4aeeaeb2f7f..3164619628b 100644
--- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.lowwatermark;
-import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.internal.event.EventProducer;
@@ -68,11 +67,4 @@ public interface LowWatermark extends EventProducer<LowWatermarkEvent, LowWaterm
* @param lockId ID of the transaction that locks the low watermark.
*/
void unlock(UUID lockId);
-
- /**
- * Returns a set of all transaction IDs that lock the low watermark.
- *
- * @return Set of all transaction IDs that lock the low watermark.
- */
- Set<UUID> lockIds();
}
diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index 94c10cd4e60..f2753c43434 100644
--- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -25,9 +25,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
-import java.util.Collections;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -335,11 +333,6 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
lock.future().complete(null);
}
- @Override
- public Set<UUID> lockIds() {
- return Collections.unmodifiableSet(locks.keySet());
- }
-
CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
return inBusyLockAsync(busyLock, () -> {
vaultManager.put(LOW_WATERMARK_VAULT_KEY, newLowWatermark.toBytes());
diff --git a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index b5b5951903e..66c6798fbcc 100644
--- a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++ b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -24,9 +24,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.Collections;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -117,11 +115,6 @@ public class TestLowWatermark extends AbstractEventProducer<LowWatermarkEvent, L
lock.future().complete(null);
}
- @Override
- public Set<UUID> lockIds() {
- return Collections.unmodifiableSet(locks.keySet());
- }
-
/**
* Update low watermark and notify listeners.
*
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fa5b18443fa..bcb8444749f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -242,6 +242,8 @@ import org.apache.ignite.internal.sql.configuration.distributed.SqlClusterExtens
import org.apache.ignite.internal.sql.configuration.local.SqlNodeExtensionConfiguration;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
+import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
import org.apache.ignite.internal.sql.engine.exec.kill.KillCommandHandler;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
@@ -1029,6 +1031,7 @@ public class IgniteImpl implements Ignite {
);
systemViewManager.register(txManager);
+ killCommandHandler.register(transactionKillHandler(txManager));
resourceVacuumManager = new ResourceVacuumManager(
name,
@@ -1179,7 +1182,7 @@ public class IgniteImpl implements Ignite {
clock
);
- killCommandHandler.register(((IgniteComputeImpl) compute).killHandler());
+ killCommandHandler.register(computeKillHandler(compute));
authenticationManager = createAuthenticationManager();
@@ -1858,6 +1861,51 @@ public class IgniteImpl implements Ignite {
);
}
+ /** Returns a {@link OperationKillHandler kill handler} for the compute job. */
+ private static OperationKillHandler computeKillHandler(IgniteComputeInternal compute) {
+ return new OperationKillHandler() {
+ @Override
+ public CompletableFuture<Boolean> cancelAsync(String operationId) {
+ UUID jobId = UUID.fromString(operationId);
+
+ return compute.cancelAsync(jobId)
+ .thenApply(res -> res != null ? res : Boolean.FALSE);
+ }
+
+ @Override
+ public boolean local() {
+ return false;
+ }
+
+ @Override
+ public CancellableOperationType type() {
+ return CancellableOperationType.COMPUTE;
+ }
+ };
+ }
+
+ /** Returns a {@link OperationKillHandler kill handler} for the transaction. */
+ private static OperationKillHandler transactionKillHandler(TxManager txManager) {
+ return new OperationKillHandler() {
+ @Override
+ public CompletableFuture<Boolean> cancelAsync(String operationId) {
+ UUID transactionId = UUID.fromString(operationId);
+
+ return txManager.kill(transactionId);
+ }
+
+ @Override
+ public boolean local() {
+ return true;
+ }
+
+ @Override
+ public CancellableOperationType type() {
+ return CancellableOperationType.TRANSACTION;
+ }
+ };
+ }
+
@TestOnly
public Loza raftManager() {
return raftMgr;
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java
index 9b1b66ed0aa..d488e3e3d3f 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java
@@ -35,7 +35,6 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.util.Collection;
import java.util.List;
@@ -104,9 +103,6 @@ public class ItSqlKillCommandTest extends BaseSqlIntegrationTest {
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killWithInvalidIdentifier(CancellableOperationType type) {
- // TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
- assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");
-
Consumer<String> exceptionChecker = query -> {
SqlException err = assertThrowsSqlException(
SqlException.class,
@@ -126,9 +122,6 @@ public class ItSqlKillCommandTest extends BaseSqlIntegrationTest {
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killNonExistentOperation(CancellableOperationType type) {
- // TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
- assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");
-
assertThat(executeKill(CLUSTER.aliveNode(), type, UUID.randomUUID(), false), is(false));
// NO WAIT should never return false.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index ddfdc6c3a6f..2f92e73b46a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -871,7 +871,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
txStateVolatileStorage.start();
- txViewProvider.init(localNodeId, lowWatermark.lockIds(), txStateVolatileStorage.statesMap());
+ txViewProvider.init(txStateVolatileStorage.statesMap().values());
orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs());
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
index b051035e4d7..c7c6604228c 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.tx.views;
-import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.type.NativeTypes.stringOf;
@@ -25,20 +24,16 @@ import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViews;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TransactionIds;
-import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypes;
-import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.SubscriptionUtils;
-import org.apache.ignite.internal.util.TransformingIterator;
-import org.jetbrains.annotations.Nullable;
/**
* {@code TRANSACTIONS} system view provider.
@@ -51,16 +46,8 @@ public class TransactionsViewProvider {
private volatile Iterable<TxInfo> dataSource;
/** Initializes provider with data sources. */
- public void init(
- UUID localNodeId,
- Collection<UUID> roTxIds,
- Map<UUID, TxStateMeta> rwTxStates
- ) {
- this.dataSource = new TxInfoDataSource(
- localNodeId,
- roTxIds,
- rwTxStates
- );
+ public void init(Collection<TxStateMeta> txStates) {
+ this.dataSource = new TxInfoDataSource(txStates);
}
/** Returns a {@code TRANSACTIONS} system view. */
@@ -91,53 +78,53 @@ public class TransactionsViewProvider {
}
static class TxInfoDataSource implements Iterable<TxInfo> {
- private final UUID localNodeId;
-
- private final Iterable<UUID> roTxIds;
-
- private final Map<UUID, TxStateMeta> rwTxStates;
+ private final Collection<TxStateMeta> txStates;
- TxInfoDataSource(UUID localNodeId, Iterable<UUID> roTxIds, Map<UUID, TxStateMeta> rwTxStates) {
- this.localNodeId = localNodeId;
- this.roTxIds = roTxIds;
- this.rwTxStates = rwTxStates;
+ TxInfoDataSource(Collection<TxStateMeta> txStates) {
+ this.txStates = txStates;
}
@Override
public Iterator<TxInfo> iterator() {
- return CollectionUtils.concat(
- new TransformingIterator<>(roTxIds.iterator(), TxInfo::readOnly),
- rwTxStates.entrySet().stream()
- .filter(e -> localNodeId.equals(e.getValue().txCoordinatorId())
- && e.getValue().tx() != null && !e.getValue().tx().isReadOnly()
- && !isFinalState(e.getValue().txState()))
- .map(e -> TxInfo.readWrite(e.getKey(), e.getValue().txState()))
- .iterator()
- );
+ return txStates.stream()
+ .filter(txStateMeta -> {
+ InternalTransaction tx = txStateMeta.tx();
+
+ if (tx == null) {
+ return false;
+ }
+
+ // Currently the read-only transaction status does not change and it is always in the PENDING state.
+ if ((tx.isReadOnly() && tx.isFinishingOrFinished()) || isFinalState(txStateMeta.txState())) {
+ return false;
+ }
+
+ return true;
+ })
+ .map(TxInfo::new)
+ .iterator();
}
}
static class TxInfo {
private final String id;
- private final @Nullable String state;
+ private final String state;
private final Instant startTime;
private final String type;
private final String priority;
- static TxInfo readOnly(UUID id) {
- return new TxInfo(id, PENDING, true);
- }
+ TxInfo(TxStateMeta txStateMeta) {
+ InternalTransaction tx = txStateMeta.tx();
- static TxInfo readWrite(UUID id, TxState txState) {
- return new TxInfo(id, txState, false);
- }
+ assert tx != null;
+
+ UUID txId = tx.id();
- private TxInfo(UUID id, @Nullable TxState state, boolean readOnly) {
- this.id = id.toString();
- this.state = state == null ? null : state.name();
- this.startTime = Instant.ofEpochMilli(TransactionIds.beginTimestamp(id).getPhysical());
- this.type = readOnly ? READ_ONLY : READ_WRITE;
- this.priority = TransactionIds.priority(id).name();
+ this.id = txId.toString();
+ this.state = txStateMeta.txState().name();
+ this.startTime = Instant.ofEpochMilli(TransactionIds.beginTimestamp(txId).getPhysical());
+ this.type = tx.isReadOnly() ? READ_ONLY : READ_WRITE;
+ this.priority = TransactionIds.priority(txId).name();
}
}
}