This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bacda8436c5 IGNITE-24968 Java thin: fix race on query cancellation (#5569)
bacda8436c5 is described below
commit bacda8436c5da067b06d81ca152f97e4416e20d2
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Apr 8 13:32:48 2025 +0300
IGNITE-24968 Java thin: fix race on query cancellation (#5569)
Co-authored-by: Pavel Tupitsyn <[email protected]>
---
.../ignite/internal/client/PayloadOutputChannel.java | 18 ++++++++++++++++++
.../ignite/internal/client/TcpClientChannel.java | 5 +++++
.../apache/ignite/internal/client/sql/ClientSql.java | 13 ++++++++-----
.../ignite/internal/sql/api/ItSqlApiBaseTest.java | 16 ++++++----------
4 files changed, 37 insertions(+), 15 deletions(-)
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index 6c86281e49e..d932247b62f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.jetbrains.annotations.Nullable;
/**
* Thin client payload output channel.
@@ -32,6 +33,9 @@ public class PayloadOutputChannel implements AutoCloseable {
/** Client request ID. */
private final long requestId;
+ /** Action to be executed when the payload is sent. */
+ private volatile @Nullable Runnable onSentAction;
+
/**
* Constructor.
*
@@ -77,4 +81,18 @@ public class PayloadOutputChannel implements AutoCloseable {
public void close() {
out.close();
}
+
+ /**
+ * Sets the action to be executed when the payload is sent successfully.
+ *
+ * @param action Action to be executed.
+ */
+ public void onSent(Runnable action) {
+ this.onSentAction = action;
+ }
+
+ /** Returns an action, if any, that should be executed when the payload is sent successfully. */
+ @Nullable Runnable onSentAction() {
+ return onSentAction;
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index a9c6e2b26c6..9dc67872570 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -379,6 +379,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
onDisconnected(ex);
} else {
metrics.requestsSentIncrement();
+
+ Runnable action = payloadCh.onSentAction();
+ if (action != null) {
+ asyncContinuationExecutor.execute(action);
+ }
}
});
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index b693440988e..b77cbb8eaf0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -264,7 +264,7 @@ public class ClientSql implements IgniteSql {
w.out().packLong(ch.observableTimestamp().get().longValue());
if (cancellationToken != null) {
- addCancelAction(cancellationToken, w.requestId());
+ addCancelAction(cancellationToken, w);
}
};
@@ -351,21 +351,24 @@ public class ClientSql implements IgniteSql {
w.out().packLong(ch.observableTimestamp().get().longValue());
if (cancellationToken != null) {
- addCancelAction(cancellationToken, w.requestId());
+ addCancelAction(cancellationToken, w);
}
};
return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
}
- private void addCancelAction(CancellationToken cancellationToken, long
correlationToken) {
+ private static void addCancelAction(CancellationToken cancellationToken,
PayloadOutputChannel ch) {
CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
if (CancelHandleHelper.isCancelled(cancellationToken)) {
throw new SqlException(Sql.EXECUTION_CANCELLED_ERR, "The query was
cancelled while executing.");
}
- Runnable cancelAction = () ->
ch.serviceAsync(ClientOp.SQL_CANCEL_EXEC, w ->
w.out().packLong(correlationToken), null)
+ long correlationToken = ch.requestId();
+
+ Runnable cancelAction = () -> ch.clientChannel()
+ .serviceAsync(ClientOp.SQL_CANCEL_EXEC, w ->
w.out().packLong(correlationToken), null)
.whenComplete((r, e) -> {
if (e != null) {
cancelFuture.completeExceptionally(e);
@@ -374,7 +377,7 @@ public class ClientSql implements IgniteSql {
}
});
- CancelHandleHelper.addCancelAction(cancellationToken, cancelAction,
cancelFuture);
+ ch.onSent(() -> CancelHandleHelper.addCancelAction(cancellationToken,
cancelAction, cancelFuture));
}
private static void packProperties(
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 2e89ed3b934..88a8788aca4 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -1286,22 +1286,18 @@ public abstract class ItSqlApiBaseTest extends BaseSqlIntegrationTest {
startBarrier.await();
- ResultSet<SqlRow> rs;
-
- try {
- rs = executeLazy(sql, token, query);
+ try (ResultSet<SqlRow> rs = executeLazy(sql, token, query)) {
+ expectQueryCancelled(() -> {
+ while (rs.hasNext()) {
+ rs.next();
+ }
+ });
} catch (SqlException e) {
assertEquals(Sql.EXECUTION_CANCELLED_ERR, e.code());
continue;
}
- expectQueryCancelled(() -> {
- while (rs.hasNext()) {
- rs.next();
- }
- });
-
await(cancelFut);
startBarrier.reset();