This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bacda8436c5 IGNITE-24968 Java thin: fix race on query cancellation 
(#5569)
bacda8436c5 is described below

commit bacda8436c5da067b06d81ca152f97e4416e20d2
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Apr 8 13:32:48 2025 +0300

    IGNITE-24968 Java thin: fix race on query cancellation (#5569)
    
    Co-authored-by: Pavel Tupitsyn <[email protected]>
---
 .../ignite/internal/client/PayloadOutputChannel.java   | 18 ++++++++++++++++++
 .../ignite/internal/client/TcpClientChannel.java       |  5 +++++
 .../apache/ignite/internal/client/sql/ClientSql.java   | 13 ++++++++-----
 .../ignite/internal/sql/api/ItSqlApiBaseTest.java      | 16 ++++++----------
 4 files changed, 37 insertions(+), 15 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index 6c86281e49e..d932247b62f 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client;
 
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Thin client payload output channel.
@@ -32,6 +33,9 @@ public class PayloadOutputChannel implements AutoCloseable {
     /** Client request ID. */
     private final long requestId;
 
+    /** Action to be executed when the payload is sent. */
+    private volatile @Nullable Runnable onSentAction;
+
     /**
      * Constructor.
      *
@@ -77,4 +81,18 @@ public class PayloadOutputChannel implements AutoCloseable {
     public void close() {
         out.close();
     }
+
+    /**
+     * Sets the action to be executed when the payload is sent successfully.
+     *
+     * @param action Action to be executed.
+     */
+    public void onSent(Runnable action) {
+        this.onSentAction = action;
+    }
+
+    /** Returns an action, if any, that should be executed when the payload is 
sent successfully. */
+    @Nullable Runnable onSentAction() {
+        return onSentAction;
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index a9c6e2b26c6..9dc67872570 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -379,6 +379,11 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                     onDisconnected(ex);
                 } else {
                     metrics.requestsSentIncrement();
+
+                    Runnable action = payloadCh.onSentAction();
+                    if (action != null) {
+                        asyncContinuationExecutor.execute(action);
+                    }
                 }
             });
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index b693440988e..b77cbb8eaf0 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -264,7 +264,7 @@ public class ClientSql implements IgniteSql {
             w.out().packLong(ch.observableTimestamp().get().longValue());
 
             if (cancellationToken != null) {
-                addCancelAction(cancellationToken, w.requestId());
+                addCancelAction(cancellationToken, w);
             }
         };
 
@@ -351,21 +351,24 @@ public class ClientSql implements IgniteSql {
             w.out().packLong(ch.observableTimestamp().get().longValue());
 
             if (cancellationToken != null) {
-                addCancelAction(cancellationToken, w.requestId());
+                addCancelAction(cancellationToken, w);
             }
         };
 
         return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
     }
 
-    private void addCancelAction(CancellationToken cancellationToken, long 
correlationToken) {
+    private static void addCancelAction(CancellationToken cancellationToken, 
PayloadOutputChannel ch) {
         CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
 
         if (CancelHandleHelper.isCancelled(cancellationToken)) {
             throw new SqlException(Sql.EXECUTION_CANCELLED_ERR, "The query was 
cancelled while executing.");
         }
 
-        Runnable cancelAction = () -> 
ch.serviceAsync(ClientOp.SQL_CANCEL_EXEC, w -> 
w.out().packLong(correlationToken), null)
+        long correlationToken = ch.requestId();
+
+        Runnable cancelAction = () -> ch.clientChannel()
+                .serviceAsync(ClientOp.SQL_CANCEL_EXEC, w -> 
w.out().packLong(correlationToken), null)
                 .whenComplete((r, e) -> {
                     if (e != null) {
                         cancelFuture.completeExceptionally(e);
@@ -374,7 +377,7 @@ public class ClientSql implements IgniteSql {
                     }
                 });
 
-        CancelHandleHelper.addCancelAction(cancellationToken, cancelAction, 
cancelFuture);
+        ch.onSent(() -> CancelHandleHelper.addCancelAction(cancellationToken, 
cancelAction, cancelFuture));
     }
 
     private static void packProperties(
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 2e89ed3b934..88a8788aca4 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -1286,22 +1286,18 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
 
             startBarrier.await();
 
-            ResultSet<SqlRow> rs;
-
-            try {
-                rs = executeLazy(sql, token, query);
+            try (ResultSet<SqlRow> rs = executeLazy(sql, token, query)) {
+                expectQueryCancelled(() -> {
+                    while (rs.hasNext()) {
+                        rs.next();
+                    }
+                });
             } catch (SqlException e) {
                 assertEquals(Sql.EXECUTION_CANCELLED_ERR, e.code());
 
                 continue;
             }
 
-            expectQueryCancelled(() -> {
-                while (rs.hasNext()) {
-                    rs.next();
-                }
-            });
-
             await(cancelFut);
 
             startBarrier.reset();

Reply via email to