This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26904f638f7 IGNITE-23457 Sql. Coordinator failed to send close message
after query cancellation (#7194)
26904f638f7 is described below
commit 26904f638f71652b727d1d27eb343ed0491d44dd
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Dec 12 11:56:26 2025 +0200
IGNITE-23457 Sql. Coordinator failed to send close message after query
cancellation (#7194)
---
.../sql/engine/exec/ExecutionServiceImpl.java | 4 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 188 ++++++++++++++++-----
2 files changed, 151 insertions(+), 41 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index fcba7b4faab..900c5ba4ae1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1342,7 +1342,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
stage = start.thenCompose(ignored ->
messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
.queryId(executionId.queryId())
.executionToken(executionId.executionToken())
- .build()))
+ .build()).exceptionally(ignore -> null))
.thenCompose(ignored -> closeLocalFragments());
}
@@ -1413,7 +1413,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
.queryId(executionId.queryId())
.executionToken(executionId.executionToken())
.build()
- );
+ ).exceptionally(ignore -> null);
})
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index f9acf2f5668..352690e02bb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -52,6 +52,7 @@ import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -68,6 +69,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -131,6 +133,7 @@ import
org.apache.ignite.internal.sql.engine.message.MessageListener;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
+import org.apache.ignite.internal.sql.engine.message.QueryCloseMessage;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponseImpl;
@@ -284,7 +287,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
- AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {
+ };
prepareService = new PrepareServiceImpl(
"test",
@@ -635,7 +639,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
startResponse.await();
execService.onNodeLeft(
- new LogicalNode(firstNode, Map.of()),
+ new LogicalNode(firstNode, Map.of()),
new LogicalTopologySnapshot(currentTopologyVersion.get(),
List.of(), randomUUID())
);
@@ -851,45 +855,45 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
String expectedOnCoordinator = format(
"Debug info for query: {} (canceled=false, stopped=false)" + nl
- + " Coordinator node: node_1 (current node)" + nl
- + " Root node state: opened" + nl
- + nl
- + " Fragments awaiting init completion:" + nl
- + " id=1, node=node_1" + nl
- + " id=2, node=node_1" + nl
- + " id=2, node=node_2" + nl
- + " id=2, node=node_3" + nl
- + nl
- + " Local fragments:" + nl
- + " id=1, state=opened, canceled=false, class=Inbox (root)"
+ nl
- + " id=2, state=opened, canceled=false, class=Outbox" + nl
- + nl
- + " Fragment#1 tree:" + nl
- + " class=Inbox, requested=512" + nl
- + " class=RemoteSource, nodeName=node_1, state=WAITING" +
nl
- + " class=RemoteSource, nodeName=node_2, state=WAITING" +
nl
- + " class=RemoteSource, nodeName=node_3, state=WAITING" +
nl
- + nl
- + " Fragment#2 tree:" + nl
- + " class=Outbox, waiting=-1" + nl
- + " class=RemoteDownstream, nodeName=node_1, state=END" +
nl
- + " class=, requested=0" + nl
- + nl, new ExecutionId(ctx.queryId(), 0));
+ + " Coordinator node: node_1 (current node)" + nl
+ + " Root node state: opened" + nl
+ + nl
+ + " Fragments awaiting init completion:" + nl
+ + " id=1, node=node_1" + nl
+ + " id=2, node=node_1" + nl
+ + " id=2, node=node_2" + nl
+ + " id=2, node=node_3" + nl
+ + nl
+ + " Local fragments:" + nl
+ + " id=1, state=opened, canceled=false, class=Inbox
(root)" + nl
+ + " id=2, state=opened, canceled=false,
class=Outbox" + nl
+ + nl
+ + " Fragment#1 tree:" + nl
+ + " class=Inbox, requested=512" + nl
+ + " class=RemoteSource, nodeName=node_1,
state=WAITING" + nl
+ + " class=RemoteSource, nodeName=node_2,
state=WAITING" + nl
+ + " class=RemoteSource, nodeName=node_3,
state=WAITING" + nl
+ + nl
+ + " Fragment#2 tree:" + nl
+ + " class=Outbox, waiting=-1" + nl
+ + " class=RemoteDownstream, nodeName=node_1,
state=END" + nl
+ + " class=, requested=0" + nl
+ + nl, new ExecutionId(ctx.queryId(), 0));
assertThat(debugInfoCoordinator, equalTo(expectedOnCoordinator));
String expectedOnNonCoordinator = format(
"Debug info for query: {} (canceled=false, stopped=false)" + nl
- + " Coordinator node: node_1" + nl
- + nl
- + " Local fragments:" + nl
- + " id=2, state=opened, canceled=false, class=Outbox" + nl
- + nl
- + " Fragment#2 tree:" + nl
- + " class=Outbox, waiting=-1" + nl
- + " class=RemoteDownstream, nodeName=node_1, state=END" +
nl
- + " class=, requested=0" + nl
- + nl, new ExecutionId(ctx.queryId(), 0));
+ + " Coordinator node: node_1" + nl
+ + nl
+ + " Local fragments:" + nl
+ + " id=2, state=opened, canceled=false,
class=Outbox" + nl
+ + nl
+ + " Fragment#2 tree:" + nl
+ + " class=Outbox, waiting=-1" + nl
+ + " class=RemoteDownstream, nodeName=node_1,
state=END" + nl
+ + " class=, requested=0" + nl
+ + nl, new ExecutionId(ctx.queryId(), 0));
assertThat(debugInfo2, equalTo(expectedOnNonCoordinator));
assertThat(debugInfo3, equalTo(expectedOnNonCoordinator));
@@ -1023,7 +1027,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
* <p>The sequence of events on real cluster is as follow:<ul>
* <li>Given: cluster of 3 nodes, distribution zone spans all these
nodes.</li>
* <li>Node 1 has been restarted.</li>
- * <li>Notification of
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener#onNodeLeft
+ * <li>Notification of
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener#onNodeLeft
* handlers are delayed on node 2 (due to metastorage lagging or whatever
reason).</li>
* <li>Query started from node 1.</li>
* <li>Root fragment processed locally, QueryBatchRequest came to node 2
before QueryStartRequest. This step
@@ -1094,7 +1098,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
void outdatedNodeLeftEventDoesntCauseQueryToHangAllNodes() {
QueryPlan plan = prepare("SELECT * FROM test_tbl", createContext());
- AtomicLong currentTopologyVersion = new AtomicLong();
+ AtomicLong currentTopologyVersion = new AtomicLong();
CountDownLatch latch = new CountDownLatch(1);
// Triggers node left events with previous topology version for every
node in the cluster.
@@ -1188,7 +1192,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertEquals(expectedEx, actualException);
- verify(txWrapper).finalise(ArgumentMatchers.<Exception>argThat(ex ->
+ verify(txWrapper).finalise(ArgumentMatchers.<Exception>argThat(ex ->
expectedEx.getMessage().equals(ExceptionUtils.unwrapCause(ex).getMessage())));
}
@@ -1219,6 +1223,112 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertThat(txCtx.observableTime(),
equalTo(expectedCatalogActivationTimestamp));
}
+    @Test
+    public void coordinatorIgnoresRemoteCloseErrorFromNodeOnCoordinator() throws InterruptedException {
+        ExecutionService execService = executionServices.get(0);
+
+        nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+
+        var expectedEx = new RuntimeException("Test error");
+        var queryClosed = new CountDownLatch(nodeNames.size() - 1);
+
+        String coordinatorNode = nodeNames.get(0);
+        testCluster.node(coordinatorNode).interceptor((senderNode, msg, original) -> {
+            if (msg instanceof QueryStartRequest) {
+                QueryStartRequest queryStart = (QueryStartRequest) msg;
+
+                String nodeName = senderNode.name();
+
+                testCluster.node(coordinatorNode).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
+                        .queryId(queryStart.queryId())
+                        .fragmentId(queryStart.fragmentId())
+                        .error(expectedEx)
+                        .build()
+                );
+            } else {
+                original.onMessage(senderNode, msg);
+            }
+            // Every QueryCloseMessage seen by this interceptor is counted and then reported as failed.
+            if (msg instanceof QueryCloseMessage) {
+                queryClosed.countDown();
+                return CompletableFuture.failedFuture(new RuntimeException("Test exception: failed to close"));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+
+        SqlOperationContext ctx = createContext();
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+
+        RuntimeException actualException = assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
+        assertEquals(expectedEx, actualException);
+        // Bounded wait: a close message that never arrives fails the test instead of hanging it.
+        assertThat(queryClosed.await(10, TimeUnit.SECONDS), equalTo(true));
+    }
+
+    @Test
+    public void coordinatorIgnoresRemoteCloseErrorOnNode() throws InterruptedException {
+        ExecutionService execService = executionServices.get(0);
+
+        nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+
+        var expectedEx = new RuntimeException("Test error");
+        var queryClosed = new CountDownLatch(nodeNames.size() - 1);
+
+        String coordinatorNodeName = nodeNames.get(0);
+        List<String> shuffledNodeNames = nodeNames.stream()
+                .filter(n -> !n.equals(coordinatorNodeName))
+                .collect(Collectors.toList());
+        Collections.shuffle(shuffledNodeNames);
+
+        testCluster.node(coordinatorNodeName).interceptor((senderNode, msg, original) -> {
+            if (msg instanceof QueryStartRequest) {
+                QueryStartRequest queryStart = (QueryStartRequest) msg;
+
+                String nodeName = senderNode.name();
+
+                testCluster.node(coordinatorNodeName).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse()
+                        .queryId(queryStart.queryId())
+                        .fragmentId(queryStart.fragmentId())
+                        .error(expectedEx)
+                        .build()
+                );
+            } else {
+                original.onMessage(senderNode, msg);
+            }
+            // Every QueryCloseMessage seen by the coordinator's interceptor is counted and then reported as failed.
+            if (msg instanceof QueryCloseMessage) {
+                queryClosed.countDown();
+                return CompletableFuture.failedFuture(new RuntimeException("Test exception: failed to close"));
+            } else {
+                return nullCompletedFuture();
+            }
+        });
+
+        AtomicBoolean closeShouldFail = new AtomicBoolean();
+
+        for (String nodeName : shuffledNodeNames) {
+            testCluster.node(nodeName).interceptor((senderNode, msg, original) -> {
+                original.onMessage(senderNode, msg);
+
+                if (msg instanceof QueryCloseMessage) {
+                    queryClosed.countDown();
+                    // Only the first QueryCloseMessage (across all non-coordinator nodes) fails; the rest complete normally.
+                    if (closeShouldFail.compareAndSet(false, true)) {
+                        return CompletableFuture.failedFuture(new RuntimeException("Test exception: failed to close"));
+                    }
+                }
+                return nullCompletedFuture();
+            });
+        }
+
+        SqlOperationContext ctx = createContext();
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+
+        RuntimeException actualException = assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
+        assertEquals(expectedEx, actualException);
+        // Bounded wait: a close message that never arrives fails the test instead of hanging it.
+        assertThat(queryClosed.await(10, TimeUnit.SECONDS), equalTo(true));
+    }
+
private static Stream<Arguments> txTypes() {
return Stream.of(
Arguments.of(Named.named("ro-implicit",
NoOpTransaction.readOnly("ro", true))),