This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 26904f638f7 IGNITE-23457 Sql. Coordinator failed to send close message after query cancellation (#7194)
26904f638f7 is described below

commit 26904f638f71652b727d1d27eb343ed0491d44dd
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Dec 12 11:56:26 2025 +0200

    IGNITE-23457 Sql. Coordinator failed to send close message after query cancellation (#7194)
---
 .../sql/engine/exec/ExecutionServiceImpl.java      |   4 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  | 188 ++++++++++++++++-----
 2 files changed, 151 insertions(+), 41 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index fcba7b4faab..900c5ba4ae1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1342,7 +1342,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
                 stage = start.thenCompose(ignored -> 
messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
                                 .queryId(executionId.queryId())
                                 .executionToken(executionId.executionToken())
-                                .build()))
+                                .build()).exceptionally(ignore -> null))
                         .thenCompose(ignored -> closeLocalFragments());
             }
 
@@ -1413,7 +1413,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
                                                     
.queryId(executionId.queryId())
                                                     
.executionToken(executionId.executionToken())
                                                     .build()
-                                    );
+                                    ).exceptionally(ignore -> null);
                                 })
                 );
             }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index f9acf2f5668..352690e02bb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -52,6 +52,7 @@ import static org.mockito.Mockito.when;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -68,6 +69,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -131,6 +133,7 @@ import 
org.apache.ignite.internal.sql.engine.message.MessageListener;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
+import org.apache.ignite.internal.sql.engine.message.QueryCloseMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
 import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
 import org.apache.ignite.internal.sql.engine.message.QueryStartResponseImpl;
@@ -284,7 +287,8 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
-        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {
+        };
 
         prepareService = new PrepareServiceImpl(
                 "test",
@@ -635,7 +639,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         startResponse.await();
         execService.onNodeLeft(
-                new LogicalNode(firstNode, Map.of()), 
+                new LogicalNode(firstNode, Map.of()),
                 new LogicalTopologySnapshot(currentTopologyVersion.get(), 
List.of(), randomUUID())
         );
 
@@ -851,45 +855,45 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         String expectedOnCoordinator = format(
                 "Debug info for query: {} (canceled=false, stopped=false)" + nl
-                + "  Coordinator node: node_1 (current node)" + nl
-                + "  Root node state: opened" + nl
-                + nl
-                + "  Fragments awaiting init completion:" + nl
-                + "    id=1, node=node_1" + nl
-                + "    id=2, node=node_1" + nl
-                + "    id=2, node=node_2" + nl
-                + "    id=2, node=node_3" + nl
-                + nl
-                + "  Local fragments:" + nl
-                + "    id=1, state=opened, canceled=false, class=Inbox (root)" 
+ nl
-                + "    id=2, state=opened, canceled=false, class=Outbox" + nl
-                + nl
-                + "  Fragment#1 tree:" + nl
-                + "    class=Inbox, requested=512" + nl
-                + "      class=RemoteSource, nodeName=node_1, state=WAITING" + 
nl
-                + "      class=RemoteSource, nodeName=node_2, state=WAITING" + 
nl
-                + "      class=RemoteSource, nodeName=node_3, state=WAITING" + 
nl
-                + nl
-                + "  Fragment#2 tree:" + nl
-                + "    class=Outbox, waiting=-1" + nl
-                + "      class=RemoteDownstream, nodeName=node_1, state=END" + 
nl
-                + "      class=, requested=0" + nl
-                + nl, new ExecutionId(ctx.queryId(), 0));
+                        + "  Coordinator node: node_1 (current node)" + nl
+                        + "  Root node state: opened" + nl
+                        + nl
+                        + "  Fragments awaiting init completion:" + nl
+                        + "    id=1, node=node_1" + nl
+                        + "    id=2, node=node_1" + nl
+                        + "    id=2, node=node_2" + nl
+                        + "    id=2, node=node_3" + nl
+                        + nl
+                        + "  Local fragments:" + nl
+                        + "    id=1, state=opened, canceled=false, class=Inbox 
(root)" + nl
+                        + "    id=2, state=opened, canceled=false, 
class=Outbox" + nl
+                        + nl
+                        + "  Fragment#1 tree:" + nl
+                        + "    class=Inbox, requested=512" + nl
+                        + "      class=RemoteSource, nodeName=node_1, 
state=WAITING" + nl
+                        + "      class=RemoteSource, nodeName=node_2, 
state=WAITING" + nl
+                        + "      class=RemoteSource, nodeName=node_3, 
state=WAITING" + nl
+                        + nl
+                        + "  Fragment#2 tree:" + nl
+                        + "    class=Outbox, waiting=-1" + nl
+                        + "      class=RemoteDownstream, nodeName=node_1, 
state=END" + nl
+                        + "      class=, requested=0" + nl
+                        + nl, new ExecutionId(ctx.queryId(), 0));
 
         assertThat(debugInfoCoordinator, equalTo(expectedOnCoordinator));
 
         String expectedOnNonCoordinator = format(
                 "Debug info for query: {} (canceled=false, stopped=false)" + nl
-                + "  Coordinator node: node_1" + nl
-                + nl
-                + "  Local fragments:" + nl
-                + "    id=2, state=opened, canceled=false, class=Outbox" + nl
-                + nl
-                + "  Fragment#2 tree:" + nl
-                + "    class=Outbox, waiting=-1" + nl
-                + "      class=RemoteDownstream, nodeName=node_1, state=END" + 
nl
-                + "      class=, requested=0" + nl
-                + nl, new ExecutionId(ctx.queryId(), 0));
+                        + "  Coordinator node: node_1" + nl
+                        + nl
+                        + "  Local fragments:" + nl
+                        + "    id=2, state=opened, canceled=false, 
class=Outbox" + nl
+                        + nl
+                        + "  Fragment#2 tree:" + nl
+                        + "    class=Outbox, waiting=-1" + nl
+                        + "      class=RemoteDownstream, nodeName=node_1, 
state=END" + nl
+                        + "      class=, requested=0" + nl
+                        + nl, new ExecutionId(ctx.queryId(), 0));
 
         assertThat(debugInfo2, equalTo(expectedOnNonCoordinator));
         assertThat(debugInfo3, equalTo(expectedOnNonCoordinator));
@@ -1023,7 +1027,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
      * <p>The sequence of events on real cluster is as follow:<ul>
      * <li>Given: cluster of 3 nodes, distribution zone spans all these 
nodes.</li>
      * <li>Node 1 has been restarted.</li>
-     * <li>Notification of 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener#onNodeLeft
 
+     * <li>Notification of 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener#onNodeLeft
      * handlers are delayed on node 2 (due to metastorage lagging or whatever 
reason).</li>
      * <li>Query started from node 1.</li>
      * <li>Root fragment processed locally, QueryBatchRequest came to node 2 
before QueryStartRequest. This step
@@ -1094,7 +1098,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
     void outdatedNodeLeftEventDoesntCauseQueryToHangAllNodes() {
         QueryPlan plan = prepare("SELECT * FROM test_tbl", createContext());
 
-        AtomicLong currentTopologyVersion = new AtomicLong(); 
+        AtomicLong currentTopologyVersion = new AtomicLong();
         CountDownLatch latch = new CountDownLatch(1);
 
         // Triggers node left events with previous topology version for every 
node in the cluster.
@@ -1188,7 +1192,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         assertEquals(expectedEx, actualException);
 
-        verify(txWrapper).finalise(ArgumentMatchers.<Exception>argThat(ex -> 
+        verify(txWrapper).finalise(ArgumentMatchers.<Exception>argThat(ex ->
                 
expectedEx.getMessage().equals(ExceptionUtils.unwrapCause(ex).getMessage())));
     }
 
@@ -1219,6 +1223,112 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         assertThat(txCtx.observableTime(), 
equalTo(expectedCatalogActivationTimestamp));
     }
 
+    @Test
+    public void coordinatorIgnoresRemoteCloseErrorFromNodeOnCoordinator() 
throws InterruptedException {
+        ExecutionService execService = executionServices.get(0);
+
+        nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+
+        var expectedEx = new RuntimeException("Test error");
+        var queryClosed = new CountDownLatch(nodeNames.size() - 1);
+
+        String coordinatorNode = nodeNames.get(0);
+        testCluster.node(coordinatorNode).interceptor((senderNode, msg, 
original) -> {
+            if (msg instanceof QueryStartRequest) {
+                QueryStartRequest queryStart = (QueryStartRequest) msg;
+
+                String nodeName = senderNode.name();
+                
testCluster.node(coordinatorNode).messageService().send(nodeName, new 
SqlQueryMessagesFactory().queryStartResponse()
+                        .queryId(queryStart.queryId())
+                        .fragmentId(queryStart.fragmentId())
+                        .error(expectedEx)
+                        .build()
+                );
+            } else {
+                original.onMessage(senderNode, msg);
+            }
+
+            if (msg instanceof QueryCloseMessage) {
+                queryClosed.countDown();
+                return CompletableFuture.failedFuture(new 
RuntimeException("Test exception: failed to close"));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+
+        SqlOperationContext ctx = createContext();
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+
+        RuntimeException actualException = 
assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
+        assertEquals(expectedEx, actualException);
+
+        queryClosed.await();
+    }
+
+    @Test
+    public void coordinatorIgnoresRemoteCloseErrorOnNode() throws 
InterruptedException {
+        ExecutionService execService = executionServices.get(0);
+
+        nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+
+        var expectedEx = new RuntimeException("Test error");
+        var queryClosed = new CountDownLatch(nodeNames.size() - 1);
+
+        String coordinatorNodeName = nodeNames.get(0);
+        List<String> shuffledNodeNames = nodeNames.stream()
+                .filter(n -> !n.equals(coordinatorNodeName))
+                .collect(Collectors.toList());
+        Collections.shuffle(shuffledNodeNames);
+
+        testCluster.node(coordinatorNodeName).interceptor((senderNode, msg, 
original) -> {
+            if (msg instanceof QueryStartRequest) {
+                QueryStartRequest queryStart = (QueryStartRequest) msg;
+
+                String nodeName = senderNode.name();
+                
testCluster.node(coordinatorNodeName).messageService().send(nodeName, new 
SqlQueryMessagesFactory().queryStartResponse()
+                        .queryId(queryStart.queryId())
+                        .fragmentId(queryStart.fragmentId())
+                        .error(expectedEx)
+                        .build()
+                );
+            } else {
+                original.onMessage(senderNode, msg);
+            }
+
+            if (msg instanceof QueryCloseMessage) {
+                queryClosed.countDown();
+                return CompletableFuture.failedFuture(new 
RuntimeException("Test exception: failed to close"));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+
+        AtomicBoolean closeShouldFail = new AtomicBoolean();
+
+        for (String nodeName : shuffledNodeNames) {
+            testCluster.node(nodeName).interceptor((senderNode, msg, original) 
-> {
+                original.onMessage(senderNode, msg);
+
+                if (msg instanceof QueryCloseMessage) {
+                    queryClosed.countDown();
+                    // Let only one QueryClose to return an exception.
+                    if (closeShouldFail.compareAndSet(false, true)) {
+                        return CompletableFuture.failedFuture(new 
RuntimeException("Test exception: failed to close"));
+                    }
+                }
+                return nullCompletedFuture();
+            });
+        }
+
+        SqlOperationContext ctx = createContext();
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+
+        RuntimeException actualException = 
assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
+        assertEquals(expectedEx, actualException);
+
+        queryClosed.await();
+    }
+
     private static Stream<Arguments> txTypes() {
         return Stream.of(
                 Arguments.of(Named.named("ro-implicit", 
NoOpTransaction.readOnly("ro", true))),

Reply via email to