This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 22e08a23554 IGNITE-23488 Sql. Integrate cancellation of transaction with sql kill handler (#5207)
22e08a23554 is described below

commit 22e08a23554ee81940f8177b562d17a67effb137
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Feb 11 18:50:40 2025 +0300

    IGNITE-23488 Sql. Integrate cancellation of transaction with sql kill handler (#5207)
---
 .../ignite/internal/compute/IgniteComputeImpl.java | 25 -------
 .../ignite/internal/lowwatermark/LowWatermark.java |  8 ---
 .../internal/lowwatermark/LowWatermarkImpl.java    |  7 --
 .../internal/lowwatermark/TestLowWatermark.java    |  7 --
 .../org/apache/ignite/internal/app/IgniteImpl.java | 50 ++++++++++++-
 .../sql/engine/kill/ItSqlKillCommandTest.java      |  7 --
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  2 +-
 .../tx/views/TransactionsViewProvider.java         | 81 +++++++++-------------
 8 files changed, 84 insertions(+), 103 deletions(-)

diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 953b60158eb..77b120d01a1 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -67,8 +67,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
-import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableViewInternal;
@@ -616,29 +614,6 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
                 });
     }
 
-    /** Returns a {@link OperationKillHandler kill handler} for the compute job. */
-    public OperationKillHandler killHandler() {
-        return new OperationKillHandler() {
-            @Override
-            public CompletableFuture<Boolean> cancelAsync(String operationId) {
-                UUID jobId = UUID.fromString(operationId);
-
-                return IgniteComputeImpl.this.cancelAsync(jobId)
-                        .thenApply(res -> res != null ? res : Boolean.FALSE);
-            }
-
-            @Override
-            public boolean local() {
-                return false;
-            }
-
-            @Override
-            public CancellableOperationType type() {
-                return CancellableOperationType.COMPUTE;
-            }
-        };
-    }
-
     @TestOnly
     ComputeComponent computeComponent() {
         return computeComponent;
diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 4aeeaeb2f7f..3164619628b 100644
--- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.lowwatermark;
 
-import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.event.EventProducer;
@@ -68,11 +67,4 @@ public interface LowWatermark extends EventProducer<LowWatermarkEvent, LowWaterm
      * @param lockId ID of the transaction that locks the low watermark.
      */
     void unlock(UUID lockId);
-
-    /**
-     * Returns a set of all transaction IDs that lock the low watermark.
-     *
-     * @return Set of all transaction IDs that lock the low watermark.
-     */
-    Set<UUID> lockIds();
 }
diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index 94c10cd4e60..f2753c43434 100644
--- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -25,9 +25,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
-import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -335,11 +333,6 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
         lock.future().complete(null);
     }
 
-    @Override
-    public Set<UUID> lockIds() {
-        return Collections.unmodifiableSet(locks.keySet());
-    }
-
     CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
         return inBusyLockAsync(busyLock, () -> {
                     vaultManager.put(LOW_WATERMARK_VAULT_KEY, newLowWatermark.toBytes());
diff --git a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index b5b5951903e..66c6798fbcc 100644
--- a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++ b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -24,9 +24,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -117,11 +115,6 @@ public class TestLowWatermark extends AbstractEventProducer<LowWatermarkEvent, L
         lock.future().complete(null);
     }
 
-    @Override
-    public Set<UUID> lockIds() {
-        return Collections.unmodifiableSet(locks.keySet());
-    }
-
     /**
      * Update low watermark and notify listeners.
      *
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fa5b18443fa..bcb8444749f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -242,6 +242,8 @@ import org.apache.ignite.internal.sql.configuration.distributed.SqlClusterExtens
 import org.apache.ignite.internal.sql.configuration.local.SqlNodeExtensionConfiguration;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
+import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
 import org.apache.ignite.internal.sql.engine.exec.kill.KillCommandHandler;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModule;
@@ -1029,6 +1031,7 @@ public class IgniteImpl implements Ignite {
         );
 
         systemViewManager.register(txManager);
+        killCommandHandler.register(transactionKillHandler(txManager));
 
         resourceVacuumManager = new ResourceVacuumManager(
                 name,
@@ -1179,7 +1182,7 @@ public class IgniteImpl implements Ignite {
                 clock
         );
 
-        killCommandHandler.register(((IgniteComputeImpl) compute).killHandler());
+        killCommandHandler.register(computeKillHandler(compute));
 
         authenticationManager = createAuthenticationManager();
 
@@ -1858,6 +1861,51 @@ public class IgniteImpl implements Ignite {
         );
     }
 
+    /** Returns a {@link OperationKillHandler kill handler} for the compute job. */
+    private static OperationKillHandler computeKillHandler(IgniteComputeInternal compute) {
+        return new OperationKillHandler() {
+            @Override
+            public CompletableFuture<Boolean> cancelAsync(String operationId) {
+                UUID jobId = UUID.fromString(operationId);
+
+                return compute.cancelAsync(jobId)
+                        .thenApply(res -> res != null ? res : Boolean.FALSE);
+            }
+
+            @Override
+            public boolean local() {
+                return false;
+            }
+
+            @Override
+            public CancellableOperationType type() {
+                return CancellableOperationType.COMPUTE;
+            }
+        };
+    }
+
+    /** Returns a {@link OperationKillHandler kill handler} for the transaction. */
+    private static OperationKillHandler transactionKillHandler(TxManager txManager) {
+        return new OperationKillHandler() {
+            @Override
+            public CompletableFuture<Boolean> cancelAsync(String operationId) {
+                UUID transactionId = UUID.fromString(operationId);
+
+                return txManager.kill(transactionId);
+            }
+
+            @Override
+            public boolean local() {
+                return true;
+            }
+
+            @Override
+            public CancellableOperationType type() {
+                return CancellableOperationType.TRANSACTION;
+            }
+        };
+    }
+
     @TestOnly
     public Loza raftManager() {
         return raftMgr;
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java
index 9b1b66ed0aa..d488e3e3d3f 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/kill/ItSqlKillCommandTest.java
@@ -35,7 +35,6 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import java.util.Collection;
 import java.util.List;
@@ -104,9 +103,6 @@ public class ItSqlKillCommandTest extends BaseSqlIntegrationTest {
     @ParameterizedTest
     @EnumSource(CancellableOperationType.class)
     public void killWithInvalidIdentifier(CancellableOperationType type) {
-        // TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
-        assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");
-
         Consumer<String> exceptionChecker = query -> {
             SqlException err = assertThrowsSqlException(
                     SqlException.class,
@@ -126,9 +122,6 @@ public class ItSqlKillCommandTest extends BaseSqlIntegrationTest {
     @ParameterizedTest
     @EnumSource(CancellableOperationType.class)
     public void killNonExistentOperation(CancellableOperationType type) {
-        // TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
-        assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");
-
         assertThat(executeKill(CLUSTER.aliveNode(), type, UUID.randomUUID(), false), is(false));
 
         // NO WAIT should never return false.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index ddfdc6c3a6f..2f92e73b46a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -871,7 +871,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
 
             txStateVolatileStorage.start();
 
-            txViewProvider.init(localNodeId, lowWatermark.lockIds(), txStateVolatileStorage.statesMap());
+            txViewProvider.init(txStateVolatileStorage.statesMap().values());
 
             orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs());
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
index b051035e4d7..c7c6604228c 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.tx.views;
 
-import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static org.apache.ignite.internal.type.NativeTypes.stringOf;
 
@@ -25,20 +24,16 @@ import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.systemview.api.SystemView;
 import org.apache.ignite.internal.systemview.api.SystemViews;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TransactionIds;
-import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.type.NativeType;
 import org.apache.ignite.internal.type.NativeTypes;
-import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.SubscriptionUtils;
-import org.apache.ignite.internal.util.TransformingIterator;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * {@code TRANSACTIONS} system view provider.
@@ -51,16 +46,8 @@ public class TransactionsViewProvider {
     private volatile Iterable<TxInfo> dataSource;
 
     /** Initializes provider with data sources. */
-    public void init(
-            UUID localNodeId,
-            Collection<UUID> roTxIds,
-            Map<UUID, TxStateMeta> rwTxStates
-    ) {
-        this.dataSource = new TxInfoDataSource(
-                localNodeId,
-                roTxIds,
-                rwTxStates
-        );
+    public void init(Collection<TxStateMeta> txStates) {
+        this.dataSource = new TxInfoDataSource(txStates);
     }
 
     /** Returns a {@code TRANSACTIONS} system view. */
@@ -91,53 +78,53 @@ public class TransactionsViewProvider {
     }
 
     static class TxInfoDataSource implements Iterable<TxInfo> {
-        private final UUID localNodeId;
-
-        private final Iterable<UUID> roTxIds;
-
-        private final Map<UUID, TxStateMeta> rwTxStates;
+        private final Collection<TxStateMeta> txStates;
 
-        TxInfoDataSource(UUID localNodeId, Iterable<UUID> roTxIds, Map<UUID, TxStateMeta> rwTxStates) {
-            this.localNodeId = localNodeId;
-            this.roTxIds = roTxIds;
-            this.rwTxStates = rwTxStates;
+        TxInfoDataSource(Collection<TxStateMeta> txStates) {
+            this.txStates = txStates;
         }
 
         @Override
         public Iterator<TxInfo> iterator() {
-            return CollectionUtils.concat(
-                    new TransformingIterator<>(roTxIds.iterator(), TxInfo::readOnly),
-                    rwTxStates.entrySet().stream()
-                            .filter(e -> localNodeId.equals(e.getValue().txCoordinatorId())
-                                    && e.getValue().tx() != null && !e.getValue().tx().isReadOnly()
-                                    && !isFinalState(e.getValue().txState()))
-                            .map(e -> TxInfo.readWrite(e.getKey(), e.getValue().txState()))
-                            .iterator()
-            );
+            return txStates.stream()
+                            .filter(txStateMeta -> {
+                                InternalTransaction tx = txStateMeta.tx();
+
+                                if (tx == null) {
+                                    return false;
+                                }
+
+                                // Currently the read-only transaction status does not change and it is always in the PENDING state.
+                                if ((tx.isReadOnly() && tx.isFinishingOrFinished()) || isFinalState(txStateMeta.txState())) {
+                                    return false;
+                                }
+
+                                return true;
+                            })
+                            .map(TxInfo::new)
+                            .iterator();
         }
     }
 
     static class TxInfo {
         private final String id;
-        private final @Nullable String state;
+        private final String state;
         private final Instant startTime;
         private final String type;
         private final String priority;
 
-        static TxInfo readOnly(UUID id) {
-            return new TxInfo(id, PENDING, true);
-        }
+        TxInfo(TxStateMeta txStateMeta) {
+            InternalTransaction tx = txStateMeta.tx();
 
-        static TxInfo readWrite(UUID id, TxState txState) {
-            return new TxInfo(id, txState, false);
-        }
+            assert tx != null;
+
+            UUID txId = tx.id();
 
-        private TxInfo(UUID id, @Nullable TxState state, boolean readOnly) {
-            this.id = id.toString();
-            this.state = state == null ? null : state.name();
-            this.startTime = Instant.ofEpochMilli(TransactionIds.beginTimestamp(id).getPhysical());
-            this.type = readOnly ? READ_ONLY : READ_WRITE;
-            this.priority = TransactionIds.priority(id).name();
+            this.id = txId.toString();
+            this.state = txStateMeta.txState().name();
+            this.startTime = Instant.ofEpochMilli(TransactionIds.beginTimestamp(txId).getPhysical());
+            this.type = tx.isReadOnly() ? READ_ONLY : READ_WRITE;
+            this.priority = TransactionIds.priority(txId).name();
         }
     }
 }

Reply via email to