This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e247ad1047 IGNITE-22768 Sql. Worker node left the cluster before
fragment has been sent (#4697)
e247ad1047 is described below
commit e247ad1047d53e126b5b8786da03ef3e4463bc65
Author: korlov42 <[email protected]>
AuthorDate: Thu Nov 14 15:16:35 2024 +0200
IGNITE-22768 Sql. Worker node left the cluster before fragment has been
sent (#4697)
---
.../sql/engine/ItUnstableTopologyTest.java | 92 ++++++++
.../internal/lang/SqlExceptionMapperUtil.java | 12 +
.../ignite/internal/sql/engine/QueryCancel.java | 21 +-
.../internal/sql/engine/SqlOperationContext.java | 17 ++
.../internal/sql/engine/exec/ExecutionService.java | 11 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 245 +++++++++------------
.../exec/fsm/CursorInitializationPhaseHandler.java | 81 ++++---
.../sql/engine/exec/fsm/QueryExecutionProgram.java | 44 +++-
.../sql/engine/exec/fsm/QueryExecutor.java | 2 +-
.../exec/fsm/ScriptItemExecutionProgram.java | 6 +-
.../MappingException.java} | 19 +-
.../sql/engine/exec/mapping/MappingParameters.java | 28 ++-
.../engine/exec/mapping/MappingServiceImpl.java | 71 +++---
.../mapping/largecluster/LargeClusterFactory.java | 30 ++-
.../mapping/smallcluster/SmallClusterFactory.java | 31 ++-
.../sql/engine/message/MessageService.java | 3 +
.../sql/engine/message/MessageServiceImpl.java | 13 +-
.../UnknownNodeException.java} | 28 ++-
.../sql/engine/exec/ExecutionServiceImplTest.java | 89 +++-----
.../sql/engine/exec/QueryRecoveryTest.java | 231 +++++++++++++++++++
.../mapping/ExecutionTargetFactorySelfTest.java | 32 ++-
.../exec/mapping/MappingServiceImplTest.java | 47 +++-
22 files changed, 808 insertions(+), 345 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItUnstableTopologyTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItUnstableTopologyTest.java
new file mode 100644
index 0000000000..73bfb23b42
--- /dev/null
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItUnstableTopologyTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.matches;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to make sure sql engine can recover execution when run on unstable
topology.
+ */
+public class ItUnstableTopologyTest extends BaseSqlIntegrationTest {
+ private static final String DATA_NODE_BOOTSTRAP_CFG_TEMPLATE = "ignite {\n"
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " nodeAttributes: {\n"
+ + " nodeAttributes: {role: {attribute: \"data\"}}\n"
+ + " },\n"
+ + " rest.port: {}\n"
+ + "}";
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @AfterEach
+ public void dropTables() {
+ dropAllTables();
+ dropAllZonesExceptDefaultOne();
+ }
+
+ @Test
+ public void ensureLostOfNodeDoesntCausesQueryToFail() {
+ CLUSTER.startNode(1, DATA_NODE_BOOTSTRAP_CFG_TEMPLATE);
+ CLUSTER.startNode(2, DATA_NODE_BOOTSTRAP_CFG_TEMPLATE);
+ CLUSTER.startNode(3, DATA_NODE_BOOTSTRAP_CFG_TEMPLATE);
+
+ sql("CREATE ZONE my_zone WITH"
+ + " partitions = 1,"
+ + " replicas = 3,"
+ + " storage_profiles = 'default',"
+ + " data_nodes_filter='$[?(@.role == \"data\")]'");
+
+ sql("CREATE TABLE my_table (id INT PRIMARY KEY, val INT) ZONE my_zone
STORAGE PROFILE 'default'");
+ assertQuery("INSERT INTO my_table SELECT x, x FROM system_range(1,
1000)")
+ .returns(1000L)
+ .check();
+
+ Ignite gateway = CLUSTER.node(0);
+
+ assertQuery(gateway, "SELECT count(*) FROM my_table WHERE val > -1")
+ .matches(matches(".*TableScan.*"))
+ .returns(1000L)
+ .check();
+
+ // The choice of node to stop depends on current mapping algorithm
which sorts
+ // nodes by name and chooses the first one among all options,
therefore we stop
+ // 1th node -- the first data node. This invariant is fragile, but
without
+ // EXPLAIN MAPPING FOR <query> it seems there is no other options to
derive
+ // information about execution nodes for particular query.
+ CLUSTER.stopNode(1);
+
+ assertQuery(gateway, "SELECT count(*) FROM my_table WHERE val > -1")
+ .matches(matches(".*TableScan.*"))
+ .returns(1000L)
+ .check();
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
index 33be23771a..c580a84893 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.lang;
import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingException;
+import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CursorClosedException;
import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.sql.SqlException;
@@ -48,6 +52,14 @@ public class SqlExceptionMapperUtil {
* @return Public exception.
*/
public static Throwable mapToPublicSqlException(Throwable origin) {
+ Throwable unwrapped = ExceptionUtils.unwrapCause(origin);
+ if (unwrapped instanceof MappingException) {
+ return new SqlException(Sql.MAPPING_ERR, unwrapped.getMessage());
+ }
+ if (unwrapped instanceof UnknownNodeException) {
+ return new SqlException(Common.NODE_LEFT_ERR, "Node left the
cluster. Node: " + ((UnknownNodeException) unwrapped).nodeName());
+ }
+
Throwable e = mapToPublicException(origin);
if (e instanceof Error) {
return e;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
index 1b1fb5ab66..9154c575a8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
@@ -43,11 +43,7 @@ public class QueryCancel {
state.thenAccept(reason -> clo.cancel(reason == Reason.TIMEOUT));
- if (state.isDone()) {
- Reason reason = state.join();
-
- throwException(reason);
- }
+ throwIfCancelled();
}
/**
@@ -62,11 +58,7 @@ public class QueryCancel {
public void attach(QueryCancel another) throws QueryCancelledException {
state.thenAccept(another.state::complete);
- if (state.isDone()) {
- Reason reason = state.join();
-
- throwException(reason);
- }
+ throwIfCancelled();
}
/**
@@ -92,7 +84,14 @@ public class QueryCancel {
return state.isDone();
}
- private static void throwException(Reason reason) {
+ /** Throws {@link QueryCancelledException} If operation has been already
cancelled.*/
+ public void throwIfCancelled() throws QueryCancelledException {
+ if (!state.isDone()) {
+ return;
+ }
+
+ Reason reason = state.join();
+
throw new QueryCancelledException(
reason == Reason.TIMEOUT
? QueryCancelledException.TIMEOUT_MSG
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
index f3e02aa05e..180207e247 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
@@ -20,8 +20,11 @@ package org.apache.ignite.internal.sql.engine;
import static java.util.Objects.requireNonNull;
import java.time.ZoneId;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
@@ -36,6 +39,8 @@ import org.jetbrains.annotations.Nullable;
* kick-starting of the statement execution.
*/
public final class SqlOperationContext {
+ private final Set<String> excludedNodes = ConcurrentHashMap.newKeySet();
+
private final UUID queryId;
private final ZoneId timeZoneId;
private final Object[] parameters;
@@ -146,6 +151,18 @@ public final class SqlOperationContext {
return operationTime;
}
+ /** Updates the {@link #nodeExclusionFilter()} with given node. */
+ public void excludeNode(String nodeName) {
+ excludedNodes.add(nodeName);
+ }
+
+ /** Returns the predicate to exclude nodes from mapping, or {@code null}
if all nodes must be used. */
+ public @Nullable Predicate<String> nodeExclusionFilter() {
+ Set<String> excludedNodes = Set.copyOf(this.excludedNodes);
+
+ return excludedNodes.isEmpty() ? null : excludedNodes::contains;
+ }
+
/**
* Query context builder.
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
index e62e4e576c..5c08a895d6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
@@ -25,7 +26,15 @@ import
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
* SQL query plan execution interface.
*/
public interface ExecutionService extends LifecycleAware {
- AsyncDataCursor<InternalSqlRow> executePlan(
+ /**
+ * Executes the given plan.
+ *
+ * @param plan Plan to execute.
+ * @param operationContext Context of operation.
+ * @return Future that will be completed when cursor is successfully
initialized, implying for distributed plans all fragments have been
+ * sent successfully.
+ */
+ CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
QueryPlan plan, SqlOperationContext operationContext
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 305c8ce4e6..749583e28d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.sql.engine.exec;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.sql.engine.externalize.RelJsonReader.fromJson;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -52,7 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
-import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.configuration.ConfigurationChangeException;
@@ -62,7 +64,6 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
-import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyEventHandler;
@@ -291,7 +292,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return ddlCmdHnd;
}
- private AsyncDataCursor<InternalSqlRow> executeQuery(
+ private CompletableFuture<AsyncDataCursor<InternalSqlRow>> executeQuery(
SqlOperationContext operationContext,
MultiStepPlan plan
) {
@@ -318,8 +319,6 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
QueryTransactionWrapper txWrapper =
txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML);
InternalTransaction tx = txWrapper.unwrap();
- AsyncCursor<InternalSqlRow> dataCursor = queryManager.execute(tx,
plan);
-
operationContext.notifyTxUsed(txWrapper);
PrefetchCallback prefetchCallback =
operationContext.prefetchCallback();
@@ -335,11 +334,15 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
firstPageReady = firstPageReady.thenCompose(none ->
txWrapper.commitImplicit());
}
- return new TxAwareAsyncCursor<>(
+ CompletableFuture<Void> firstPageReady0 = firstPageReady;
+
+ Predicate<String> nodeExclusionFilter =
operationContext.nodeExclusionFilter();
+
+ return queryManager.execute(tx, plan,
nodeExclusionFilter).thenApply(dataCursor -> new TxAwareAsyncCursor<>(
txWrapper,
dataCursor,
- firstPageReady
- );
+ firstPageReady0
+ ));
}
private static SqlOperationContext createOperationContext(
@@ -365,7 +368,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
/** {@inheritDoc} */
@Override
@SuppressWarnings("CastConflictsWithInstanceof") // IDEA incorrectly
highlights casts in EXPLAIN and DDL branches
- public AsyncDataCursor<InternalSqlRow> executePlan(
+ public CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
QueryPlan plan, SqlOperationContext operationContext
) {
SqlQueryType queryType = plan.type();
@@ -374,16 +377,16 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
case DML:
case QUERY:
if (plan instanceof ExecutablePlan) {
- return executeExecutablePlan(operationContext,
(ExecutablePlan) plan);
+ return
completedFuture(executeExecutablePlan(operationContext, (ExecutablePlan) plan));
}
assert plan instanceof MultiStepPlan : plan.getClass();
return executeQuery(operationContext, (MultiStepPlan) plan);
case EXPLAIN:
- return executeExplain(operationContext, (ExplainPlan) plan);
+ return completedFuture(executeExplain(operationContext,
(ExplainPlan) plan));
case DDL:
- return executeDdl(operationContext, (DdlPlan) plan);
+ return completedFuture(executeDdl(operationContext, (DdlPlan)
plan));
default:
throw new AssertionError("Unexpected query type: " + plan);
@@ -972,79 +975,26 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
}
- private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx,
MultiStepPlan multiStepPlan) {
+ private CompletableFuture<AsyncCursor<InternalSqlRow>> execute(
+ InternalTransaction tx,
+ MultiStepPlan multiStepPlan,
+ @Nullable Predicate<String> nodeExclusionFilter
+ ) {
assert root != null;
- // Query has already been cancelled, return immediately.
- if (cancelled.get()) {
- return new AsyncCursor<>() {
- @Override
- public CompletableFuture<BatchedResult<InternalSqlRow>>
requestNextAsync(int rows) {
- Throwable t =
SqlExceptionMapperUtil.mapToPublicSqlException(new QueryCancelledException());
- return CompletableFuture.failedFuture(t);
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return DistributedQueryManager.this.cancelFut;
- }
- };
- }
-
boolean mapOnBackups = tx.isReadOnly();
- MappingParameters mappingParameters =
MappingParameters.create(ctx.parameters(), mapOnBackups);
+ MappingParameters mappingParameters =
MappingParameters.create(ctx.parameters(), mapOnBackups, nodeExclusionFilter);
- mappingService.map(multiStepPlan,
mappingParameters).whenCompleteAsync((mappedFragments, mappingErr) -> {
- if (mappingErr != null) {
- if (!root.completeExceptionally(mappingErr)) {
- root.thenAccept(root -> root.onError(mappingErr));
- }
- return;
- }
-
- try {
- sendFragments(tx, multiStepPlan, mappedFragments);
- } catch (Throwable t) {
- LOG.warn("Unexpected exception during query
initialization", t);
-
- if (!root.completeExceptionally(t)) {
- root.thenAccept(root -> root.onError(t));
- }
- }
- }, taskExecutor);
-
- return new AsyncCursor<>() {
- @Override
- public CompletableFuture<BatchedResult<InternalSqlRow>>
requestNextAsync(int rows) {
- return root.thenCompose(cur -> {
- CompletableFuture<BatchedResult<InternalSqlRow>> fut =
cur.requestNextAsync(rows);
-
- fut.thenAccept(batch -> {
- if (!batch.hasMore()) {
- DistributedQueryManager.this.close();
- }
- });
-
- return fut;
- });
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return root.handle((ignored, ex) -> {
- if (ex != null) {
- // cancellation should be triggered by listener of
exceptional
- // completion of `root` future, thus let's just
return a result here
- return DistributedQueryManager.this.cancelFut;
- }
-
- return DistributedQueryManager.this.close();
- }).thenCompose(Function.identity());
- }
- };
+ return mappingService.map(multiStepPlan, mappingParameters)
+ .thenComposeAsync(mappedFragments -> sendFragments(tx,
multiStepPlan, mappedFragments), taskExecutor)
+ .thenApply(this::wrapRootNode);
}
- private void sendFragments(InternalTransaction tx, MultiStepPlan
multiStepPlan, List<MappedFragment> mappedFragments) {
+ private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(
+ InternalTransaction tx,
+ MultiStepPlan multiStepPlan,
+ List<MappedFragment> mappedFragments
+ ) {
// we rely on the fact that the very first fragment is a root.
Otherwise we need to handle
// the case when a non-root fragment will fail before the root is
processed.
assert !nullOrEmpty(mappedFragments) &&
mappedFragments.get(0).fragment().rootFragment()
@@ -1061,8 +1011,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
QueryCancel queryCancel = ctx.cancel();
// skipping initialization if cancel has already been triggered
- if (queryCancel != null && queryCancel.isCancelled()) {
- return;
+ if (queryCancel != null) {
+ queryCancel.throwIfCancelled();
}
// then let's register all remote fragment's initialization
futures. This need
@@ -1107,62 +1057,51 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
CompletableFuture<Void> resultOfSending =
sendFragment(nodeName, fragment.serialized(),
fragmentDesc, attributes, multiStepPlan.catalogVersion());
- resultsOfFragmentSending.add(
- resultOfSending.handle((ignored, t) -> {
- if (t == null) {
- return null;
- }
-
- // if we were unable to send a request, then
no need
- // to wait for the remote node to complete
initialization
-
- CompletableFuture<?> completionFuture =
remoteFragmentInitCompletion.get(
- new RemoteFragmentKey(nodeName,
fragment.fragmentId())
- );
-
- if (completionFuture != null) {
- completionFuture.complete(null);
- }
-
- throw ExceptionUtils.withCause(
- t instanceof NodeLeftException ?
NodeLeftException::new : IgniteInternalException::new,
- INTERNAL_ERR,
- format("Unable to send fragment
[targetNode={}, fragmentId={}, cause={}]",
- nodeName,
fragment.fragmentId(), t.getMessage()), t
- );
- })
- );
+ resultOfSending.whenComplete((ignored, t) -> {
+ if (t == null) {
+ return;
+ }
+
+ // if we were unable to send a request, then no need
+ // to wait for the remote node to complete
initialization
+
+ CompletableFuture<?> completionFuture =
remoteFragmentInitCompletion.get(
+ new RemoteFragmentKey(nodeName,
fragment.fragmentId())
+ );
+
+ if (completionFuture != null) {
+ completionFuture.complete(null);
+ }
+ });
+
+ resultsOfFragmentSending.add(resultOfSending);
}
}
- CompletableFutures.allOf(resultsOfFragmentSending)
+ return CompletableFutures.allOf(resultsOfFragmentSending)
.handle((ignoredVal, ignoredTh) -> {
if (ignoredTh == null) {
- return null;
+ return root;
}
- Throwable firstFoundError = null;
+ Throwable error =
deriveExceptionFromListOfFutures(resultsOfFragmentSending);
- for (CompletableFuture<?> fut :
resultsOfFragmentSending) {
- if (fut.isCompletedExceptionally()) {
- // this is non blocking join() because we are
inside of CompletableFuture.allOf call
- Throwable fromFuture = fut.handle((ignored,
ex) -> ex).join();
+ assert error != null;
- if (firstFoundError == null) {
- firstFoundError = fromFuture;
- } else {
- firstFoundError.addSuppressed(fromFuture);
- }
- }
- }
+ return
CompletableFutures.allOf(remoteFragmentInitCompletion.values())
+ .thenCompose(none -> {
+ if (!root.completeExceptionally(error)) {
+ root.thenAccept(root ->
root.onError(error));
- Throwable error = firstFoundError;
- if (!root.completeExceptionally(error)) {
- root.thenAccept(root -> root.onError(error));
- }
+ close(QueryCompletionReason.ERROR);
+ }
- return null;
- });
+ return cancelFut
+ .thenRun(() -> sneakyThrow(error));
+ })
+ .thenCompose(none -> root);
+ })
+ .thenCompose(Commons::cast);
}
private void enlistPartitions(MappedFragment mappedFragment,
InternalTransaction tx) {
@@ -1258,17 +1197,6 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
queryManagerMap.remove(ctx.queryId());
- QueryCancel cancelHandler = ctx.cancel();
-
- // Query cancel runs only at the coordinator node.
- if (cancelHandler != null) {
- try {
- cancelHandler.cancel();
- } catch (Exception th) {
- LOG.debug("Exception raised while cancel", th);
- }
- }
-
cancelFut.complete(null);
}).thenRun(() -> localFragments.forEach(f ->
f.context().cancel()));
@@ -1366,6 +1294,49 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return nullCompletedFuture();
}
+
+ private AsyncCursor<InternalSqlRow>
wrapRootNode(AsyncCursor<InternalSqlRow> cursor) {
+ return new AsyncCursor<>() {
+ @Override
+ public CompletableFuture<BatchedResult<InternalSqlRow>>
requestNextAsync(int rows) {
+ CompletableFuture<BatchedResult<InternalSqlRow>> fut =
cursor.requestNextAsync(rows);
+
+ fut.thenAccept(batch -> {
+ if (!batch.hasMore()) {
+ DistributedQueryManager.this.close();
+ }
+ });
+
+ return fut;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return DistributedQueryManager.this.close();
+ }
+ };
+ }
+ }
+
+ private static @Nullable Throwable
deriveExceptionFromListOfFutures(List<CompletableFuture<?>> futures) {
+ Throwable firstFoundError = null;
+
+ for (CompletableFuture<?> fut : futures) {
+ assert fut.isDone();
+
+ if (fut.isCompletedExceptionally()) {
+ // all futures are expected to be completed by this point
+ Throwable fromFuture = fut.handle((ignored, ex) -> ex).join();
+
+ if (firstFoundError == null) {
+ firstFoundError = fromFuture;
+ } else {
+ firstFoundError.addSuppressed(fromFuture);
+ }
+ }
+ }
+
+ return firstFoundError;
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
index c38b9e70fe..b5eed6a5db 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorInitializationPhaseHandler.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.sql.engine.exec.fsm;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
@@ -41,53 +42,51 @@ class CursorInitializationPhaseHandler implements
ExecutionPhaseHandler {
assert plan != null;
assert context != null;
- AsyncDataCursor<InternalSqlRow> dataCursor =
query.executor.executePlan(context, plan);
-
SqlQueryType queryType = plan.type();
-
PrefetchCallback prefetchCallback = context.prefetchCallback();
assert prefetchCallback != null;
- AsyncSqlCursorImpl<InternalSqlRow> cursor = new AsyncSqlCursorImpl<>(
- queryType,
- plan.metadata(),
- dataCursor,
- query.nextCursorFuture
- );
-
- query.cursor = cursor;
-
- QueryTransactionContext txContext = query.txContext;
-
- assert txContext != null;
-
- if (queryType == SqlQueryType.QUERY) {
- if (txContext.explicitTx() == null) {
- // TODO: IGNITE-23604
- // implicit transaction started by InternalTable doesn't
update observableTimeTracker. At
- // this point we don't know whether tx was started by
InternalTable or ExecutionService, thus
- // let's update tracker explicitly to preserve consistency
- txContext.updateObservableTime(query.executor.clockNow());
- }
-
- // preserve lazy execution for statements that only reads
- return Result.completed();
- }
-
- // for other types let's wait for the first page to make sure premature
- // close of the cursor won't cancel an entire operation
- CompletableFuture<Void> awaitFuture = cursor.onFirstPageReady()
- .thenApply(none -> {
- if (txContext.explicitTx() == null) {
- // TODO: IGNITE-23604
- // implicit transaction started by InternalTable
doesn't update observableTimeTracker. At
- // this point we don't know whether tx was started by
InternalTable or ExecutionService, thus
- // let's update tracker explicitly to preserve
consistency
-
txContext.updateObservableTime(query.executor.clockNow());
+ CompletableFuture<Void> awaitFuture =
query.executor.executePlan(context, plan)
+ .thenCompose(dataCursor -> {
+ AsyncSqlCursorImpl<InternalSqlRow> cursor = new
AsyncSqlCursorImpl<>(
+ queryType,
+ plan.metadata(),
+ dataCursor,
+ query.nextCursorFuture
+ );
+
+ query.cursor = cursor;
+
+ QueryTransactionContext txContext = query.txContext;
+
+ assert txContext != null;
+
+ if (queryType == SqlQueryType.QUERY) {
+ if (txContext.explicitTx() == null) {
+ // TODO: IGNITE-23604
+ // implicit transaction started by InternalTable
doesn't update observableTimeTracker. At
+ // this point we don't know whether tx was started
by InternalTable or ExecutionService, thus
+ // let's update tracker explicitly to preserve
consistency
+
txContext.updateObservableTime(query.executor.clockNow());
+ }
+
+ // preserve lazy execution for statements that only
reads
+ return nullCompletedFuture();
}
- return null;
+ // for other types let's wait for the first page to make
sure premature
+ // close of the cursor won't cancel an entire operation
+ return cursor.onFirstPageReady()
+ .thenRun(() -> {
+ if (txContext.explicitTx() == null) {
+ // TODO: IGNITE-23604
+ // implicit transaction started by
InternalTable doesn't update observableTimeTracker. At
+ // this point we don't know whether tx was
started by InternalTable or ExecutionService, thus
+ // let's update tracker explicitly to
preserve consistency
+
txContext.updateObservableTime(query.executor.clockNow());
+ }
+ });
});
return Result.proceedAfter(awaitFuture);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index f54f7e8a71..f1da01abc6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.sql.engine.exec.fsm;
import java.util.List;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
/**
* Generic execution program, which accepts query string and initialization
parameters as input, and returns cursor as result.
@@ -58,11 +60,43 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
TRANSITIONS,
phase -> phase == ExecutionPhase.EXECUTING,
query -> query.cursor,
- (query, throwable) -> {
- query.onError(throwable);
-
- return false;
- }
+ QueryExecutionProgram::errorHandler
);
}
+
+ static boolean errorHandler(Query query, Throwable th) {
+ if (canRecover(query, th)) {
+ SqlOperationContext context = query.operationContext;
+
+ assert context != null;
+
+ // ensured by canRecover() method
+ assert th instanceof UnknownNodeException : th;
+
+ UnknownNodeException exception = (UnknownNodeException) th;
+
+ context.excludeNode(exception.nodeName());
+
+ return true;
+ }
+
+ query.onError(th);
+
+ return false;
+ }
+
+ @SuppressWarnings("SimplifiableIfStatement")
+ private static boolean canRecover(Query query, Throwable th) {
+ if (query.currentPhase() != ExecutionPhase.CURSOR_INITIALIZATION) {
+ return false;
+ }
+
+ // DataCursor unconditionally rolls back transaction in case of error,
therefore it's not possible
+ // to recover any explicit transaction at the moment
+ if (query.txContext.explicitTx() != null) {
+ return false;
+ }
+
+ return th instanceof UnknownNodeException;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
index a634265a9c..084b35fbe9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
@@ -284,7 +284,7 @@ public class QueryExecutor implements LifecycleAware {
return clockService.now();
}
- AsyncDataCursor<InternalSqlRow> executePlan(
+ CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(
SqlOperationContext ctx,
QueryPlan plan
) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
index a7712a2b81..c611c5f59e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ScriptItemExecutionProgram.java
@@ -49,11 +49,7 @@ class ScriptItemExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>>
TRANSITIONS,
phase -> phase == ExecutionPhase.EXECUTING,
query -> query.cursor,
- (query, throwable) -> {
- query.onError(throwable);
-
- return false;
- }
+ QueryExecutionProgram::errorHandler
);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingException.java
similarity index 63%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingException.java
index e62e4e576c..ec6a56d78b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingException.java
@@ -15,17 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.exec;
+package org.apache.ignite.internal.sql.engine.exec.mapping;
-import org.apache.ignite.internal.sql.engine.InternalSqlRow;
-import org.apache.ignite.internal.sql.engine.SqlOperationContext;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+/** Generic mapping exception. */
+public class MappingException extends RuntimeException {
+ private static final long serialVersionUID = -5428003837694156132L;
-/**
- * SQL query plan execution interface.
- */
-public interface ExecutionService extends LifecycleAware {
- AsyncDataCursor<InternalSqlRow> executePlan(
- QueryPlan plan, SqlOperationContext operationContext
- );
+ /** Constructs the object. */
+ public MappingException(String message) {
+ super(message, null, true, false);
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
index 1e94ff3b89..3eb2faca33 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
@@ -17,42 +17,51 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
+import java.util.function.Predicate;
import org.apache.ignite.internal.util.ArrayUtils;
+import org.jetbrains.annotations.Nullable;
/** Additional information used for mapping. */
public class MappingParameters {
-
/** Empty mapping parameters. */
- public static final MappingParameters EMPTY = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false);
+ public static final MappingParameters EMPTY = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false, null);
/** Allow map on backups. */
- public static final MappingParameters MAP_ON_BACKUPS = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true);
+ public static final MappingParameters MAP_ON_BACKUPS = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true, null);
private final boolean mapOnBackups;
private final Object[] dynamicParameters;
+ private final @Nullable Predicate<String> nodeExclusionFilter;
/**
* Creates mapping parameters.
*
* @param dynamicParameters Dynamic parameters.
* @param mapOnBackups Whether to use non-primary replicas for mapping or
not.
+ * @param nodeExclusionFilter If provided, all nodes which meet the
predicate must be excluded from mapping.
*
* @return Mapping parameters.
*/
- public static MappingParameters create(Object[] dynamicParameters, boolean
mapOnBackups) {
- if (dynamicParameters.length == 0) {
+ public static MappingParameters create(
+ Object[] dynamicParameters,
+ boolean mapOnBackups,
+ @Nullable Predicate<String> nodeExclusionFilter
+ ) {
+ if (dynamicParameters.length == 0 && nodeExclusionFilter == null) {
return mapOnBackups ? MAP_ON_BACKUPS : EMPTY;
} else {
- return new MappingParameters(dynamicParameters, mapOnBackups);
+ return new MappingParameters(dynamicParameters, mapOnBackups,
nodeExclusionFilter);
}
}
private MappingParameters(
Object[] dynamicParameters,
- boolean mapOnBackups
+ boolean mapOnBackups,
+ @Nullable Predicate<String> nodeExclusionFilter
) {
this.dynamicParameters = dynamicParameters;
this.mapOnBackups = mapOnBackups;
+ this.nodeExclusionFilter = nodeExclusionFilter;
}
/** Values of dynamic parameters. */
@@ -63,4 +72,9 @@ public class MappingParameters {
boolean mapOnBackups() {
return mapOnBackups;
}
+
+    /** If provided, all nodes which meet the predicate must be excluded from
mapping. */
+ @Nullable Predicate<String> nodeExclusionFilter() {
+ return nodeExclusionFilter;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 34c5cd4f2c..e6bc8fd307 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -62,6 +63,7 @@ import
org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.sql.SqlException;
+import org.jetbrains.annotations.Nullable;
/**
* An implementation of {@link MappingService}.
@@ -125,36 +127,44 @@ public class MappingServiceImpl implements MappingService
{
FragmentsTemplate template = getOrCreateTemplate(multiStepPlan);
boolean mapOnBackups = parameters.mapOnBackups();
+ Predicate<String> nodeExclusionFilter =
parameters.nodeExclusionFilter();
- MappingsCacheValue cacheValue = mappingsCache.compute(
- new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
- (key, val) -> {
- if (val == null) {
- IntSet tableIds = new IntOpenHashSet();
- boolean topologyAware = false;
-
- for (Fragment fragment : template.fragments) {
- topologyAware = topologyAware ||
!fragment.systemViews().isEmpty();
- for (IgniteDataSource source :
fragment.tables().values()) {
- tableIds.add(source.id());
+ CompletableFuture<MappedFragments> mappedFragments;
+ if (nodeExclusionFilter != null) {
+ mappedFragments = mapFragments(template, mapOnBackups,
nodeExclusionFilter);
+ } else {
+ mappedFragments = mappingsCache.compute(
+ new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
+ (key, val) -> {
+ if (val == null) {
+ IntSet tableIds = new IntOpenHashSet();
+ boolean topologyAware = false;
+
+ for (Fragment fragment : template.fragments) {
+ topologyAware = topologyAware ||
!fragment.systemViews().isEmpty();
+ for (IgniteDataSource source :
fragment.tables().values()) {
+ tableIds.add(source.id());
+ }
}
- }
- long topVer = topologyAware ?
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
+ long topVer = topologyAware ?
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
- return new MappingsCacheValue(topVer, tableIds,
mapFragments(template, mapOnBackups));
- }
+ assert nodeExclusionFilter == null;
- long topologyVer = logicalTopologyVerSupplier.get();
+ return new MappingsCacheValue(topVer, tableIds,
mapFragments(template, mapOnBackups, null));
+ }
- if (val.topologyVersion < topologyVer) {
- return new MappingsCacheValue(topologyVer,
val.tableIds, mapFragments(template, mapOnBackups));
- }
+ long topologyVer = logicalTopologyVerSupplier.get();
+
+ if (val.topologyVersion < topologyVer) {
+ return new MappingsCacheValue(topologyVer,
val.tableIds, mapFragments(template, mapOnBackups, null));
+ }
- return val;
- });
+ return val;
+ }).mappedFragments;
+ }
- return cacheValue.mappedFragments.thenApply(frags ->
applyPartitionPruning(frags.fragments, parameters));
+ return mappedFragments.thenApply(frags ->
applyPartitionPruning(frags.fragments, parameters));
}
CompletableFuture<DistributionHolder> composeDistributions(
@@ -207,7 +217,8 @@ public class MappingServiceImpl implements MappingService {
private CompletableFuture<MappedFragments> mapFragments(
FragmentsTemplate template,
- boolean mapOnBackups
+ boolean mapOnBackups,
+ @Nullable Predicate<String> nodeExclusionFilter
) {
Set<IgniteSystemView> views =
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream())
.collect(Collectors.toSet());
@@ -220,7 +231,9 @@ public class MappingServiceImpl implements MappingService {
return res.thenApply(assignments -> {
Int2ObjectMap<ExecutionTarget> targetsById = new
Int2ObjectOpenHashMap<>();
- MappingContext context = new MappingContext(localNodeName, new
ArrayList<>(assignments.nodes()), template.cluster);
+ MappingContext context = new MappingContext(
+ localNodeName, assignments.nodes(nodeExclusionFilter),
template.cluster
+ );
ExecutionTargetFactory targetFactory = context.targetFactory();
@@ -429,8 +442,14 @@ public class MappingServiceImpl implements MappingService {
this.nodesPerView = nodesPerView;
}
- Set<String> nodes() {
- return nodes;
+ List<String> nodes(@Nullable Predicate<String> nodeExclusionFilter) {
+ if (nodeExclusionFilter == null) {
+ return List.copyOf(nodes);
+ }
+
+ return nodes.stream()
+ .filter(nodeExclusionFilter.negate())
+ .collect(Collectors.toList());
}
List<TokenizedAssignments> tableAssignments(int tableId) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index d5d5be9877..99f353cee1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -22,11 +22,13 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.BitSet;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingException;
/**
* A factory that able to create targets for cluster with up to 64 nodes.
@@ -55,7 +57,9 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
for (String name : nodes) {
int id = nodeNameToId.getOrDefault(name, -1);
- assert id >= 0 : "invalid node";
+ if (id == -1) {
+ throw new MappingException("Mandatory node was excluded from
mapping: " + name);
+ }
nodesSet.set(id);
}
@@ -85,12 +89,18 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
for (Assignment a : assignment.nodes()) {
int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
- assert node >= 0 : "invalid node";
-
- nodes.set(node);
+ if (node != -1) {
+ nodes.set(node);
+ }
}
- assert !nodes.isEmpty() : "No partition node found";
+ if (nodes.isEmpty()) {
+ List<String> nodes0 = assignment.nodes().stream()
+ .map(Assignment::consistentId)
+ .collect(Collectors.toList());
+
+            throw new MappingException("Mandatory nodes were excluded from
mapping: " + nodes0);
+ }
finalised = finalised && nodes.cardinality() < 2;
@@ -121,14 +131,20 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
}
private BitSet nodeListToMap(List<String> nodes) {
+ assert !nodes.isEmpty() : "Empty target is not allowed";
+
BitSet nodesSet = new BitSet(nodeNameToId.size());
for (String name : nodes) {
int id = nodeNameToId.getOrDefault(name, -1);
- assert id >= 0 : "invalid node";
+ if (id >= 0) {
+ nodesSet.set(id);
+ }
+ }
- nodesSet.set(id);
+ if (nodesSet.isEmpty()) {
+        throw new MappingException("Mandatory nodes were excluded from
mapping: " + nodes);
}
return nodesSet;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index fde0fe0fac..c4accfc78c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingException;
/**
* A factory that able to create targets for cluster with up to 64 nodes.
@@ -60,7 +61,11 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
for (String name : nodes) {
long node = nodeNameToId.getOrDefault(name, -1);
- assert node >= 0 : "invalid node";
+
+ if (node == -1) {
+ throw new MappingException("Mandatory node was excluded from
mapping: " + name);
+ }
+
nodesMap |= node;
}
@@ -89,12 +94,18 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
for (Assignment a : assignment.nodes()) {
long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
- assert node >= 0 : "invalid node";
-
- currentPartitionNodes |= node;
+ if (node != -1) {
+ currentPartitionNodes |= node;
+ }
}
- assert currentPartitionNodes != 0L : "No partition node found";
+ if (currentPartitionNodes == 0) {
+ List<String> nodes = assignment.nodes().stream()
+ .map(Assignment::consistentId)
+ .collect(Collectors.toList());
+
+            throw new MappingException("Mandatory nodes were excluded from
mapping: " + nodes);
+ }
finalised = finalised && isPow2(currentPartitionNodes);
@@ -126,14 +137,20 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
}
private long nodeListToMap(List<String> nodes) {
+ assert !nodes.isEmpty() : "Empty target is not allowed";
+
long nodesMap = 0;
for (String name : nodes) {
long node = nodeNameToId.getOrDefault(name, -1);
- assert node >= 0 : "invalid node";
+ if (node >= 0) {
+ nodesMap |= node;
+ }
+ }
- nodesMap |= node;
+ if (nodesMap == 0) {
+        throw new MappingException("Mandatory nodes were excluded from
mapping: " + nodes);
}
return nodesMap;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
index 4fca0b0140..e35cef1744 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
@@ -31,6 +31,9 @@ public interface MessageService extends LifecycleAware {
*
* @param nodeName Node consistent ID.
* @param msg Message.
+     * @return Future which will be completed successfully when the message is
sent. The returned future may be completed exceptionally with
+     *     {@link UnknownNodeException} when the recipient cannot be found by the
given name, or with any other error in case something went
+     *     wrong during sending.
*/
CompletableFuture<Void> send(String nodeName, NetworkMessage msg);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index 2cdd9a0432..1ac2a92479 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.message;
import static
org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup.GROUP_TYPE;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import java.util.HashMap;
import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -88,7 +90,16 @@ public class MessageServiceImpl implements MessageService {
return nullCompletedFuture();
} else {
- return messagingSrvc.send(nodeName, ChannelType.DEFAULT, msg);
+ return messagingSrvc.send(nodeName, ChannelType.DEFAULT, msg)
+ .exceptionally(ex -> {
+ if (ex instanceof
UnresolvableConsistentIdException) {
+ ex = new UnknownNodeException(nodeName);
+ }
+
+ sneakyThrow(ex);
+
+ throw new AssertionError("Should not get here");
+ });
}
} catch (Exception ex) {
return CompletableFuture.failedFuture(ex);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/UnknownNodeException.java
similarity index 55%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/UnknownNodeException.java
index e62e4e576c..318c5c7328 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/UnknownNodeException.java
@@ -15,17 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.exec;
+package org.apache.ignite.internal.sql.engine.message;
-import org.apache.ignite.internal.sql.engine.InternalSqlRow;
-import org.apache.ignite.internal.sql.engine.SqlOperationContext;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+/** Thrown by {@link MessageService} when recipient cannot be found in
physical topology. */
+public class UnknownNodeException extends RuntimeException {
+ private static final long serialVersionUID = -8242883657846080305L;
-/**
- * SQL query plan execution interface.
- */
-public interface ExecutionService extends LifecycleAware {
- AsyncDataCursor<InternalSqlRow> executePlan(
- QueryPlan plan, SqlOperationContext operationContext
- );
+ private final String nodeName;
+
+ /** Constructs the object. */
+ UnknownNodeException(String nodeName) {
+ super("Unknown node: " + nodeName, null, true, false);
+
+ this.nodeName = nodeName;
+ }
+
+ /** Returns name of the node that cannot be found in physical topology. */
+ public String nodeName() {
+ return nodeName;
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index c0cc139efa..72c8cb29e0 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -23,15 +23,13 @@ import static
org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -39,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -62,7 +59,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
@@ -163,6 +159,7 @@ import org.junit.jupiter.api.TestInfo;
/**
* Test class to verify {@link ExecutionServiceImplTest}.
*/
+@SuppressWarnings("ThrowableNotThrown")
public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
/** Tag allows to skip default cluster setup. */
private static final String CUSTOM_CLUSTER_SETUP_TAG =
"skipDefaultClusterSetup";
@@ -280,7 +277,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
assertTrue(waitForCondition(
() -> executionServices.stream().map(es ->
es.localFragments(ctx.queryId()).size())
@@ -320,7 +317,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
assertTrue(waitForCondition(
() -> executionServices.stream().map(es ->
es.localFragments(ctx.queryId()).size())
@@ -377,7 +374,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
return nullCompletedFuture();
});
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
CompletionStage<?> batchFut = cursor.requestNextAsync(1);
@@ -418,25 +415,15 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
-
- var batchFut = cursor.requestNextAsync(1);
-
- await(batchFut.exceptionally(ex -> {
- assertInstanceOf(CompletionException.class, ex);
- assertInstanceOf(SqlException.class, ex.getCause());
- assertInstanceOf(IgniteException.class, ex.getCause().getCause());
- assertInstanceOf(mappingException.getClass(),
ex.getCause().getCause().getCause());
- assertEquals(mappingException.getMessage(),
ex.getCause().getCause().getCause().getMessage());
-
- return null;
- }));
-
- assertTrue(batchFut.toCompletableFuture().isCompletedExceptionally());
+ IgniteTestUtils.assertThrows(
+ IllegalStateException.class,
+ () -> await(execService.executePlan(plan, ctx)),
+ mappingException.getMessage()
+ );
}
@Test
- void cursorCloseCompletesSuccessfullyIfRootWasInitializedWithError()
throws InterruptedException {
+ void testErrorOnCursorInitialization() throws InterruptedException {
ExecutionService execService = executionServices.get(0);
SqlOperationContext ctx = createContext();
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
@@ -460,9 +447,9 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
return nullCompletedFuture();
});
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ RuntimeException actualException =
assertWillThrow(execService.executePlan(plan, ctx), RuntimeException.class);
- await(cursor.closeAsync());
+ assertEquals(expectedEx, actualException);
// try gather all possible nodes.
List<AbstractNode<?>> execNodes = executionServices.stream()
@@ -486,7 +473,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
assertTrue(waitForCondition(
() -> executionServices.stream().map(es ->
es.localFragments(ctx.queryId()).size())
@@ -530,9 +517,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
QueryPlan plan = prepare("SELECT 1", ctx);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
-
- assertThrows(ExceptionInInitializerError.class, () ->
await(cursor.requestNextAsync(1), 2, TimeUnit.SECONDS));
+ assertWillThrow(execService.executePlan(plan, ctx),
ExceptionInInitializerError.class);
}
/**
@@ -544,7 +529,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
SqlOperationContext ctx = createContext();
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
BatchedResult<?> res = await(cursor.requestNextAsync(8));
assertNotNull(res);
@@ -570,7 +555,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
SqlOperationContext ctx = createContext();
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
BatchedResult<?> res = await(cursor.requestNextAsync(9));
assertNotNull(res);
@@ -587,12 +572,10 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
*/
@Test
public void testCursorIsClosedAfterAllDataReadWithNodeFailure() throws
InterruptedException {
- ExecutionServiceImpl execService = executionServices.get(0);
+ ExecutionServiceImpl<InternalSqlRow> execService =
(ExecutionServiceImpl<InternalSqlRow>) executionServices.get(0);
SqlOperationContext ctx = createContext();
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
-
// node failed trigger
CountDownLatch nodeFailedLatch = new CountDownLatch(1);
// start response trigger
@@ -623,6 +606,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
}));
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
+
CompletableFuture<BatchedResult<InternalSqlRow>> resFut =
cursor.requestNextAsync(9);
startResponse.await();
@@ -680,14 +665,11 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
}));
- AsyncDataCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
-
- // Wait till the query fails due to nodes' unavailability.
- ExecutionException eex = assertThrows(ExecutionException.class, () ->
cursor.requestNextAsync(1).get(10, TimeUnit.SECONDS));
- assertThat(eex.getCause(), instanceOf(SqlException.class));
- assertThat(eex.getCause().getCause(),
instanceOf(NodeLeftException.class));
- assertThat(eex.getCause().getCause().getMessage(),
containsString("cause=Node left the cluster"));
- assertThat(((NodeLeftException) eex.getCause().getCause()).code(),
equalTo(NODE_LEFT_ERR));
+ IgniteTestUtils.assertThrowsWithCause(
+ () -> await(execService.executePlan(plan, ctx)),
+ NodeLeftException.class,
+ "Node left the cluster"
+ );
CompletableFuture<Void> stopFuture = CompletableFuture.runAsync(() -> {
try {
@@ -709,7 +691,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
public void testPrefetchCallbackInvocation() throws Exception {
String query = "SELECT * FROM test_tbl";
int totalStatements = 20;
- Collection<AsyncCursor<InternalSqlRow>> resultCursors = new
ArrayBlockingQueue<>(totalStatements);
+ Collection<CompletableFuture<AsyncDataCursor<InternalSqlRow>>>
resultCursors = new ArrayBlockingQueue<>(totalStatements);
List<String> queries = IntStream.range(0,
totalStatements).boxed().map(n -> query).collect(Collectors.toList());
ArrayBlockingQueue<String> queriesQueue = new
ArrayBlockingQueue<>(totalStatements, false, queries);
AtomicReference<AssertionError> errHolder = new AtomicReference<>();
@@ -757,7 +739,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
assertEquals(queries.size(), resultCursors.size());
CompletableFuture<?>[] closeFutures = resultCursors.stream()
- .map(AsyncCursor::closeAsync)
+ .map(cursorFuture ->
cursorFuture.thenCompose(AsyncCursor::closeAsync))
.toArray(CompletableFuture[]::new);
assertThat(allOf(closeFutures), willCompleteSuccessfully());
@@ -789,7 +771,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
});
QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
assertThat(prefetchCallback.prefetchFuture(),
willThrow(equalTo(expectedException)));
@@ -812,7 +794,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
// Should immediately trigger query cancel exception.
IgniteTestUtils.assertThrows(QueryCancelledException.class,
- () -> execService.executePlan(plan, ctx),
+ () -> await(execService.executePlan(plan, ctx)),
"The query was cancelled while executing"
);
}
@@ -852,7 +834,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
}));
- AsyncCursor<InternalSqlRow> cursor = execService.executePlan(plan,
ctx);
+ AsyncCursor<InternalSqlRow> cursor =
await(execService.executePlan(plan, ctx));
startResponseLatch.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
@@ -945,7 +927,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
// Should immediately trigger query cancel exception.
IgniteTestUtils.assertThrows(QueryCancelledException.class,
- () -> execService.executePlan(plan, ctx),
+ () -> await(execService.executePlan(plan, ctx)),
"Query timeout"
);
}
@@ -1068,18 +1050,17 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
.build();
CompletableFuture<AsyncDataCursor<InternalSqlRow>> execPlanFut =
- runAsync(() -> executionServices.get(0).executePlan(plan,
ctx));
+ runAsync(() ->
await(executionServices.get(0).executePlan(plan, ctx)));
// Wait until timeout is fired and unblock mapping service.
assertThat(timeoutFut, willCompleteSuccessfully());
mappingsCacheAccessBlock.countDown();
- AsyncDataCursor<InternalSqlRow> cursor = await(execPlanFut);
//noinspection ThrowableNotThrown
IgniteTestUtils.assertThrowsWithCause(
- () -> await(cursor.requestNextAsync(9)),
- SqlException.class,
+ () -> await(execPlanFut),
+ QueryCancelledException.class,
"Query timeout"
);
@@ -1256,7 +1237,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
AsyncCursor<InternalSqlRow> cursor;
try {
- cursor = execService.executePlan(plan, execCtx);
+ cursor = await(execService.executePlan(plan, execCtx));
} catch (QueryCancelledException e) {
// This might happen when initialization took longer than a
time out,
// Retry to get a proper error.
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
new file mode 100644
index 0000000000..7e14487bc4
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransactionTracker;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
+import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
+import org.apache.ignite.internal.sql.engine.util.QueryChecker;
+import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
+import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.CancellationToken;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests verifying query recovery when a worker node leaves the cluster before a fragment has been sent.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+@ExtendWith(QueryCheckerExtension.class)
+public class QueryRecoveryTest extends BaseIgniteAbstractTest {
+ private static final List<String> DATA_NODES = List.of("DATA_1", "DATA_2");
+ private static final String GATEWAY_NODE_NAME = "gateway";
+
+ @InjectQueryCheckerFactory
+ private static QueryCheckerFactory queryCheckerFactory;
+
+ private TestCluster cluster;
+
+ @BeforeEach
+ void startCluster() {
+ cluster = TestBuilders.cluster()
+ .nodes(GATEWAY_NODE_NAME, DATA_NODES.toArray(new String[0]))
+ .build();
+
+ cluster.start();
+
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+ gatewayNode.initSchema("CREATE TABLE t1 (id INT PRIMARY KEY, part_id
INT, node VARCHAR(128))");
+
+ cluster.setAssignmentsProvider("T1", (partitionCount, b) ->
+ IntStream.range(0, partitionCount)
+ .mapToObj(i -> DATA_NODES)
+ .collect(Collectors.toList())
+ );
+
+ cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName,
partId) ->
+ Collections.singleton(new Object[]{partId, partId, nodeName}))
+ );
+ }
+
+ @AfterEach
+ void stopCluster() throws Exception {
+ cluster.stop();
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void
queryWithImplicitTxRecoversWhenNodeLeftClusterBeforeFragmentHasBeenSent(TxType
txType) throws Exception {
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+ QueryTransactionContext txContext = createTxContext(txType, true);
+
+ // mapping is supposed to be stable, thus if it returns 0th node from
DATA_NODES on some environment,
+ // it should return the same node on all environments
+ String firstExpectedNode = DATA_NODES.get(0);
+ assertQuery(gatewayNode, "SELECT node FROM t1 WHERE part_id = 0",
txContext)
+ .returns(firstExpectedNode)
+ .check();
+
+ cluster.node(firstExpectedNode).stop();
+
+ // first expected node is not available, query must be remapped to
next available node
+ assertQuery(gatewayNode, "SELECT node FROM t1 WHERE part_id = 0",
txContext)
+ .returns(DATA_NODES.get(1))
+ .check();
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void
queryWithExplicitTxCannotRecoverWhenNodeLeftClusterBeforeFragmentHasBeenSent(TxType
txType) throws Exception {
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+ QueryTransactionContext txContext = createTxContext(txType, false);
+
+ // mapping is supposed to be stable, thus if it returns 0th node from
DATA_NODES on some environment,
+ // it should return the same node on all environments
+ String firstExpectedNode = DATA_NODES.get(0);
+ assertQuery(gatewayNode, "SELECT node FROM t1 WHERE part_id = 0",
txContext)
+ .returns(firstExpectedNode)
+ .check();
+
+ cluster.node(firstExpectedNode).stop();
+
+ assertThrows(
+ UnknownNodeException.class,
+ () -> assertQuery(gatewayNode, "SELECT node FROM t1 WHERE
part_id = 0", txContext).check(),
+ "Unknown node: " + firstExpectedNode
+ );
+ }
+
+ private static QueryTransactionContext createTxContext(TxType type,
boolean implicit) {
+ InternalTransaction tx = type.create();
+ QueryTransactionWrapper wrapper = new QueryTransactionWrapperImpl(tx,
implicit, NoOpTransactionTracker.INSTANCE);
+ return new PredefinedTxContext(wrapper);
+ }
+
+ private static QueryChecker assertQuery(TestNode node, String query,
QueryTransactionContext txContext) {
+ return queryCheckerFactory.create(
+ node.name(),
+ new QueryProcessor() {
+ @Override
+ public CompletableFuture<QueryMetadata>
prepareSingleAsync(SqlProperties properties,
+ @Nullable InternalTransaction transaction, String
qry, Object... params) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
queryAsync(
+ SqlProperties properties,
+ HybridTimestampTracker observableTime,
+ @Nullable InternalTransaction transaction,
+ @Nullable CancellationToken token,
+ String qry,
+ Object... params
+ ) {
+ return completedFuture(node.executeQuery(txContext,
query));
+ }
+
+ @Override
+ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ return nullCompletedFuture();
+ }
+ },
+ null,
+ null,
+ query
+ );
+ }
+
+ static class PredefinedTxContext implements QueryTransactionContext {
+ private final QueryTransactionWrapper txWrapper;
+
+ PredefinedTxContext(QueryTransactionWrapper txWrapper) {
+ this.txWrapper = txWrapper;
+ }
+
+ @Override
+ public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
+ return txWrapper;
+ }
+
+ @Override
+ public void updateObservableTime(HybridTimestamp time) {
+ // NO-OP
+ }
+
+ @Override
+ public @Nullable QueryTransactionWrapper explicitTx() {
+ return txWrapper.implicit() ? null : txWrapper;
+ }
+ }
+
+ enum TxType {
+ RW {
+ @Override
+ InternalTransaction create() {
+ return NoOpTransaction.readWrite("LOCALHOST", false);
+ }
+ },
+
+ RO {
+ @Override
+ InternalTransaction create() {
+ return NoOpTransaction.readOnly("LOCALHOST", false);
+ }
+ };
+
+ abstract InternalTransaction create();
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index 5fc233d0be..1a4c2503ec 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -92,18 +92,26 @@ public class ExecutionTargetFactorySelfTest {
@MethodSource("clusterFactory")
void invalidTargets(ExecutionTargetFactory f) {
List<String> invalidNodeSet = List.of("node100");
- List<String> partiallyInvalidNodeSet =
CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet);
-
- assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet),
"invalid node");
- assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet),
"invalid node");
- assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet),
"invalid node");
- assertThrows(AssertionError.class, () ->
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "invalid node");
-
- assertThrows(AssertionError.class, () ->
f.allOf(partiallyInvalidNodeSet), "invalid node");
- assertThrows(AssertionError.class, () ->
f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), "invalid node");
- assertThrows(AssertionError.class, () ->
f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), "invalid node");
- assertThrows(AssertionError.class, () -> f.resolveNodes(
- f.partitioned(assignment(partiallyInvalidNodeSet,
partiallyInvalidNodeSet))), "invalid node");
+
+ assertThrows(MappingException.class, () -> f.allOf(invalidNodeSet),
"Mandatory node was excluded from mapping: node100");
+ assertThrows(MappingException.class, () -> f.someOf(invalidNodeSet),
"Mandatory nodes was excluded from mapping: [node100]");
+ assertThrows(MappingException.class, () -> f.oneOf(invalidNodeSet),
"Mandatory nodes was excluded from mapping: [node100]");
+ assertThrows(MappingException.class, () -> f.partitioned(
+ assignmentFromPrimaries(invalidNodeSet)), "Mandatory nodes was
excluded from mapping: [node100]");
+ }
+
+ @ParameterizedTest
+ @MethodSource("clusterFactory")
+ void partiallyInvalidTargets(ExecutionTargetFactory f) {
+ List<String> partiallyInvalidNodeSet =
CollectionUtils.concat(SINGLE_NODE_SET, List.of("node100"));
+
+ // AllOf requires all provided nodes to be used
+ assertThrows(MappingException.class, () ->
f.allOf(partiallyInvalidNodeSet), "Mandatory node was excluded from mapping:
node100");
+
+ // rest of the targets can be executed on a subset of the nodes
+ assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)),
equalTo(SINGLE_NODE_SET));
+ assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)),
equalTo(SINGLE_NODE_SET));
+
assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet,
partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET));
}
@ParameterizedTest
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index fef3e7fa88..b5c2e88a17 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -106,7 +109,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
try {
PLAN = (MultiStepPlan) cluster.node("N1").prepare("SELECT * FROM
t1");
- PLAN_WITH_SYSTEM_VIEW = (MultiStepPlan)
cluster.node("N1").prepare("SELECT * FROM system.test_view, t1");
+ PLAN_WITH_SYSTEM_VIEW = (MultiStepPlan)
cluster.node("N1").prepare("SELECT * FROM system.test_view");
} finally {
try {
cluster.stop();
@@ -156,6 +159,36 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
assertThat(mappingFuture, willSucceedFast());
}
+ @Test
+ void mappingWithNodeFilter() {
+ String localNodeName = "NODE";
+ List<String> nodeNames = List.of("NODE1", "NODE2");
+
+ MappingService service = createMappingService(localNodeName,
nodeNames, 100);
+
+ List<MappedFragment> defaultMapping =
await(service.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+
+ assertThat(defaultMapping, hasSize(2));
+
+ MappedFragment leafFragment = defaultMapping.stream()
+ .filter(fragment -> !fragment.fragment().rootFragment())
+ .findFirst()
+ .orElseThrow();
+
+ assertThat(leafFragment.nodes(), hasSize(1));
+
+ String nodeToExclude = leafFragment.nodes().get(0);
+
+ MappingParameters params = MappingParameters.create(new Object[0],
false, nodeToExclude::equals);
+ List<MappedFragment> mappingWithExclusion =
await(service.map(PLAN_WITH_SYSTEM_VIEW, params));
+
+ assertNotSame(defaultMapping, mappingWithExclusion);
+
+ for (MappedFragment fragment : mappingWithExclusion) {
+ assertThat(nodeToExclude, not(in(fragment.nodes())));
+ }
+ }
+
@Test
public void testCacheInvalidationOnTopologyChange() {
String localNodeName = "NODE";
@@ -177,7 +210,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN,
PARAMS));
List<MappedFragment> sysViewMapping =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
- verify(execProvider, times(2)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
+ verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
verify(execProvider, times(1)).forSystemView(any());
verify(mappingService, times(2)).composeDistributions(anySet(),
anySet(), anyBoolean());
@@ -195,7 +228,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
// Plan with tables only must not be invalidated on topology change.
assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
- verify(execProvider, times(3)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
+ verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
verify(execProvider, times(2)).forSystemView(any());
}
@@ -233,21 +266,19 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
execProvider
));
- List<MappedFragment> mappedFragments =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+ List<MappedFragment> mappedFragments = await(mappingService.map(PLAN,
PARAMS));
verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
- verify(execProvider, times(1)).forSystemView(any());
// Simulate expiration of the primary replica for non-mapped table -
the cache entry should not be invalidated.
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T2")));
- assertSame(mappedFragments,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
+ assertSame(mappedFragments, await(mappingService.map(PLAN, PARAMS)));
verify(mappingService, times(1)).composeDistributions(anySet(),
anySet(), anyBoolean());
// Simulate expiration of the primary replica for mapped table - the
cache entry should be invalidated.
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
- assertNotSame(mappedFragments,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
+ assertNotSame(mappedFragments, await(mappingService.map(PLAN,
PARAMS)));
verify(execProvider, times(2)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
- verify(execProvider, times(2)).forSystemView(any());
}
private MappingServiceImpl createMappingServiceNoCache(String
localNodeName, List<String> nodeNames) {