This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d6750bab57 IGNITE-23831 Sql. ExecutionService must create unique 
executionId for retries for same queryId (#4827)
d6750bab57 is described below

commit d6750bab57532248308b029572f3221b0c3d60ee
Author: korlov42 <[email protected]>
AuthorDate: Thu Dec 5 15:15:44 2024 +0200

    IGNITE-23831 Sql. ExecutionService must create unique executionId for 
retries for same queryId (#4827)
---
 .../internal/sql/engine/exec/ExchangeService.java  |  13 +--
 .../sql/engine/exec/ExchangeServiceImpl.java       |  34 +++---
 .../internal/sql/engine/exec/ExecutionContext.java |  30 +++--
 .../internal/sql/engine/exec/ExecutionId.java      |  67 +++++++++++
 .../sql/engine/exec/ExecutionServiceImpl.java      |  97 ++++++++--------
 .../internal/sql/engine/exec/MailboxRegistry.java  |   9 +-
 .../sql/engine/exec/MailboxRegistryImpl.java       |  39 +++----
 .../internal/sql/engine/exec/rel/AbstractNode.java |   4 +-
 .../ignite/internal/sql/engine/exec/rel/Inbox.java |   2 +-
 .../internal/sql/engine/exec/rel/Mailbox.java      |   8 +-
 .../internal/sql/engine/exec/rel/Outbox.java       |  12 +-
 .../internal/sql/engine/message/ErrorMessage.java  |   5 +
 .../message/ExecutionContextAwareMessage.java      |   5 +
 .../sql/engine/message/QueryCloseMessage.java      |   5 +
 .../sql/engine/message/QueryStartResponse.java     |   5 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  | 129 +++++++++++++--------
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   2 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   3 +-
 .../sql/engine/framework/TestBuilders.java         |   3 +-
 19 files changed, 298 insertions(+), 174 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index e7eb05e33d..6846280428 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
@@ -38,7 +37,7 @@ public interface ExchangeService extends LifecycleAware {
      * Asynchronously sends a batch of data to the specified node.
      *
      * @param nodeName The name of the node to which the data will be sent.
-     * @param queryId The ID of the query to which the data belongs.
+     * @param executionId The ID of the execution to which the data belongs.
      * @param fragmentId The ID of the fragment to which the data will be sent.
      * @param exchangeId The ID of the exchange through which the data will be 
sent.
      * @param batchId The ID of the batch to which the data belongs.
@@ -47,14 +46,14 @@ public interface ExchangeService extends LifecycleAware {
      * @return A {@link CompletableFuture future} representing the result of 
operation,
      *      which completes when the data has been sent.
      */
-    CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long 
fragmentId, long exchangeId, int batchId, boolean last,
+    CompletableFuture<Void> sendBatch(String nodeName, ExecutionId 
executionId, long fragmentId, long exchangeId, int batchId, boolean last,
             List<BinaryTupleMessage> rows);
 
     /**
      * Asynchronously requests data from the specified node.
      *
      * @param nodeName The name of the node from which the data will be 
requested.
-     * @param queryId The ID of the query for which the data is being 
requested.
+     * @param executionId The ID of the execution for which the data is being 
requested.
      * @param fragmentId The ID of the fragment from which the data will be 
requested.
      * @param exchangeId The ID of the exchange through which the data will be 
requested.
      * @param amountOfBatches The number of batches of data to request.
@@ -62,18 +61,18 @@ public interface ExchangeService extends LifecycleAware {
      * @return A {@link CompletableFuture future} representing the result of 
operation,
      *      which completes when the request message has been sent.
      */
-    CompletableFuture<Void> request(String nodeName, UUID queryId, long 
fragmentId, long exchangeId, int amountOfBatches,
+    CompletableFuture<Void> request(String nodeName, ExecutionId executionId, 
long fragmentId, long exchangeId, int amountOfBatches,
             @Nullable SharedState state);
 
     /**
      * Asynchronously sends an error message to the specified node.
      *
      * @param nodeName The name of the node to which the error will be sent.
-     * @param queryId The ID of the query to which the error belongs.
+     * @param executionId The ID of the execution to which the error belongs.
      * @param fragmentId The ID of the fragment to which the error belongs.
      * @param error The error to send.
      * @return A {@link CompletableFuture future} representing the result of 
operation,
      *      which completes when the error message has been sent.
      */
-    CompletableFuture<Void> sendError(String nodeName, UUID queryId, long 
fragmentId, Throwable error);
+    CompletableFuture<Void> sendError(String nodeName, ExecutionId 
executionId, long fragmentId, Throwable error);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index c105e6f219..67f586fd4b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -80,13 +79,14 @@ public class ExchangeServiceImpl implements ExchangeService 
{
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long 
fragmentId, long exchangeId, int batchId,
+    public CompletableFuture<Void> sendBatch(String nodeName, ExecutionId 
executionId, long fragmentId, long exchangeId, int batchId,
             boolean last, List<BinaryTupleMessage> rows) {
 
         return messageService.send(
                 nodeName,
                 FACTORY.queryBatchMessage()
-                        .queryId(qryId)
+                        .queryId(executionId.queryId())
+                        .executionToken(executionId.executionToken())
                         .fragmentId(fragmentId)
                         .exchangeId(exchangeId)
                         .batchId(batchId)
@@ -99,12 +99,13 @@ public class ExchangeServiceImpl implements ExchangeService 
{
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> request(String nodeName, UUID queryId, long 
fragmentId, long exchangeId, int amountOfBatches,
+    public CompletableFuture<Void> request(String nodeName, ExecutionId 
executionId, long fragmentId, long exchangeId, int amountOfBatches,
             @Nullable SharedState state) {
         return messageService.send(
                 nodeName,
                 FACTORY.queryBatchRequestMessage()
-                        .queryId(queryId)
+                        .queryId(executionId.queryId())
+                        .executionToken(executionId.executionToken())
                         .fragmentId(fragmentId)
                         .exchangeId(exchangeId)
                         .amountOfBatches(amountOfBatches)
@@ -115,23 +116,24 @@ public class ExchangeServiceImpl implements 
ExchangeService {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> sendError(String nodeName, UUID queryId, 
long fragmentId, Throwable error) {
+    public CompletableFuture<Void> sendError(String nodeName, ExecutionId 
executionId, long fragmentId, Throwable error) {
         Throwable traceableErr = ExceptionUtils.unwrapCause(error);
 
         if (!(traceableErr instanceof TraceableException)) {
             traceableErr = error = new IgniteInternalException(INTERNAL_ERR, 
error);
 
-            LOG.info(format("Failed to execute query fragment: traceId={}, 
queryId={}, fragmentId={}",
-                    ((TraceableException) traceableErr).traceId(), queryId, 
fragmentId), error);
+            LOG.info(format("Failed to execute query fragment: traceId={}, 
executionId={}, fragmentId={}",
+                    ((TraceableException) traceableErr).traceId(), 
executionId, fragmentId), error);
         } else if (LOG.isDebugEnabled()) {
-            LOG.debug(format("Failed to execute query fragment: traceId={}, 
queryId={}, fragmentId={}",
-                    ((TraceableException) traceableErr).traceId(), queryId, 
fragmentId), error);
+            LOG.debug(format("Failed to execute query fragment: traceId={}, 
executionId={}, fragmentId={}",
+                    ((TraceableException) traceableErr).traceId(), 
executionId, fragmentId), error);
         }
 
         return messageService.send(
                 nodeName,
                 FACTORY.errorMessage()
-                        .queryId(queryId)
+                        .queryId(executionId.queryId())
+                        .executionToken(executionId.executionToken())
                         .fragmentId(fragmentId)
                         .traceId(((TraceableException) traceableErr).traceId())
                         .code(((TraceableException) traceableErr).code())
@@ -141,7 +143,8 @@ public class ExchangeServiceImpl implements ExchangeService 
{
     }
 
     private void onMessage(String nodeName, QueryBatchRequestMessage msg) {
-        CompletableFuture<Outbox<?>> outboxFut = 
mailboxRegistry.outbox(msg.queryId(), msg.exchangeId());
+        ExecutionId executionId = new ExecutionId(msg.queryId(), 
msg.executionToken());
+        CompletableFuture<Outbox<?>> outboxFut = 
mailboxRegistry.outbox(executionId, msg.exchangeId());
 
         Consumer<Outbox<?>> onRequestHandler = outbox -> {
             try {
@@ -166,7 +169,8 @@ public class ExchangeServiceImpl implements ExchangeService 
{
     }
 
     private void onMessage(String nodeName, QueryBatchMessage msg) {
-        Inbox<?> inbox = mailboxRegistry.inbox(msg.queryId(), 
msg.exchangeId());
+        ExecutionId executionId = new ExecutionId(msg.queryId(), 
msg.executionToken());
+        Inbox<?> inbox = mailboxRegistry.inbox(executionId, msg.exchangeId());
 
         if (inbox != null) {
             try {
@@ -181,8 +185,8 @@ public class ExchangeServiceImpl implements ExchangeService 
{
                 LOG.warn("Unexpected exception", e);
             }
         } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Stale batch message received: [nodeName={}, queryId={}, 
fragmentId={}, exchangeId={}, batchId={}]",
-                    nodeName, msg.queryId(), msg.fragmentId(), 
msg.exchangeId(), msg.batchId());
+            LOG.debug("Stale batch message received: [nodeName={}, 
executionId={}, fragmentId={}, exchangeId={}, batchId={}]",
+                    nodeName, executionId, msg.fragmentId(), msg.exchangeId(), 
msg.batchId());
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 37a02c8950..21c656effe 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -68,7 +68,7 @@ public class ExecutionContext<RowT> implements DataContext {
 
     private final QueryTaskExecutor executor;
 
-    private final UUID qryId;
+    private final ExecutionId executionId;
 
     private final FragmentDescription description;
 
@@ -102,7 +102,7 @@ public class ExecutionContext<RowT> implements DataContext {
      * Constructor.
      *
      * @param executor Task executor.
-     * @param qryId Query ID.
+     * @param executionId Execution ID.
      * @param localNode Local node.
      * @param originatingNodeName Name of the node that initiated the query.
      * @param description Partitions information.
@@ -115,7 +115,7 @@ public class ExecutionContext<RowT> implements DataContext {
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public ExecutionContext(
             QueryTaskExecutor executor,
-            UUID qryId,
+            ExecutionId executionId,
             ClusterNode localNode,
             String originatingNodeName,
             FragmentDescription description,
@@ -126,7 +126,7 @@ public class ExecutionContext<RowT> implements DataContext {
             @Nullable QueryCancel cancel
     ) {
         this.executor = executor;
-        this.qryId = qryId;
+        this.executionId = executionId;
         this.description = description;
         this.handler = handler;
         this.params = params;
@@ -145,7 +145,7 @@ public class ExecutionContext<RowT> implements DataContext {
         startTs = 
nowUtc.plusSeconds(this.timeZoneId.getRules().getOffset(nowUtc).getTotalSeconds()).toEpochMilli();
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Context created [qryId={}, fragmentId={}]", qryId, 
fragmentId());
+            LOG.trace("Context created [executionId={}, fragmentId={}]", 
executionId, fragmentId());
         }
     }
 
@@ -153,7 +153,15 @@ public class ExecutionContext<RowT> implements DataContext 
{
      * Get query ID.
      */
     public UUID queryId() {
-        return qryId;
+        return executionId.queryId();
+    }
+
+    public int executionToken() {
+        return executionId.executionToken();
+    }
+
+    public ExecutionId executionId() {
+        return executionId;
     }
 
     /**
@@ -334,7 +342,7 @@ public class ExecutionContext<RowT> implements DataContext {
             return;
         }
 
-        executor.execute(qryId, fragmentId(), () -> {
+        executor.execute(queryId(), fragmentId(), () -> {
             try {
                 if (!isCancelled()) {
                     task.run();
@@ -362,7 +370,7 @@ public class ExecutionContext<RowT> implements DataContext {
     public CompletableFuture<?> submit(RunnableX task, Consumer<Throwable> 
onError) {
         assert !isCancelled() : "Call submit after execution was cancelled.";
 
-        return executor.submit(qryId, fragmentId(), () -> {
+        return executor.submit(queryId(), fragmentId(), () -> {
             try {
                 task.run();
             } catch (Throwable e) {
@@ -387,7 +395,7 @@ public class ExecutionContext<RowT> implements DataContext {
         boolean res = !cancelFlag.get() && cancelFlag.compareAndSet(false, 
true);
 
         if (res && LOG.isTraceEnabled()) {
-            LOG.trace("Context cancelled [qryId={}, fragmentId={}]", qryId, 
fragmentId());
+            LOG.trace("Context cancelled [executionId={}, fragmentId={}]", 
executionId, fragmentId());
         }
 
         return res;
@@ -440,12 +448,12 @@ public class ExecutionContext<RowT> implements 
DataContext {
 
         ExecutionContext<?> context = (ExecutionContext<?>) o;
 
-        return qryId.equals(context.qryId) && description.fragmentId() == 
context.description.fragmentId();
+        return executionId.equals(context.executionId) && 
description.fragmentId() == context.description.fragmentId();
     }
 
     /** {@inheritDoc} */
     @Override
     public int hashCode() {
-        return Objects.hash(qryId, description.fragmentId());
+        return Objects.hash(executionId, description.fragmentId());
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionId.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionId.java
new file mode 100644
index 0000000000..6502db1b6d
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionId.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Unique identity of distributed plan execution.
+ * 
+ * <p>Includes a token to separate retries within a single query.
+ */
+public class ExecutionId {
+    private final UUID queryId;
+    private final int executionToken;
+
+    public ExecutionId(UUID queryId, int executionToken) {
+        this.queryId = queryId;
+        this.executionToken = executionToken;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ExecutionId that = (ExecutionId) o;
+        return executionToken == that.executionToken && 
Objects.equals(queryId, that.queryId);
+    }
+
+    public UUID queryId() {
+        return queryId;
+    }
+
+    public int executionToken() {
+        return executionToken;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(queryId, executionToken);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 2eccc7f565..66c0ce6dc9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -129,24 +130,21 @@ import org.jetbrains.annotations.TestOnly;
  */
 public class ExecutionServiceImpl<RowT> implements ExecutionService, 
TopologyEventHandler {
     private static final int CACHE_SIZE = 1024;
-
-    private final ConcurrentMap<FragmentCacheKey, IgniteRel> physNodesCache = 
Caffeine.newBuilder()
-            .maximumSize(CACHE_SIZE)
-            .<FragmentCacheKey, IgniteRel>build()
-            .asMap();
-
     private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionServiceImpl.class);
-
     private static final SqlQueryMessagesFactory FACTORY = new 
SqlQueryMessagesFactory();
-
     private static final List<InternalSqlRow> APPLIED_ANSWER = List.of(new 
InternalSqlRowSingleBoolean(true));
-
     private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new 
InternalSqlRowSingleBoolean(false));
-
     private static final FragmentDescription DUMMY_DESCRIPTION = new 
FragmentDescription(
             0, true, Long2ObjectMaps.emptyMap(), null, null, null
     );
 
+    private final ConcurrentMap<FragmentCacheKey, IgniteRel> physNodesCache = 
Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .<FragmentCacheKey, IgniteRel>build()
+            .asMap();
+
+    private final AtomicInteger executionTokenGen = new AtomicInteger();
+
     private final MessageService messageService;
 
     private final TopologyService topSrvc;
@@ -169,7 +167,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
     private final ImplementorFactory<RowT> implementorFactory;
 
-    private final Map<UUID, DistributedQueryManager> queryManagerMap = new 
ConcurrentHashMap<>();
+    private final Map<ExecutionId, DistributedQueryManager> queryManagerMap = 
new ConcurrentHashMap<>();
 
     private final long shutdownTimeout;
 
@@ -296,9 +294,10 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             SqlOperationContext operationContext,
             MultiStepPlan plan
     ) {
-        DistributedQueryManager queryManager = new 
DistributedQueryManager(localNode.name(), true, operationContext);
+        ExecutionId executionid = nextExecutionId(operationContext.queryId());
+        DistributedQueryManager queryManager = new 
DistributedQueryManager(executionid, localNode.name(), true, operationContext);
 
-        DistributedQueryManager old = 
queryManagerMap.put(operationContext.queryId(), queryManager);
+        DistributedQueryManager old = queryManagerMap.put(executionid, 
queryManager);
 
         assert old == null;
 
@@ -393,17 +392,6 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         }
     }
 
-    /** Cancels the query with given id. */
-    public CompletableFuture<?> cancel(UUID qryId) {
-        var mgr = queryManagerMap.get(qryId);
-
-        if (mgr == null) {
-            return nullCompletedFuture();
-        }
-
-        return mgr.close(QueryCompletionReason.CANCEL);
-    }
-
     private AsyncDataCursor<InternalSqlRow> executeExecutablePlan(
             SqlOperationContext operationContext,
             ExecutablePlan plan
@@ -411,9 +399,10 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         QueryCancel queryCancel = operationContext.cancel();
         assert queryCancel != null;
 
+        ExecutionId executionId = nextExecutionId(operationContext.queryId());
         ExecutionContext<RowT> ectx = new ExecutionContext<>(
                 taskExecutor,
-                operationContext.queryId(),
+                executionId,
                 localNode,
                 localNode.name(),
                 DUMMY_DESCRIPTION,
@@ -527,7 +516,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private void onMessage(String nodeName, QueryStartResponse msg) {
         assert nodeName != null && msg != null;
 
-        DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
+        DistributedQueryManager dqm = queryManagerMap.get(new 
ExecutionId(msg.queryId(), msg.executionToken()));
 
         if (dqm != null) {
             dqm.acknowledgeFragment(nodeName, msg.fragmentId(), msg.error());
@@ -537,7 +526,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private void onMessage(String nodeName, ErrorMessage msg) {
         assert nodeName != null && msg != null;
 
-        DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
+        DistributedQueryManager dqm = queryManagerMap.get(new 
ExecutionId(msg.queryId(), msg.executionToken()));
 
         if (dqm != null) {
             RemoteFragmentExecutionException e = new 
RemoteFragmentExecutionException(
@@ -560,7 +549,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private void onMessage(String nodeName, QueryCloseMessage msg) {
         assert nodeName != null && msg != null;
 
-        DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
+        DistributedQueryManager dqm = queryManagerMap.get(new 
ExecutionId(msg.queryId(), msg.executionToken()));
 
         if (dqm != null) {
             dqm.close(QueryCompletionReason.CANCEL);
@@ -599,14 +588,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     }
 
     /** Returns local fragments for the query with given id. */
+    @TestOnly
     public List<AbstractNode<?>> localFragments(UUID queryId) {
-        DistributedQueryManager mgr = queryManagerMap.get(queryId);
-
-        if (mgr == null) {
-            return List.of();
-        }
-
-        return mgr.localFragments();
+        return queryManagerMap.entrySet().stream()
+                .filter(e -> e.getKey().queryId().equals(queryId))
+                .flatMap(e -> e.getValue().localFragments().stream())
+                .collect(Collectors.toList());
     }
 
     private void submitFragment(String nodeName, QueryStartRequest msg) {
@@ -622,12 +609,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     }
 
     private DistributedQueryManager getOrCreateQueryManager(String 
coordinatorNodeName, QueryStartRequest msg) {
-        return queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
+        return queryManagerMap.computeIfAbsent(new ExecutionId(msg.queryId(), 
msg.executionToken()), key -> {
             SqlOperationContext operationContext = createOperationContext(
-                    key, ZoneId.of(msg.timeZoneId()), msg.parameters(), 
msg.operationTime()
+                    key.queryId(), ZoneId.of(msg.timeZoneId()), 
msg.parameters(), msg.operationTime()
             );
 
-            return new DistributedQueryManager(coordinatorNodeName, 
operationContext);
+            return new DistributedQueryManager(key, coordinatorNodeName, 
operationContext);
         });
     }
 
@@ -657,12 +644,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     String dumpDebugInfo() {
         IgniteStringBuilder buf = new IgniteStringBuilder();
 
-        for (Map.Entry<UUID, DistributedQueryManager> entry : 
queryManagerMap.entrySet()) {
-            UUID queryId = entry.getKey();
+        for (Map.Entry<ExecutionId, DistributedQueryManager> entry : 
queryManagerMap.entrySet()) {
+            ExecutionId executionId = entry.getKey();
             DistributedQueryManager mgr = entry.getValue();
 
             buf.nl();
-            buf.app("Debug info for query: ").app(queryId)
+            buf.app("Debug info for query: ").app(executionId)
                     .app(" (canceled=").app(mgr.cancelled.get()).app(", 
stopped=").app(mgr.cancelFut.isDone()).app(")");
             buf.nl();
 
@@ -760,6 +747,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
      * A convenient class that manages the initialization and termination of 
distributed queries.
      */
     private class DistributedQueryManager {
+        private final ExecutionId executionId;
         private final boolean coordinator;
 
         private final String coordinatorNodeName;
@@ -784,10 +772,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private volatile Long rootFragmentId = null;
 
         private DistributedQueryManager(
+                ExecutionId executionId,
                 String coordinatorNodeName,
                 boolean coordinator,
                 SqlOperationContext ctx
         ) {
+            this.executionId = executionId;
             this.ctx = ctx;
             this.coordinator = coordinator;
             this.coordinatorNodeName = coordinatorNodeName;
@@ -815,8 +805,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             }
         }
 
-        private DistributedQueryManager(String coordinatorNodeName, 
SqlOperationContext ctx) {
-            this(coordinatorNodeName, false, ctx);
+        private DistributedQueryManager(ExecutionId executionId, String 
coordinatorNodeName, SqlOperationContext ctx) {
+            this(executionId, coordinatorNodeName, false, ctx);
         }
 
         private List<AbstractNode<?>> localFragments() {
@@ -827,7 +817,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 String targetNodeName, String serialisedFragment, 
FragmentDescription desc, TxAttributes txAttributes, int catalogVersion
         ) {
             QueryStartRequest request = FACTORY.queryStartRequest()
-                    .queryId(ctx.queryId())
+                    .queryId(executionId.queryId())
+                    .executionToken(executionId.executionToken())
                     .fragmentId(desc.fragmentId())
                     .root(serialisedFragment)
                     .fragmentDescription(desc)
@@ -903,6 +894,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     origNodeName,
                     FACTORY.queryStartResponse()
                             .queryId(ectx.queryId())
+                            .executionToken(ectx.executionToken())
                             .fragmentId(ectx.fragmentId())
                             .build()
             );
@@ -917,7 +909,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private ExecutionContext<RowT> createContext(String initiatorNodeName, 
FragmentDescription desc, TxAttributes txAttributes) {
             return new ExecutionContext<>(
                     taskExecutor,
-                    ctx.queryId(),
+                    executionId,
                     localNode,
                     initiatorNodeName,
                     desc,
@@ -963,7 +955,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 messageService.send(
                         initiatorNode,
                         FACTORY.queryStartResponse()
-                                .queryId(ctx.queryId())
+                                .queryId(executionId.queryId())
+                                .executionToken(executionId.executionToken())
                                 .fragmentId(fragmentId)
                                 .error(ex)
                                 .build()
@@ -1179,7 +1172,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                         .thenCompose(ignored -> 
awaitFragmentInitialisationAndClose());
             } else {
                 stage = start.thenCompose(ignored -> 
messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
-                                .queryId(ctx.queryId())
+                                .queryId(executionId.queryId())
+                                .executionToken(executionId.executionToken())
                                 .build()))
                         .thenCompose(ignored -> closeLocalFragments());
             }
@@ -1191,7 +1185,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     LOG.warn("Fragment closing processed with errors: 
[queryId={}]", ex, ctx.queryId());
                 }
 
-                queryManagerMap.remove(ctx.queryId());
+                queryManagerMap.remove(executionId);
 
                 cancelFut.complete(null);
             }).thenRun(() -> localFragments.forEach(f -> 
f.context().cancel()));
@@ -1248,7 +1242,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                                     return messageService.send(
                                             nodeId,
                                             FACTORY.queryCloseMessage()
-                                                    .queryId(ctx.queryId())
+                                                    
.queryId(executionId.queryId())
+                                                    
.executionToken(executionId.executionToken())
                                                     .build()
                                     );
                                 })
@@ -1339,6 +1334,10 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         return firstFoundError;
     }
 
+    private ExecutionId nextExecutionId(UUID queryId) {
+        return new ExecutionId(queryId, executionTokenGen.getAndIncrement());
+    }
+
     /**
      * A factory of the relational node implementors.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
index 75676d435f..28f210cce3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
@@ -58,18 +57,18 @@ public interface MailboxRegistry extends LifecycleAware {
     /**
      * Returns a registered outbox by provided query ID, exchange ID pair.
      *
-     * @param qryId      Query ID.
+     * @param executionId Execution ID.
      * @param exchangeId Exchange ID.
      * @return Registered outbox. May be {@code null} if execution was 
cancelled.
      */
-    CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId);
+    CompletableFuture<Outbox<?>> outbox(ExecutionId executionId, long 
exchangeId);
 
     /**
      * Returns a registered inbox by provided query ID, exchange ID pair.
      *
-     * @param qryId      Query ID.
+     * @param executionId Execution ID.
      * @param exchangeId Exchange ID.
      * @return Registered inbox. May be {@code null} if execution was 
cancelled.
      */
-    Inbox<?> inbox(UUID qryId, long exchangeId);
+    Inbox<?> inbox(ExecutionId executionId, long exchangeId);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index 1f8dde0de6..b4d5bf3389 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -58,10 +57,10 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
     /** {@inheritDoc} */
     @Override
     public void register(Inbox<?> inbox) {
-        Inbox<?> res = remotes.putIfAbsent(new MailboxKey(inbox.queryId(), 
inbox.exchangeId()), inbox);
+        Inbox<?> res = remotes.putIfAbsent(new MailboxKey(inbox.executionId(), 
inbox.exchangeId()), inbox);
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Inbox registered [qryId={}, fragmentId={}]", 
inbox.queryId(), inbox.fragmentId());
+            LOG.trace("Inbox registered [executionId={}, fragmentId={}]", 
inbox.executionId(), inbox.fragmentId());
         }
 
         assert res == null : res;
@@ -70,7 +69,7 @@ public class MailboxRegistryImpl implements MailboxRegistry, 
TopologyEventHandle
     /** {@inheritDoc} */
     @Override
     public void register(Outbox<?> outbox) {
-        CompletableFuture<Outbox<?>> res = locals.computeIfAbsent(new 
MailboxKey(outbox.queryId(), outbox.exchangeId()),
+        CompletableFuture<Outbox<?>> res = locals.computeIfAbsent(new 
MailboxKey(outbox.executionId(), outbox.exchangeId()),
                 k -> new CompletableFuture<>());
 
         assert !res.isDone();
@@ -78,42 +77,42 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
         res.complete(outbox);
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Outbox registered [qryId={}, fragmentId={}]", 
outbox.queryId(), outbox.fragmentId());
+            LOG.trace("Outbox registered [executionId={}, fragmentId={}]", 
outbox.executionId(), outbox.fragmentId());
         }
     }
 
     /** {@inheritDoc} */
     @Override
     public void unregister(Inbox<?> inbox) {
-        boolean removed = remotes.remove(new MailboxKey(inbox.queryId(), 
inbox.exchangeId()), inbox);
+        boolean removed = remotes.remove(new MailboxKey(inbox.executionId(), 
inbox.exchangeId()), inbox);
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Inbox {} unregistered [qryId={}, fragmentId={}]", 
removed ? "was" : "wasn't",
-                    inbox.queryId(), inbox.fragmentId());
+            LOG.trace("Inbox {} unregistered [executionId={}, fragmentId={}]", 
removed ? "was" : "wasn't",
+                    inbox.executionId(), inbox.fragmentId());
         }
     }
 
     /** {@inheritDoc} */
     @Override
     public void unregister(Outbox<?> outbox) {
-        boolean removed = locals.remove(new MailboxKey(outbox.queryId(), 
outbox.exchangeId())) != null;
+        boolean removed = locals.remove(new MailboxKey(outbox.executionId(), 
outbox.exchangeId())) != null;
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Outbox {} unregistered [qryId={}, fragmentId={}]", 
removed ? "was" : "wasn't",
-                    outbox.queryId(), outbox.fragmentId());
+            LOG.trace("Outbox {} unregistered [executionId={}, 
fragmentId={}]", removed ? "was" : "wasn't",
+                    outbox.executionId(), outbox.fragmentId());
         }
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId) {
-        return locals.computeIfAbsent(new MailboxKey(qryId, exchangeId), k -> 
new CompletableFuture<>());
+    public CompletableFuture<Outbox<?>> outbox(ExecutionId executionId, long 
exchangeId) {
+        return locals.computeIfAbsent(new MailboxKey(executionId, exchangeId), 
k -> new CompletableFuture<>());
     }
 
     /** {@inheritDoc} */
     @Override
-    public Inbox<?> inbox(UUID qryId, long exchangeId) {
-        return remotes.get(new MailboxKey(qryId, exchangeId));
+    public Inbox<?> inbox(ExecutionId executionId, long exchangeId) {
+        return remotes.get(new MailboxKey(executionId, exchangeId));
     }
 
     /** {@inheritDoc} */
@@ -143,12 +142,12 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
     }
 
     private static class MailboxKey {
-        private final UUID qryId;
+        private final ExecutionId executionId;
 
         private final long exchangeId;
 
-        private MailboxKey(UUID qryId, long exchangeId) {
-            this.qryId = qryId;
+        private MailboxKey(ExecutionId executionId, long exchangeId) {
+            this.executionId = executionId;
             this.exchangeId = exchangeId;
         }
 
@@ -167,13 +166,13 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
             if (exchangeId != that.exchangeId) {
                 return false;
             }
-            return qryId.equals(that.qryId);
+            return executionId.equals(that.executionId);
         }
 
         /** {@inheritDoc} */
         @Override
         public int hashCode() {
-            int res = qryId.hashCode();
+            int res = executionId.hashCode();
             res = 31 * res + (int) (exchangeId ^ (exchangeId >>> 32));
             return res;
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index a0397a1d87..ce5cb06af6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -152,8 +152,8 @@ public abstract class AbstractNode<RowT> implements 
Node<RowT> {
             thread = Thread.currentThread();
         } else {
             assert thread == Thread.currentThread() : format("expThread={}, 
actThread={}, "
-                            + "qryId={}, fragmentId={}", thread.getName(), 
Thread.currentThread().getName(),
-                    context().queryId(), context().fragmentId());
+                            + "executionId={}, fragmentId={}", 
thread.getName(), Thread.currentThread().getName(),
+                    context().executionId(), context().fragmentId());
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index f9f75c6ccc..2c1d8548eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -346,7 +346,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
     }
 
     private void requestBatches(String nodeName, int cnt, @Nullable 
SharedState state) {
-        exchange.request(nodeName, queryId(), srcFragmentId, exchangeId, cnt, 
state)
+        exchange.request(nodeName, executionId(), srcFragmentId, exchangeId, 
cnt, state)
                 .whenComplete((ignored, ex) -> {
                     if (ex != null) {
                         IgniteInternalException wrapperEx = 
ExceptionUtils.withCause(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
index 513df84e08..eea7131866 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
-import java.util.UUID;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
 
 /**
  * Mailbox interface.
@@ -25,10 +25,10 @@ import java.util.UUID;
  */
 public interface Mailbox<T> extends Node<T> {
     /**
-     * Get query ID.
+     * Get execution ID.
      */
-    default UUID queryId() {
-        return context().queryId();
+    default ExecutionId executionId() {
+        return context().executionId();
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 59b3055619..34a51fb63d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -36,6 +35,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.Binar
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.SharedState;
@@ -252,7 +252,7 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
             );
         }
 
-        exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId, 
batchId, last, rows0)
+        exchange.sendBatch(nodeName, executionId(), targetFragmentId, 
exchangeId, batchId, last, rows0)
                 .whenComplete((ignored, ex) -> {
                     if (ex == null) {
                         return;
@@ -271,10 +271,10 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
 
     private void sendError(Throwable original) {
         String nodeName = context().originatingNodeName();
-        UUID queryId = queryId();
+        ExecutionId executionId = executionId();
         long fragmentId = fragmentId();
 
-        exchange.sendError(nodeName, queryId, fragmentId, original)
+        exchange.sendError(nodeName, executionId, fragmentId, original)
                 .whenComplete((ignored, ex) -> {
                     if (ex == null) {
                         return;
@@ -289,8 +289,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
 
                     wrapperEx.addSuppressed(original);
 
-                    LOG.warn("Unable to send error to a remote node 
[queryId={}, fragmentId={}, targetNode={}]",
-                            queryId, fragmentId, nodeName, wrapperEx);
+                    LOG.warn("Unable to send error to a remote node 
[executionId={}, fragmentId={}, targetNode={}]",
+                            executionId, fragmentId, nodeName, wrapperEx);
                 });
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
index 688c7ee871..4b11d202e0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
@@ -34,6 +34,11 @@ public interface ErrorMessage extends NetworkMessage, 
Serializable {
      */
     UUID queryId();
 
+    /**
+     * Get execution token.
+     */
+    int executionToken();
+
     /**
      * Get fragment ID.
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
index 2bedead151..055eb22589 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
@@ -30,6 +30,11 @@ public interface ExecutionContextAwareMessage extends 
NetworkMessage, Serializab
      */
     UUID queryId();
 
+    /**
+     * Get execution token.
+     */
+    int executionToken();
+
     /**
      * Get fragment ID.
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
index e5c4e1bea6..502ae71d5a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
@@ -32,4 +32,9 @@ public interface QueryCloseMessage extends NetworkMessage, 
Serializable {
      * Get query ID.
      */
     UUID queryId();
+
+    /**
+     * Get execution token.
+     */
+    int executionToken();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
index ade2e65a6b..a86cf5924c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
@@ -35,6 +35,11 @@ public interface QueryStartResponse extends NetworkMessage, 
Serializable {
      */
     UUID queryId();
 
+    /**
+     * Get execution token.
+     */
+    int executionToken();
+
     /**
      * Get fragment ID.
      */
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 7728fa6855..43dff116a5 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -29,14 +29,15 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -58,6 +59,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
@@ -114,6 +116,7 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import 
org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
 import org.apache.ignite.internal.sql.engine.message.MessageListener;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
 import org.apache.ignite.internal.sql.engine.message.QueryStartResponseImpl;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
@@ -310,7 +313,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
      * The very simple case where a query is cancelled in the middle of a 
normal execution.
      */
     @Test
-    public void testCancelOnInitiator() throws InterruptedException {
+    public void testCancel() throws InterruptedException {
         ExecutionServiceImpl<?> execService = executionServices.get(0);
         SqlOperationContext ctx = createContext();
         QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
@@ -328,7 +331,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         CompletionStage<?> batchFut = cursor.requestNextAsync(1);
 
-        await(execService.cancel(ctx.queryId()));
+        ctx.cancel().cancel();
 
         assertTrue(waitForCondition(
                 () -> executionServices.stream().map(es -> 
es.localFragments(ctx.queryId()).size())
@@ -462,48 +465,6 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         awaitContextCancellation(execNodes);
     }
 
-    /**
-     * The very simple case where a query is cancelled in the middle of a 
normal execution on non-initiator node.
-     */
-    @Test
-    public void testCancelOnRemote() throws InterruptedException {
-        ExecutionService execService = executionServices.get(0);
-        SqlOperationContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
-
-        nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
-
-        AsyncCursor<InternalSqlRow> cursor = 
await(execService.executePlan(plan, ctx));
-
-        assertTrue(waitForCondition(
-                () -> executionServices.stream().map(es -> 
es.localFragments(ctx.queryId()).size())
-                        .mapToInt(i -> i).sum() == 4, TIMEOUT_IN_MS));
-
-        List<AbstractNode<?>> execNodes = executionServices.stream()
-                .flatMap(s -> 
s.localFragments(ctx.queryId()).stream()).collect(Collectors.toList());
-
-        var batchFut = cursor.requestNextAsync(1);
-
-        await(executionServices.get(1).cancel(ctx.queryId()));
-
-        assertTrue(waitForCondition(
-                () -> executionServices.stream().map(es -> 
es.localFragments(ctx.queryId()).size())
-                        .mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
-
-        awaitContextCancellation(execNodes);
-
-        await(batchFut.exceptionally(ex -> {
-            assertInstanceOf(CompletionException.class, ex);
-            assertInstanceOf(SqlException.class, ex.getCause());
-            assertInstanceOf(QueryCancelledException.class, 
ex.getCause().getCause());
-            assertNull(ex.getCause().getCause().getCause());
-
-            return null;
-        }));
-        assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
-    }
-
-
     /**
      * Emulate exception during initialization of context. Cursor shouldn't 
hung.
      */
@@ -868,7 +829,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                 + nl
                 + "  Local fragments:" + nl
                 + "    id=0, state=opened, canceled=false, class=Inbox  
(root)" + nl
-                + "    id=1, state=opened, canceled=false, class=Outbox" + nl, 
ctx.queryId());
+                + "    id=1, state=opened, canceled=false, class=Outbox" + nl, 
new ExecutionId(ctx.queryId(), 0));
 
         assertThat(debugInfoCoordinator, equalTo(expectedOnCoordinator));
 
@@ -877,7 +838,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                 + "  Coordinator node: node_1" + nl
                 + nl
                 + "  Local fragments:" + nl
-                + "    id=1, state=opened, canceled=false, class=Outbox" + nl, 
ctx.queryId());
+                + "    id=1, state=opened, canceled=false, class=Outbox" + nl, 
new ExecutionId(ctx.queryId(), 0));
 
         assertThat(debugInfo2, equalTo(expectedOnNonCoordinator));
         assertThat(debugInfo3, equalTo(expectedOnNonCoordinator));
@@ -1081,6 +1042,72 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    void executionsWithTheSameQueryIdMustNotInterfere() {
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", createContext());
+
+        String expectedExceptionMessage = "This is expected";
+
+        TestNode corruptedNode = testCluster.node(nodeNames.get(2));
+        corruptedNode.interceptor((nodeName, msg, original) -> {
+            if (msg instanceof QueryBatchRequestMessage) {
+                corruptedNode.messageService().send(nodeName, new 
SqlQueryMessagesFactory().errorMessage()
+                        .queryId(((QueryBatchRequestMessage) msg).queryId())
+                        .executionToken(((QueryBatchRequestMessage) 
msg).executionToken())
+                        .fragmentId(((QueryBatchRequestMessage) 
msg).fragmentId())
+                        .message(expectedExceptionMessage)
+                        .traceId(((QueryBatchRequestMessage) msg).queryId())
+                        .code(Common.INTERNAL_ERR)
+                        .build()
+                );
+            } else {
+                original.onMessage(nodeName, msg);
+            }
+
+            return nullCompletedFuture();
+        });
+
+        SqlOperationContext ctx = createContext();
+
+        Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
+        BiFunction<AsyncDataCursor<InternalSqlRow>, Integer, 
CompletableFuture<Void>> retryChainBuilder = new BiFunction<>() {
+            @Override
+            public CompletableFuture<Void> apply(
+                    @Nullable AsyncDataCursor<InternalSqlRow> cursor, Integer 
remainingAttempts
+            ) {
+                CompletableFuture<Void> previousStep;
+                if (cursor == null) {
+                    previousStep = nullCompletedFuture();
+                } else {
+                    previousStep = cursor.onFirstPageReady()
+                            .thenCompose(none -> cursor.onClose())
+                            .exceptionally(ex -> {
+                                exceptions.add(ex);
+
+                                return null;
+                            });
+                }
+
+                if (remainingAttempts > 0) {
+                    return previousStep
+                            .thenCompose(ignored -> 
executionServices.get(0).executePlan(plan, ctx))
+                            .thenCompose(c -> this.apply(c, remainingAttempts 
- 1));
+                }
+
+                return previousStep;
+            }
+        };
+
+        int retryCount = 20;
+        await(retryChainBuilder.apply(null, retryCount));
+
+        assertThat(exceptions, hasSize(retryCount));
+
+        for (Throwable th : exceptions) {
+            assertThat(th.getMessage(), 
containsString(expectedExceptionMessage));
+        }
+    }
+
     /** Creates an execution service instance for the node with given 
consistent id. */
     public ExecutionServiceImpl<Object[]> create(String nodeName, CacheFactory 
mappingCacheFactory, QueryTaskExecutor taskExecutor) {
         if (!nodeNames.contains(nodeName)) {
@@ -1478,13 +1505,13 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         }
 
         @Override
-        public CompletableFuture<Outbox<?>> outbox(UUID qryId, long 
exchangeId) {
-            return delegate.outbox(qryId, exchangeId);
+        public CompletableFuture<Outbox<?>> outbox(ExecutionId executionId, 
long exchangeId) {
+            return delegate.outbox(executionId, exchangeId);
         }
 
         @Override
-        public Inbox<?> inbox(UUID qryId, long exchangeId) {
-            return delegate.inbox(qryId, exchangeId);
+        public Inbox<?> inbox(ExecutionId executionId, long exchangeId) {
+            return delegate.inbox(executionId, exchangeId);
         }
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index c9fc247cd4..9eb069e790 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -115,7 +115,7 @@ public class RuntimeSortedIndexTest extends 
IgniteAbstractTest {
         RuntimeSortedIndex<Object[]> idx = new RuntimeSortedIndex<>(
                 new ExecutionContext<>(
                         null,
-                        randomUUID(),
+                        new ExecutionId(randomUUID(), 0),
                         new ClusterNodeImpl(randomUUID(), "fake-test-node", 
NetworkAddress.from("127.0.0.1:1111")),
                         "fake-test-node",
                         null,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 5891155444..875544ab6f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
@@ -116,7 +117,7 @@ public abstract class AbstractExecutionTest<T> extends 
IgniteAbstractTest {
 
         return new ExecutionContext<>(
                 taskExecutor,
-                randomUUID(),
+                new ExecutionId(randomUUID(), 0),
                 new ClusterNodeImpl(randomUUID(), "fake-test-node", 
NetworkAddress.from("127.0.0.1:1111")),
                 "fake-test-node",
                 fragmentDesc,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index dbfa752793..beb3896d97 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -99,6 +99,7 @@ import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -589,7 +590,7 @@ public class TestBuilders {
         public ExecutionContext<Object[]> build() {
             return new ExecutionContext<>(
                     Objects.requireNonNull(executor, "executor"),
-                    queryId,
+                    new ExecutionId(queryId, 0),
                     Objects.requireNonNull(node, "node"),
                     node.name(),
                     description,

Reply via email to