This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d6750bab57 IGNITE-23831 Sql. ExecutionService must create unique
executionId for retries for same queryId (#4827)
d6750bab57 is described below
commit d6750bab57532248308b029572f3221b0c3d60ee
Author: korlov42 <[email protected]>
AuthorDate: Thu Dec 5 15:15:44 2024 +0200
IGNITE-23831 Sql. ExecutionService must create unique executionId for
retries for same queryId (#4827)
---
.../internal/sql/engine/exec/ExchangeService.java | 13 +--
.../sql/engine/exec/ExchangeServiceImpl.java | 34 +++---
.../internal/sql/engine/exec/ExecutionContext.java | 30 +++--
.../internal/sql/engine/exec/ExecutionId.java | 67 +++++++++++
.../sql/engine/exec/ExecutionServiceImpl.java | 97 ++++++++--------
.../internal/sql/engine/exec/MailboxRegistry.java | 9 +-
.../sql/engine/exec/MailboxRegistryImpl.java | 39 +++----
.../internal/sql/engine/exec/rel/AbstractNode.java | 4 +-
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 2 +-
.../internal/sql/engine/exec/rel/Mailbox.java | 8 +-
.../internal/sql/engine/exec/rel/Outbox.java | 12 +-
.../internal/sql/engine/message/ErrorMessage.java | 5 +
.../message/ExecutionContextAwareMessage.java | 5 +
.../sql/engine/message/QueryCloseMessage.java | 5 +
.../sql/engine/message/QueryStartResponse.java | 5 +
.../sql/engine/exec/ExecutionServiceImplTest.java | 129 +++++++++++++--------
.../sql/engine/exec/RuntimeSortedIndexTest.java | 2 +-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 3 +-
.../sql/engine/framework/TestBuilders.java | 3 +-
19 files changed, 298 insertions(+), 174 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index e7eb05e33d..6846280428 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
@@ -38,7 +37,7 @@ public interface ExchangeService extends LifecycleAware {
* Asynchronously sends a batch of data to the specified node.
*
* @param nodeName The name of the node to which the data will be sent.
- * @param queryId The ID of the query to which the data belongs.
+ * @param executionId The ID of the execution to which the data belongs.
* @param fragmentId The ID of the fragment to which the data will be sent.
* @param exchangeId The ID of the exchange through which the data will be
sent.
* @param batchId The ID of the batch to which the data belongs.
@@ -47,14 +46,14 @@ public interface ExchangeService extends LifecycleAware {
* @return A {@link CompletableFuture future} representing the result of
operation,
* which completes when the data has been sent.
*/
- CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long
fragmentId, long exchangeId, int batchId, boolean last,
+ CompletableFuture<Void> sendBatch(String nodeName, ExecutionId
executionId, long fragmentId, long exchangeId, int batchId, boolean last,
List<BinaryTupleMessage> rows);
/**
* Asynchronously requests data from the specified node.
*
* @param nodeName The name of the node from which the data will be
requested.
- * @param queryId The ID of the query for which the data is being
requested.
+ * @param executionId The ID of the execution for which the data is being
requested.
* @param fragmentId The ID of the fragment from which the data will be
requested.
* @param exchangeId The ID of the exchange through which the data will be
requested.
* @param amountOfBatches The number of batches of data to request.
@@ -62,18 +61,18 @@ public interface ExchangeService extends LifecycleAware {
* @return A {@link CompletableFuture future} representing the result of
operation,
* which completes when the request message has been sent.
*/
- CompletableFuture<Void> request(String nodeName, UUID queryId, long
fragmentId, long exchangeId, int amountOfBatches,
+ CompletableFuture<Void> request(String nodeName, ExecutionId executionId,
long fragmentId, long exchangeId, int amountOfBatches,
@Nullable SharedState state);
/**
* Asynchronously sends an error message to the specified node.
*
* @param nodeName The name of the node to which the error will be sent.
- * @param queryId The ID of the query to which the error belongs.
+ * @param executionId The ID of the execution to which the error belongs.
* @param fragmentId The ID of the fragment to which the error belongs.
* @param error The error to send.
* @return A {@link CompletableFuture future} representing the result of
operation,
* which completes when the error message has been sent.
*/
- CompletableFuture<Void> sendError(String nodeName, UUID queryId, long
fragmentId, Throwable error);
+ CompletableFuture<Void> sendError(String nodeName, ExecutionId
executionId, long fragmentId, Throwable error);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index c105e6f219..67f586fd4b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.ClockService;
@@ -80,13 +79,14 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long
fragmentId, long exchangeId, int batchId,
+ public CompletableFuture<Void> sendBatch(String nodeName, ExecutionId
executionId, long fragmentId, long exchangeId, int batchId,
boolean last, List<BinaryTupleMessage> rows) {
return messageService.send(
nodeName,
FACTORY.queryBatchMessage()
- .queryId(qryId)
+ .queryId(executionId.queryId())
+ .executionToken(executionId.executionToken())
.fragmentId(fragmentId)
.exchangeId(exchangeId)
.batchId(batchId)
@@ -99,12 +99,13 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> request(String nodeName, UUID queryId, long
fragmentId, long exchangeId, int amountOfBatches,
+ public CompletableFuture<Void> request(String nodeName, ExecutionId
executionId, long fragmentId, long exchangeId, int amountOfBatches,
@Nullable SharedState state) {
return messageService.send(
nodeName,
FACTORY.queryBatchRequestMessage()
- .queryId(queryId)
+ .queryId(executionId.queryId())
+ .executionToken(executionId.executionToken())
.fragmentId(fragmentId)
.exchangeId(exchangeId)
.amountOfBatches(amountOfBatches)
@@ -115,23 +116,24 @@ public class ExchangeServiceImpl implements
ExchangeService {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Void> sendError(String nodeName, UUID queryId,
long fragmentId, Throwable error) {
+ public CompletableFuture<Void> sendError(String nodeName, ExecutionId
executionId, long fragmentId, Throwable error) {
Throwable traceableErr = ExceptionUtils.unwrapCause(error);
if (!(traceableErr instanceof TraceableException)) {
traceableErr = error = new IgniteInternalException(INTERNAL_ERR,
error);
- LOG.info(format("Failed to execute query fragment: traceId={},
queryId={}, fragmentId={}",
- ((TraceableException) traceableErr).traceId(), queryId,
fragmentId), error);
+ LOG.info(format("Failed to execute query fragment: traceId={},
executionId={}, fragmentId={}",
+ ((TraceableException) traceableErr).traceId(),
executionId, fragmentId), error);
} else if (LOG.isDebugEnabled()) {
- LOG.debug(format("Failed to execute query fragment: traceId={},
queryId={}, fragmentId={}",
- ((TraceableException) traceableErr).traceId(), queryId,
fragmentId), error);
+ LOG.debug(format("Failed to execute query fragment: traceId={},
executionId={}, fragmentId={}",
+ ((TraceableException) traceableErr).traceId(),
executionId, fragmentId), error);
}
return messageService.send(
nodeName,
FACTORY.errorMessage()
- .queryId(queryId)
+ .queryId(executionId.queryId())
+ .executionToken(executionId.executionToken())
.fragmentId(fragmentId)
.traceId(((TraceableException) traceableErr).traceId())
.code(((TraceableException) traceableErr).code())
@@ -141,7 +143,8 @@ public class ExchangeServiceImpl implements ExchangeService
{
}
private void onMessage(String nodeName, QueryBatchRequestMessage msg) {
- CompletableFuture<Outbox<?>> outboxFut =
mailboxRegistry.outbox(msg.queryId(), msg.exchangeId());
+ ExecutionId executionId = new ExecutionId(msg.queryId(),
msg.executionToken());
+ CompletableFuture<Outbox<?>> outboxFut =
mailboxRegistry.outbox(executionId, msg.exchangeId());
Consumer<Outbox<?>> onRequestHandler = outbox -> {
try {
@@ -166,7 +169,8 @@ public class ExchangeServiceImpl implements ExchangeService
{
}
private void onMessage(String nodeName, QueryBatchMessage msg) {
- Inbox<?> inbox = mailboxRegistry.inbox(msg.queryId(),
msg.exchangeId());
+ ExecutionId executionId = new ExecutionId(msg.queryId(),
msg.executionToken());
+ Inbox<?> inbox = mailboxRegistry.inbox(executionId, msg.exchangeId());
if (inbox != null) {
try {
@@ -181,8 +185,8 @@ public class ExchangeServiceImpl implements ExchangeService
{
LOG.warn("Unexpected exception", e);
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Stale batch message received: [nodeName={}, queryId={},
fragmentId={}, exchangeId={}, batchId={}]",
- nodeName, msg.queryId(), msg.fragmentId(),
msg.exchangeId(), msg.batchId());
+ LOG.debug("Stale batch message received: [nodeName={},
executionId={}, fragmentId={}, exchangeId={}, batchId={}]",
+ nodeName, executionId, msg.fragmentId(), msg.exchangeId(),
msg.batchId());
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 37a02c8950..21c656effe 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -68,7 +68,7 @@ public class ExecutionContext<RowT> implements DataContext {
private final QueryTaskExecutor executor;
- private final UUID qryId;
+ private final ExecutionId executionId;
private final FragmentDescription description;
@@ -102,7 +102,7 @@ public class ExecutionContext<RowT> implements DataContext {
* Constructor.
*
* @param executor Task executor.
- * @param qryId Query ID.
+ * @param executionId Execution ID.
* @param localNode Local node.
* @param originatingNodeName Name of the node that initiated the query.
* @param description Partitions information.
@@ -115,7 +115,7 @@ public class ExecutionContext<RowT> implements DataContext {
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public ExecutionContext(
QueryTaskExecutor executor,
- UUID qryId,
+ ExecutionId executionId,
ClusterNode localNode,
String originatingNodeName,
FragmentDescription description,
@@ -126,7 +126,7 @@ public class ExecutionContext<RowT> implements DataContext {
@Nullable QueryCancel cancel
) {
this.executor = executor;
- this.qryId = qryId;
+ this.executionId = executionId;
this.description = description;
this.handler = handler;
this.params = params;
@@ -145,7 +145,7 @@ public class ExecutionContext<RowT> implements DataContext {
startTs =
nowUtc.plusSeconds(this.timeZoneId.getRules().getOffset(nowUtc).getTotalSeconds()).toEpochMilli();
if (LOG.isTraceEnabled()) {
- LOG.trace("Context created [qryId={}, fragmentId={}]", qryId,
fragmentId());
+ LOG.trace("Context created [executionId={}, fragmentId={}]",
executionId, fragmentId());
}
}
@@ -153,7 +153,15 @@ public class ExecutionContext<RowT> implements DataContext
{
* Get query ID.
*/
public UUID queryId() {
- return qryId;
+ return executionId.queryId();
+ }
+
+ public int executionToken() {
+ return executionId.executionToken();
+ }
+
+ public ExecutionId executionId() {
+ return executionId;
}
/**
@@ -334,7 +342,7 @@ public class ExecutionContext<RowT> implements DataContext {
return;
}
- executor.execute(qryId, fragmentId(), () -> {
+ executor.execute(queryId(), fragmentId(), () -> {
try {
if (!isCancelled()) {
task.run();
@@ -362,7 +370,7 @@ public class ExecutionContext<RowT> implements DataContext {
public CompletableFuture<?> submit(RunnableX task, Consumer<Throwable>
onError) {
assert !isCancelled() : "Call submit after execution was cancelled.";
- return executor.submit(qryId, fragmentId(), () -> {
+ return executor.submit(queryId(), fragmentId(), () -> {
try {
task.run();
} catch (Throwable e) {
@@ -387,7 +395,7 @@ public class ExecutionContext<RowT> implements DataContext {
boolean res = !cancelFlag.get() && cancelFlag.compareAndSet(false,
true);
if (res && LOG.isTraceEnabled()) {
- LOG.trace("Context cancelled [qryId={}, fragmentId={}]", qryId,
fragmentId());
+ LOG.trace("Context cancelled [executionId={}, fragmentId={}]",
executionId, fragmentId());
}
return res;
@@ -440,12 +448,12 @@ public class ExecutionContext<RowT> implements
DataContext {
ExecutionContext<?> context = (ExecutionContext<?>) o;
- return qryId.equals(context.qryId) && description.fragmentId() ==
context.description.fragmentId();
+ return executionId.equals(context.executionId) &&
description.fragmentId() == context.description.fragmentId();
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(qryId, description.fragmentId());
+ return Objects.hash(executionId, description.fragmentId());
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionId.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionId.java
new file mode 100644
index 0000000000..6502db1b6d
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionId.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Unique identity of distributed plan execution.
+ *
+ * <p>Includes a token to separate retries within a single query.
+ */
+public class ExecutionId {
+ private final UUID queryId;
+ private final int executionToken;
+
+ public ExecutionId(UUID queryId, int executionToken) {
+ this.queryId = queryId;
+ this.executionToken = executionToken;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExecutionId that = (ExecutionId) o;
+ return executionToken == that.executionToken &&
Objects.equals(queryId, that.queryId);
+ }
+
+ public UUID queryId() {
+ return queryId;
+ }
+
+ public int executionToken() {
+ return executionToken;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(queryId, executionToken);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 2eccc7f565..66c0ce6dc9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -129,24 +130,21 @@ import org.jetbrains.annotations.TestOnly;
*/
public class ExecutionServiceImpl<RowT> implements ExecutionService,
TopologyEventHandler {
private static final int CACHE_SIZE = 1024;
-
- private final ConcurrentMap<FragmentCacheKey, IgniteRel> physNodesCache =
Caffeine.newBuilder()
- .maximumSize(CACHE_SIZE)
- .<FragmentCacheKey, IgniteRel>build()
- .asMap();
-
private static final IgniteLogger LOG =
Loggers.forClass(ExecutionServiceImpl.class);
-
private static final SqlQueryMessagesFactory FACTORY = new
SqlQueryMessagesFactory();
-
private static final List<InternalSqlRow> APPLIED_ANSWER = List.of(new
InternalSqlRowSingleBoolean(true));
-
private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new
InternalSqlRowSingleBoolean(false));
-
private static final FragmentDescription DUMMY_DESCRIPTION = new
FragmentDescription(
0, true, Long2ObjectMaps.emptyMap(), null, null, null
);
+ private final ConcurrentMap<FragmentCacheKey, IgniteRel> physNodesCache =
Caffeine.newBuilder()
+ .maximumSize(CACHE_SIZE)
+ .<FragmentCacheKey, IgniteRel>build()
+ .asMap();
+
+ private final AtomicInteger executionTokenGen = new AtomicInteger();
+
private final MessageService messageService;
private final TopologyService topSrvc;
@@ -169,7 +167,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private final ImplementorFactory<RowT> implementorFactory;
- private final Map<UUID, DistributedQueryManager> queryManagerMap = new
ConcurrentHashMap<>();
+ private final Map<ExecutionId, DistributedQueryManager> queryManagerMap =
new ConcurrentHashMap<>();
private final long shutdownTimeout;
@@ -296,9 +294,10 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
SqlOperationContext operationContext,
MultiStepPlan plan
) {
- DistributedQueryManager queryManager = new
DistributedQueryManager(localNode.name(), true, operationContext);
+ ExecutionId executionid = nextExecutionId(operationContext.queryId());
+ DistributedQueryManager queryManager = new
DistributedQueryManager(executionid, localNode.name(), true, operationContext);
- DistributedQueryManager old =
queryManagerMap.put(operationContext.queryId(), queryManager);
+ DistributedQueryManager old = queryManagerMap.put(executionid,
queryManager);
assert old == null;
@@ -393,17 +392,6 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
}
- /** Cancels the query with given id. */
- public CompletableFuture<?> cancel(UUID qryId) {
- var mgr = queryManagerMap.get(qryId);
-
- if (mgr == null) {
- return nullCompletedFuture();
- }
-
- return mgr.close(QueryCompletionReason.CANCEL);
- }
-
private AsyncDataCursor<InternalSqlRow> executeExecutablePlan(
SqlOperationContext operationContext,
ExecutablePlan plan
@@ -411,9 +399,10 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
QueryCancel queryCancel = operationContext.cancel();
assert queryCancel != null;
+ ExecutionId executionId = nextExecutionId(operationContext.queryId());
ExecutionContext<RowT> ectx = new ExecutionContext<>(
taskExecutor,
- operationContext.queryId(),
+ executionId,
localNode,
localNode.name(),
DUMMY_DESCRIPTION,
@@ -527,7 +516,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private void onMessage(String nodeName, QueryStartResponse msg) {
assert nodeName != null && msg != null;
- DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
+ DistributedQueryManager dqm = queryManagerMap.get(new
ExecutionId(msg.queryId(), msg.executionToken()));
if (dqm != null) {
dqm.acknowledgeFragment(nodeName, msg.fragmentId(), msg.error());
@@ -537,7 +526,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private void onMessage(String nodeName, ErrorMessage msg) {
assert nodeName != null && msg != null;
- DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
+ DistributedQueryManager dqm = queryManagerMap.get(new
ExecutionId(msg.queryId(), msg.executionToken()));
if (dqm != null) {
RemoteFragmentExecutionException e = new
RemoteFragmentExecutionException(
@@ -560,7 +549,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private void onMessage(String nodeName, QueryCloseMessage msg) {
assert nodeName != null && msg != null;
- DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
+ DistributedQueryManager dqm = queryManagerMap.get(new
ExecutionId(msg.queryId(), msg.executionToken()));
if (dqm != null) {
dqm.close(QueryCompletionReason.CANCEL);
@@ -599,14 +588,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
/** Returns local fragments for the query with given id. */
+ @TestOnly
public List<AbstractNode<?>> localFragments(UUID queryId) {
- DistributedQueryManager mgr = queryManagerMap.get(queryId);
-
- if (mgr == null) {
- return List.of();
- }
-
- return mgr.localFragments();
+ return queryManagerMap.entrySet().stream()
+ .filter(e -> e.getKey().queryId().equals(queryId))
+ .flatMap(e -> e.getValue().localFragments().stream())
+ .collect(Collectors.toList());
}
private void submitFragment(String nodeName, QueryStartRequest msg) {
@@ -622,12 +609,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
private DistributedQueryManager getOrCreateQueryManager(String
coordinatorNodeName, QueryStartRequest msg) {
- return queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
+ return queryManagerMap.computeIfAbsent(new ExecutionId(msg.queryId(),
msg.executionToken()), key -> {
SqlOperationContext operationContext = createOperationContext(
- key, ZoneId.of(msg.timeZoneId()), msg.parameters(),
msg.operationTime()
+ key.queryId(), ZoneId.of(msg.timeZoneId()),
msg.parameters(), msg.operationTime()
);
- return new DistributedQueryManager(coordinatorNodeName,
operationContext);
+ return new DistributedQueryManager(key, coordinatorNodeName,
operationContext);
});
}
@@ -657,12 +644,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
String dumpDebugInfo() {
IgniteStringBuilder buf = new IgniteStringBuilder();
- for (Map.Entry<UUID, DistributedQueryManager> entry :
queryManagerMap.entrySet()) {
- UUID queryId = entry.getKey();
+ for (Map.Entry<ExecutionId, DistributedQueryManager> entry :
queryManagerMap.entrySet()) {
+ ExecutionId executionId = entry.getKey();
DistributedQueryManager mgr = entry.getValue();
buf.nl();
- buf.app("Debug info for query: ").app(queryId)
+ buf.app("Debug info for query: ").app(executionId)
.app(" (canceled=").app(mgr.cancelled.get()).app(",
stopped=").app(mgr.cancelFut.isDone()).app(")");
buf.nl();
@@ -760,6 +747,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
* A convenient class that manages the initialization and termination of
distributed queries.
*/
private class DistributedQueryManager {
+ private final ExecutionId executionId;
private final boolean coordinator;
private final String coordinatorNodeName;
@@ -784,10 +772,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private volatile Long rootFragmentId = null;
private DistributedQueryManager(
+ ExecutionId executionId,
String coordinatorNodeName,
boolean coordinator,
SqlOperationContext ctx
) {
+ this.executionId = executionId;
this.ctx = ctx;
this.coordinator = coordinator;
this.coordinatorNodeName = coordinatorNodeName;
@@ -815,8 +805,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
}
- private DistributedQueryManager(String coordinatorNodeName,
SqlOperationContext ctx) {
- this(coordinatorNodeName, false, ctx);
+ private DistributedQueryManager(ExecutionId executionId, String
coordinatorNodeName, SqlOperationContext ctx) {
+ this(executionId, coordinatorNodeName, false, ctx);
}
private List<AbstractNode<?>> localFragments() {
@@ -827,7 +817,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
String targetNodeName, String serialisedFragment,
FragmentDescription desc, TxAttributes txAttributes, int catalogVersion
) {
QueryStartRequest request = FACTORY.queryStartRequest()
- .queryId(ctx.queryId())
+ .queryId(executionId.queryId())
+ .executionToken(executionId.executionToken())
.fragmentId(desc.fragmentId())
.root(serialisedFragment)
.fragmentDescription(desc)
@@ -903,6 +894,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
origNodeName,
FACTORY.queryStartResponse()
.queryId(ectx.queryId())
+ .executionToken(ectx.executionToken())
.fragmentId(ectx.fragmentId())
.build()
);
@@ -917,7 +909,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private ExecutionContext<RowT> createContext(String initiatorNodeName,
FragmentDescription desc, TxAttributes txAttributes) {
return new ExecutionContext<>(
taskExecutor,
- ctx.queryId(),
+ executionId,
localNode,
initiatorNodeName,
desc,
@@ -963,7 +955,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
messageService.send(
initiatorNode,
FACTORY.queryStartResponse()
- .queryId(ctx.queryId())
+ .queryId(executionId.queryId())
+ .executionToken(executionId.executionToken())
.fragmentId(fragmentId)
.error(ex)
.build()
@@ -1179,7 +1172,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.thenCompose(ignored ->
awaitFragmentInitialisationAndClose());
} else {
stage = start.thenCompose(ignored ->
messageService.send(coordinatorNodeName, FACTORY.queryCloseMessage()
- .queryId(ctx.queryId())
+ .queryId(executionId.queryId())
+ .executionToken(executionId.executionToken())
.build()))
.thenCompose(ignored -> closeLocalFragments());
}
@@ -1191,7 +1185,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
LOG.warn("Fragment closing processed with errors:
[queryId={}]", ex, ctx.queryId());
}
- queryManagerMap.remove(ctx.queryId());
+ queryManagerMap.remove(executionId);
cancelFut.complete(null);
}).thenRun(() -> localFragments.forEach(f ->
f.context().cancel()));
@@ -1248,7 +1242,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return messageService.send(
nodeId,
FACTORY.queryCloseMessage()
- .queryId(ctx.queryId())
+
.queryId(executionId.queryId())
+
.executionToken(executionId.executionToken())
.build()
);
})
@@ -1339,6 +1334,10 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return firstFoundError;
}
+ private ExecutionId nextExecutionId(UUID queryId) {
+ return new ExecutionId(queryId, executionTokenGen.getAndIncrement());
+ }
+
/**
* A factory of the relational node implementors.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
index 75676d435f..28f210cce3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.exec;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
@@ -58,18 +57,18 @@ public interface MailboxRegistry extends LifecycleAware {
/**
* Returns a registered outbox by provided query ID, exchange ID pair.
*
- * @param qryId Query ID.
+ * @param executionId Execution ID.
* @param exchangeId Exchange ID.
* @return Registered outbox. May be {@code null} if execution was
cancelled.
*/
- CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId);
+ CompletableFuture<Outbox<?>> outbox(ExecutionId executionId, long
exchangeId);
/**
* Returns a registered inbox by provided query ID, exchange ID pair.
*
- * @param qryId Query ID.
+ * @param executionId Execution ID.
* @param exchangeId Exchange ID.
* @return Registered inbox. May be {@code null} if execution was
cancelled.
*/
- Inbox<?> inbox(UUID qryId, long exchangeId);
+ Inbox<?> inbox(ExecutionId executionId, long exchangeId);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index 1f8dde0de6..b4d5bf3389 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.exec;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -58,10 +57,10 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
public void register(Inbox<?> inbox) {
- Inbox<?> res = remotes.putIfAbsent(new MailboxKey(inbox.queryId(),
inbox.exchangeId()), inbox);
+ Inbox<?> res = remotes.putIfAbsent(new MailboxKey(inbox.executionId(),
inbox.exchangeId()), inbox);
if (LOG.isTraceEnabled()) {
- LOG.trace("Inbox registered [qryId={}, fragmentId={}]",
inbox.queryId(), inbox.fragmentId());
+ LOG.trace("Inbox registered [executionId={}, fragmentId={}]",
inbox.executionId(), inbox.fragmentId());
}
assert res == null : res;
@@ -70,7 +69,7 @@ public class MailboxRegistryImpl implements MailboxRegistry,
TopologyEventHandle
/** {@inheritDoc} */
@Override
public void register(Outbox<?> outbox) {
- CompletableFuture<Outbox<?>> res = locals.computeIfAbsent(new
MailboxKey(outbox.queryId(), outbox.exchangeId()),
+ CompletableFuture<Outbox<?>> res = locals.computeIfAbsent(new
MailboxKey(outbox.executionId(), outbox.exchangeId()),
k -> new CompletableFuture<>());
assert !res.isDone();
@@ -78,42 +77,42 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
res.complete(outbox);
if (LOG.isTraceEnabled()) {
- LOG.trace("Outbox registered [qryId={}, fragmentId={}]",
outbox.queryId(), outbox.fragmentId());
+ LOG.trace("Outbox registered [executionId={}, fragmentId={}]",
outbox.executionId(), outbox.fragmentId());
}
}
/** {@inheritDoc} */
@Override
public void unregister(Inbox<?> inbox) {
- boolean removed = remotes.remove(new MailboxKey(inbox.queryId(),
inbox.exchangeId()), inbox);
+ boolean removed = remotes.remove(new MailboxKey(inbox.executionId(),
inbox.exchangeId()), inbox);
if (LOG.isTraceEnabled()) {
- LOG.trace("Inbox {} unregistered [qryId={}, fragmentId={}]",
removed ? "was" : "wasn't",
- inbox.queryId(), inbox.fragmentId());
+ LOG.trace("Inbox {} unregistered [executionId={}, fragmentId={}]",
removed ? "was" : "wasn't",
+ inbox.executionId(), inbox.fragmentId());
}
}
/** {@inheritDoc} */
@Override
public void unregister(Outbox<?> outbox) {
- boolean removed = locals.remove(new MailboxKey(outbox.queryId(),
outbox.exchangeId())) != null;
+ boolean removed = locals.remove(new MailboxKey(outbox.executionId(),
outbox.exchangeId())) != null;
if (LOG.isTraceEnabled()) {
- LOG.trace("Outbox {} unregistered [qryId={}, fragmentId={}]",
removed ? "was" : "wasn't",
- outbox.queryId(), outbox.fragmentId());
+ LOG.trace("Outbox {} unregistered [executionId={},
fragmentId={}]", removed ? "was" : "wasn't",
+ outbox.executionId(), outbox.fragmentId());
}
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId) {
- return locals.computeIfAbsent(new MailboxKey(qryId, exchangeId), k ->
new CompletableFuture<>());
+ public CompletableFuture<Outbox<?>> outbox(ExecutionId executionId, long
exchangeId) {
+ return locals.computeIfAbsent(new MailboxKey(executionId, exchangeId),
k -> new CompletableFuture<>());
}
/** {@inheritDoc} */
@Override
- public Inbox<?> inbox(UUID qryId, long exchangeId) {
- return remotes.get(new MailboxKey(qryId, exchangeId));
+ public Inbox<?> inbox(ExecutionId executionId, long exchangeId) {
+ return remotes.get(new MailboxKey(executionId, exchangeId));
}
/** {@inheritDoc} */
@@ -143,12 +142,12 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
}
private static class MailboxKey {
- private final UUID qryId;
+ private final ExecutionId executionId;
private final long exchangeId;
- private MailboxKey(UUID qryId, long exchangeId) {
- this.qryId = qryId;
+ private MailboxKey(ExecutionId executionId, long exchangeId) {
+ this.executionId = executionId;
this.exchangeId = exchangeId;
}
@@ -167,13 +166,13 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
if (exchangeId != that.exchangeId) {
return false;
}
- return qryId.equals(that.qryId);
+ return executionId.equals(that.executionId);
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- int res = qryId.hashCode();
+ int res = executionId.hashCode();
res = 31 * res + (int) (exchangeId ^ (exchangeId >>> 32));
return res;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index a0397a1d87..ce5cb06af6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -152,8 +152,8 @@ public abstract class AbstractNode<RowT> implements
Node<RowT> {
thread = Thread.currentThread();
} else {
assert thread == Thread.currentThread() : format("expThread={},
actThread={}, "
- + "qryId={}, fragmentId={}", thread.getName(),
Thread.currentThread().getName(),
- context().queryId(), context().fragmentId());
+ + "executionId={}, fragmentId={}",
thread.getName(), Thread.currentThread().getName(),
+ context().executionId(), context().fragmentId());
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index f9f75c6ccc..2c1d8548eb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -346,7 +346,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
}
private void requestBatches(String nodeName, int cnt, @Nullable
SharedState state) {
- exchange.request(nodeName, queryId(), srcFragmentId, exchangeId, cnt,
state)
+ exchange.request(nodeName, executionId(), srcFragmentId, exchangeId,
cnt, state)
.whenComplete((ignored, ex) -> {
if (ex != null) {
IgniteInternalException wrapperEx =
ExceptionUtils.withCause(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
index 513df84e08..eea7131866 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Mailbox.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import java.util.UUID;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
/**
* Mailbox interface.
@@ -25,10 +25,10 @@ import java.util.UUID;
*/
public interface Mailbox<T> extends Node<T> {
/**
- * Get query ID.
+ * Get execution ID.
*/
- default UUID queryId() {
- return context().queryId();
+ default ExecutionId executionId() {
+ return context().executionId();
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 59b3055619..34a51fb63d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -36,6 +35,7 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.Binar
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.SharedState;
@@ -252,7 +252,7 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
);
}
- exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId,
batchId, last, rows0)
+ exchange.sendBatch(nodeName, executionId(), targetFragmentId,
exchangeId, batchId, last, rows0)
.whenComplete((ignored, ex) -> {
if (ex == null) {
return;
@@ -271,10 +271,10 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
private void sendError(Throwable original) {
String nodeName = context().originatingNodeName();
- UUID queryId = queryId();
+ ExecutionId executionId = executionId();
long fragmentId = fragmentId();
- exchange.sendError(nodeName, queryId, fragmentId, original)
+ exchange.sendError(nodeName, executionId, fragmentId, original)
.whenComplete((ignored, ex) -> {
if (ex == null) {
return;
@@ -289,8 +289,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
wrapperEx.addSuppressed(original);
- LOG.warn("Unable to send error to a remote node
[queryId={}, fragmentId={}, targetNode={}]",
- queryId, fragmentId, nodeName, wrapperEx);
+ LOG.warn("Unable to send error to a remote node
[executionId={}, fragmentId={}, targetNode={}]",
+ executionId, fragmentId, nodeName, wrapperEx);
});
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
index 688c7ee871..4b11d202e0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
@@ -34,6 +34,11 @@ public interface ErrorMessage extends NetworkMessage,
Serializable {
*/
UUID queryId();
+ /**
+ * Get execution token.
+ */
+ int executionToken();
+
/**
* Get fragment ID.
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
index 2bedead151..055eb22589 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ExecutionContextAwareMessage.java
@@ -30,6 +30,11 @@ public interface ExecutionContextAwareMessage extends
NetworkMessage, Serializab
*/
UUID queryId();
+ /**
+ * Get execution token.
+ */
+ int executionToken();
+
/**
* Get fragment ID.
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
index e5c4e1bea6..502ae71d5a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryCloseMessage.java
@@ -32,4 +32,9 @@ public interface QueryCloseMessage extends NetworkMessage,
Serializable {
* Get query ID.
*/
UUID queryId();
+
+ /**
+ * Get execution token.
+ */
+ int executionToken();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
index ade2e65a6b..a86cf5924c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
@@ -35,6 +35,11 @@ public interface QueryStartResponse extends NetworkMessage,
Serializable {
*/
UUID queryId();
+ /**
+ * Get execution token.
+ */
+ int executionToken();
+
/**
* Get fragment ID.
*/
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 7728fa6855..43dff116a5 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -29,14 +29,15 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -58,6 +59,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
@@ -114,6 +116,7 @@ import
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import
org.apache.ignite.internal.sql.engine.message.ExecutionContextAwareMessage;
import org.apache.ignite.internal.sql.engine.message.MessageListener;
import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponseImpl;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
@@ -310,7 +313,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
* The very simple case where a query is cancelled in the middle of a
normal execution.
*/
@Test
- public void testCancelOnInitiator() throws InterruptedException {
+ public void testCancel() throws InterruptedException {
ExecutionServiceImpl<?> execService = executionServices.get(0);
SqlOperationContext ctx = createContext();
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
@@ -328,7 +331,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
CompletionStage<?> batchFut = cursor.requestNextAsync(1);
- await(execService.cancel(ctx.queryId()));
+ ctx.cancel().cancel();
assertTrue(waitForCondition(
() -> executionServices.stream().map(es ->
es.localFragments(ctx.queryId()).size())
@@ -462,48 +465,6 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
awaitContextCancellation(execNodes);
}
- /**
- * The very simple case where a query is cancelled in the middle of a
normal execution on non-initiator node.
- */
- @Test
- public void testCancelOnRemote() throws InterruptedException {
- ExecutionService execService = executionServices.get(0);
- SqlOperationContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
-
- nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
-
- AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
-
- assertTrue(waitForCondition(
- () -> executionServices.stream().map(es ->
es.localFragments(ctx.queryId()).size())
- .mapToInt(i -> i).sum() == 4, TIMEOUT_IN_MS));
-
- List<AbstractNode<?>> execNodes = executionServices.stream()
- .flatMap(s ->
s.localFragments(ctx.queryId()).stream()).collect(Collectors.toList());
-
- var batchFut = cursor.requestNextAsync(1);
-
- await(executionServices.get(1).cancel(ctx.queryId()));
-
- assertTrue(waitForCondition(
- () -> executionServices.stream().map(es ->
es.localFragments(ctx.queryId()).size())
- .mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
-
- awaitContextCancellation(execNodes);
-
- await(batchFut.exceptionally(ex -> {
- assertInstanceOf(CompletionException.class, ex);
- assertInstanceOf(SqlException.class, ex.getCause());
- assertInstanceOf(QueryCancelledException.class,
ex.getCause().getCause());
- assertNull(ex.getCause().getCause().getCause());
-
- return null;
- }));
- assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
- }
-
-
/**
* Emulate exception during initialization of context. Cursor shouldn't
hung.
*/
@@ -868,7 +829,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
+ nl
+ " Local fragments:" + nl
+ " id=0, state=opened, canceled=false, class=Inbox
(root)" + nl
- + " id=1, state=opened, canceled=false, class=Outbox" + nl,
ctx.queryId());
+ + " id=1, state=opened, canceled=false, class=Outbox" + nl,
new ExecutionId(ctx.queryId(), 0));
assertThat(debugInfoCoordinator, equalTo(expectedOnCoordinator));
@@ -877,7 +838,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
+ " Coordinator node: node_1" + nl
+ nl
+ " Local fragments:" + nl
- + " id=1, state=opened, canceled=false, class=Outbox" + nl,
ctx.queryId());
+ + " id=1, state=opened, canceled=false, class=Outbox" + nl,
new ExecutionId(ctx.queryId(), 0));
assertThat(debugInfo2, equalTo(expectedOnNonCoordinator));
assertThat(debugInfo3, equalTo(expectedOnNonCoordinator));
@@ -1081,6 +1042,72 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
}
+ @Test
+ void executionsWithTheSameQueryIdMustNotInterfere() {
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", createContext());
+
+ String expectedExceptionMessage = "This is expected";
+
+ TestNode corruptedNode = testCluster.node(nodeNames.get(2));
+ corruptedNode.interceptor((nodeName, msg, original) -> {
+ if (msg instanceof QueryBatchRequestMessage) {
+ corruptedNode.messageService().send(nodeName, new
SqlQueryMessagesFactory().errorMessage()
+ .queryId(((QueryBatchRequestMessage) msg).queryId())
+ .executionToken(((QueryBatchRequestMessage)
msg).executionToken())
+ .fragmentId(((QueryBatchRequestMessage)
msg).fragmentId())
+ .message(expectedExceptionMessage)
+ .traceId(((QueryBatchRequestMessage) msg).queryId())
+ .code(Common.INTERNAL_ERR)
+ .build()
+ );
+ } else {
+ original.onMessage(nodeName, msg);
+ }
+
+ return nullCompletedFuture();
+ });
+
+ SqlOperationContext ctx = createContext();
+
+ Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
+ BiFunction<AsyncDataCursor<InternalSqlRow>, Integer,
CompletableFuture<Void>> retryChainBuilder = new BiFunction<>() {
+ @Override
+ public CompletableFuture<Void> apply(
+ @Nullable AsyncDataCursor<InternalSqlRow> cursor, Integer
remainingAttempts
+ ) {
+ CompletableFuture<Void> previousStep;
+ if (cursor == null) {
+ previousStep = nullCompletedFuture();
+ } else {
+ previousStep = cursor.onFirstPageReady()
+ .thenCompose(none -> cursor.onClose())
+ .exceptionally(ex -> {
+ exceptions.add(ex);
+
+ return null;
+ });
+ }
+
+ if (remainingAttempts > 0) {
+ return previousStep
+ .thenCompose(ignored ->
executionServices.get(0).executePlan(plan, ctx))
+ .thenCompose(c -> this.apply(c, remainingAttempts
- 1));
+ }
+
+ return previousStep;
+ }
+ };
+
+ int retryCount = 20;
+ await(retryChainBuilder.apply(null, retryCount));
+
+ assertThat(exceptions, hasSize(retryCount));
+
+ for (Throwable th : exceptions) {
+ assertThat(th.getMessage(),
containsString(expectedExceptionMessage));
+ }
+ }
+
/** Creates an execution service instance for the node with given
consistent id. */
public ExecutionServiceImpl<Object[]> create(String nodeName, CacheFactory
mappingCacheFactory, QueryTaskExecutor taskExecutor) {
if (!nodeNames.contains(nodeName)) {
@@ -1478,13 +1505,13 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
@Override
- public CompletableFuture<Outbox<?>> outbox(UUID qryId, long
exchangeId) {
- return delegate.outbox(qryId, exchangeId);
+ public CompletableFuture<Outbox<?>> outbox(ExecutionId executionId,
long exchangeId) {
+ return delegate.outbox(executionId, exchangeId);
}
@Override
- public Inbox<?> inbox(UUID qryId, long exchangeId) {
- return delegate.inbox(qryId, exchangeId);
+ public Inbox<?> inbox(ExecutionId executionId, long exchangeId) {
+ return delegate.inbox(executionId, exchangeId);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index c9fc247cd4..9eb069e790 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -115,7 +115,7 @@ public class RuntimeSortedIndexTest extends
IgniteAbstractTest {
RuntimeSortedIndex<Object[]> idx = new RuntimeSortedIndex<>(
new ExecutionContext<>(
null,
- randomUUID(),
+ new ExecutionId(randomUUID(), 0),
new ClusterNodeImpl(randomUUID(), "fake-test-node",
NetworkAddress.from("127.0.0.1:1111")),
"fake-test-node",
null,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 5891155444..875544ab6f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
@@ -116,7 +117,7 @@ public abstract class AbstractExecutionTest<T> extends
IgniteAbstractTest {
return new ExecutionContext<>(
taskExecutor,
- randomUUID(),
+ new ExecutionId(randomUUID(), 0),
new ClusterNodeImpl(randomUUID(), "fake-test-node",
NetworkAddress.from("127.0.0.1:1111")),
"fake-test-node",
fragmentDesc,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index dbfa752793..beb3896d97 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -99,6 +99,7 @@ import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -589,7 +590,7 @@ public class TestBuilders {
public ExecutionContext<Object[]> build() {
return new ExecutionContext<>(
Objects.requireNonNull(executor, "executor"),
- queryId,
+ new ExecutionId(queryId, 0),
Objects.requireNonNull(node, "node"),
node.name(),
description,