This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 894e6982322 IGNITE-26521 Sql. Flaky
ItUnstableTopologyTest.ensureLostOfNodeDoesntCausesQueryToFail (#6958)
894e6982322 is described below
commit 894e69823222649ed856372c8ed22347f1e3454c
Author: Max Zhuravkov <[email protected]>
AuthorDate: Thu Nov 20 13:17:41 2025 +0000
IGNITE-26521 Sql. Flaky
ItUnstableTopologyTest.ensureLostOfNodeDoesntCausesQueryToFail (#6958)
---
.../internal/sql/engine/SqlOperationContext.java | 23 ++++-
.../internal/sql/engine/SqlQueryProcessor.java | 5 +-
.../internal/sql/engine/exec/ExecutionContext.java | 15 +++-
.../sql/engine/exec/ExecutionServiceImpl.java | 98 +++++++++++++++-----
.../sql/engine/exec/MailboxRegistryImpl.java | 19 ++--
.../{MappingService.java => MappedFragments.java} | 34 ++++---
.../sql/engine/exec/mapping/MappingService.java | 3 +-
.../engine/exec/mapping/MappingServiceImpl.java | 19 ++--
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 7 +-
.../internal/sql/engine/exec/rel/Outbox.java | 7 +-
.../sql/engine/message/QueryStartRequest.java | 4 +
.../sql/engine/exec/ExecutionServiceImplTest.java | 100 ++++++++++++++++++---
.../sql/engine/exec/RuntimeSortedIndexTest.java | 3 +-
.../exec/mapping/MappingServiceImplTest.java | 24 ++---
.../sql/engine/exec/mapping/MappingTestRunner.java | 4 +-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 3 +-
.../sql/engine/framework/TestBuilders.java | 3 +-
17 files changed, 276 insertions(+), 95 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
index ac0e697ebbf..737270b9ff5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
@@ -51,6 +51,7 @@ public final class SqlOperationContext {
private final @Nullable Consumer<QueryTransactionWrapper> txUsedListener;
private final @Nullable Consumer<Throwable> errorListener;
private final @Nullable String userName;
+ private final @Nullable Long topologyVersion;
/**
* Private constructor, used by a builder.
@@ -65,7 +66,8 @@ public final class SqlOperationContext {
@Nullable String defaultSchemaName,
@Nullable Consumer<QueryTransactionWrapper> txUsedListener,
@Nullable Consumer<Throwable> errorListener,
- @Nullable String userName
+ @Nullable String userName,
+ @Nullable Long topologyVersion
) {
this.queryId = queryId;
this.timeZoneId = timeZoneId;
@@ -77,6 +79,7 @@ public final class SqlOperationContext {
this.txUsedListener = txUsedListener;
this.errorListener = errorListener;
this.userName = userName;
+ this.topologyVersion = topologyVersion;
}
public static Builder builder() {
@@ -130,6 +133,15 @@ public final class SqlOperationContext {
return txContext;
}
+ /**
+ * Returns topology version with query was mapped on.
+ *
+ * <p>May be null, if the node is the initiator.
+ */
+ public @Nullable Long topologyVersion() {
+ return topologyVersion;
+ }
+
/**
* Notifies context that transaction was used for query execution.
*/
@@ -186,6 +198,7 @@ public final class SqlOperationContext {
private @Nullable QueryCancel cancel;
private @Nullable String defaultSchemaName;
private @Nullable String userName;
+ private @Nullable Long topologyVersion;
public Builder cancel(@Nullable QueryCancel cancel) {
this.cancel = requireNonNull(cancel);
@@ -237,6 +250,11 @@ public final class SqlOperationContext {
return this;
}
+ public Builder topologyVersion(@Nullable Long topologyVersion) {
+ this.topologyVersion = topologyVersion;
+ return this;
+ }
+
/** Creates new context. */
public SqlOperationContext build() {
return new SqlOperationContext(
@@ -249,7 +267,8 @@ public final class SqlOperationContext {
defaultSchemaName,
txUsedListener,
errorListener,
- userName
+ userName,
+ topologyVersion
);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 817cda00ec3..4a485c6a54d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -363,6 +363,8 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
);
logicalTopologyService.addEventListener(mappingService);
+ logicalTopologyService.addEventListener(mailboxRegistry);
+
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
mappingService::onPrimaryReplicaExpired);
// Need to be implemented after
https://issues.apache.org/jira/browse/IGNITE-23519 Add an event for lease
Assignments
// placementDriver.listen(PrimaryReplicaEvent.ASSIGNMENTS_CHANGED,
mappingService::onPrimaryReplicaAssignment);
@@ -409,8 +411,7 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
queriesViewProvider.init(queryExecutor, prepareSvc);
- clusterSrvc.topologyService().addEventHandler(executionSrvc);
- clusterSrvc.topologyService().addEventHandler(mailboxRegistry);
+ logicalTopologyService.addEventListener(executionSrvc);
registerService(sqlStatisticManager);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index cbda6b5c819..6e5462c3933 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -110,6 +110,8 @@ public class ExecutionContext<RowT> implements DataContext {
private SharedState sharedState = new SharedState();
+ private final @Nullable Long topologyVersion;
+
/**
* Constructor.
*
@@ -126,8 +128,8 @@ public class ExecutionContext<RowT> implements DataContext {
* @param inBufSize Default execution nodes' internal buffer size.
Negative value means default value.
* @param clock The clock to use to get the system time.
* @param username Authenticated user name or {@code null} for unknown
user.
+ * @param topologyVersion Topology version the query was mapped on.
*/
- @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public ExecutionContext(
ExpressionFactory<RowT> expressionFactory,
QueryTaskExecutor executor,
@@ -142,7 +144,8 @@ public class ExecutionContext<RowT> implements DataContext {
ZoneId timeZoneId,
int inBufSize,
Clock clock,
- @Nullable String username
+ @Nullable String username,
+ @Nullable Long topologyVersion
) {
this.expressionFactory = expressionFactory;
this.executor = executor;
@@ -157,6 +160,7 @@ public class ExecutionContext<RowT> implements DataContext {
this.timeZoneId = timeZoneId;
this.inBufSize = inBufSize < 0 ? Commons.IN_BUFFER_SIZE : inBufSize;
this.currentUser = username;
+ this.topologyVersion = topologyVersion;
assert this.inBufSize > 0 : this.inBufSize;
@@ -314,6 +318,11 @@ public class ExecutionContext<RowT> implements DataContext
{
}
}
+ /** Returns the topology version the query was mapped on. */
+ public @Nullable Long topologyVersion() {
+ return topologyVersion;
+ }
+
/** Gets dynamic parameters by name. */
private @Nullable Object getParameter(String name) {
assert name.startsWith("?") : name;
@@ -398,7 +407,7 @@ public class ExecutionContext<RowT> implements DataContext {
Throwable unwrappedException = ExceptionUtils.unwrapCause(e);
onError.accept(unwrappedException);
- if (unwrappedException instanceof IgniteException
+ if (unwrappedException instanceof IgniteException
|| unwrappedException instanceof
IgniteInternalException
|| unwrappedException instanceof IgniteCheckedException
|| unwrappedException instanceof
IgniteInternalCheckedException
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 11640decc3e..d7718550a4d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -60,6 +60,9 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -70,7 +73,6 @@ import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -96,6 +98,7 @@ import
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentPrinter;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragment;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragments;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingParameters;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingUtils;
@@ -140,7 +143,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Provide ability to execute SQL query plan and retrieve results of the
execution.
*/
-public class ExecutionServiceImpl<RowT> implements ExecutionService,
TopologyEventHandler, Debuggable {
+public class ExecutionServiceImpl<RowT> implements ExecutionService,
LogicalTopologyEventListener, Debuggable {
private static final int CACHE_SIZE = 1024;
private static final IgniteLogger LOG =
Loggers.forClass(ExecutionServiceImpl.class);
private static final SqlQueryMessagesFactory FACTORY = new
SqlQueryMessagesFactory();
@@ -385,7 +388,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
private static SqlOperationContext createOperationContext(
- UUID queryId, ZoneId timeZoneId, Object[] params, HybridTimestamp
operationTime, @Nullable String username
+ UUID queryId,
+ ZoneId timeZoneId,
+ Object[] params,
+ HybridTimestamp operationTime,
+ @Nullable String username,
+ @Nullable Long topologyVersion
) {
return SqlOperationContext.builder()
.queryId(queryId)
@@ -393,6 +401,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.timeZoneId(timeZoneId)
.operationTime(operationTime)
.userName(username)
+ .topologyVersion(topologyVersion)
.build();
}
@@ -478,7 +487,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
operationContext.timeZoneId(),
-1,
Clock.systemUTC(),
- operationContext.userName()
+ operationContext.userName(),
+ // ExecutablePlan use no mapping.
+ null
);
QueryTransactionContext txContext = operationContext.txContext();
@@ -559,7 +570,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return new
IteratorToDataCursorAdapter<>(List.of(res).iterator());
}
case MAPPING:
- CompletableFuture<List<MappedFragment>> mappedFragments;
+ CompletableFuture<MappedFragments> mappedFragments;
if (plan.plan() instanceof MultiStepPlan) {
QueryTransactionContext txContext =
operationContext.txContext();
@@ -579,13 +590,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
mappedFragments = mappingService.map((MultiStepPlan)
plan.plan(), mappingParameters);
} else {
- mappedFragments = completedFuture(List.of(
-
MappingUtils.createSingleNodeMapping(localNode.name(), plan.plan().getRel())
- ));
+ MappedFragment singleNodeMapping =
MappingUtils.createSingleNodeMapping(localNode.name(), plan.plan().getRel());
+ mappedFragments = completedFuture(new
MappedFragments(List.of(singleNodeMapping), 0));
}
CompletableFuture<Iterator<InternalSqlRow>> fragments0 =
- mappedFragments.thenApply(mfs ->
FragmentPrinter.fragmentsToString(false, mfs))
+ mappedFragments.thenApply(mfs ->
FragmentPrinter.fragmentsToString(false, mfs.fragments()))
.thenApply(InternalSqlRowSingleString::new)
.thenApply(InternalSqlRow.class::cast)
.thenApply(List::of)
@@ -686,8 +696,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
/** {@inheritDoc} */
@Override
- public void onDisappeared(InternalClusterNode member) {
- queryManagerMap.values().forEach(qm -> qm.onNodeLeft(member.name()));
+ public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
+ queryManagerMap.values().forEach(qm -> qm.onNodeLeft(leftNode.name(),
newTopology.version()));
}
/** Returns local fragments for the query with given id. */
@@ -702,7 +712,14 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private void submitFragment(InternalClusterNode initiatorNode,
QueryStartRequest msg) {
DistributedQueryManager queryManager =
getOrCreateQueryManager(initiatorNode.name(), msg);
- queryManager.submitFragment(initiatorNode, msg.catalogVersion(),
msg.root(), msg.fragmentDescription(), msg.txAttributes());
+ queryManager.submitFragment(
+ initiatorNode,
+ msg.catalogVersion(),
+ msg.root(),
+ msg.fragmentDescription(),
+ msg.txAttributes(),
+ msg.topologyVersion()
+ );
}
private void handleError(Throwable ex, String nodeName, QueryStartRequest
msg) {
@@ -714,7 +731,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private DistributedQueryManager getOrCreateQueryManager(String
coordinatorNodeName, QueryStartRequest msg) {
return queryManagerMap.computeIfAbsent(new ExecutionId(msg.queryId(),
msg.executionToken()), key -> {
SqlOperationContext operationContext = createOperationContext(
- key.queryId(), ZoneId.of(msg.timeZoneId()),
msg.parameters(), msg.operationTime(), msg.username()
+ key.queryId(),
+ ZoneId.of(msg.timeZoneId()),
+ msg.parameters(),
+ msg.operationTime(),
+ msg.username(),
+ msg.topologyVersion()
);
return new DistributedQueryManager(key, coordinatorNodeName,
operationContext);
@@ -887,6 +909,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private volatile Long rootFragmentId = null;
+ /** On the initiator this field is assigned when mapping completes. */
+ private volatile @Nullable Long topologyVersion;
+
private DistributedQueryManager(
ExecutionId executionId,
String coordinatorNodeName,
@@ -911,9 +936,15 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
} else {
this.root = null;
}
+
+ this.topologyVersion = ctx.topologyVersion();
}
- private DistributedQueryManager(ExecutionId executionId, String
coordinatorNodeName, SqlOperationContext ctx) {
+ private DistributedQueryManager(
+ ExecutionId executionId,
+ String coordinatorNodeName,
+ SqlOperationContext ctx
+ ) {
this(executionId, coordinatorNodeName, false, ctx);
}
@@ -922,7 +953,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
private CompletableFuture<Void> sendFragment(
- String targetNodeName, String serialisedFragment,
FragmentDescription desc, TxAttributes txAttributes, int catalogVersion
+ String targetNodeName,
+ String serialisedFragment,
+ FragmentDescription desc,
+ TxAttributes txAttributes,
+ int catalogVersion,
+ long topologyVersion
) {
QueryStartRequest request = FACTORY.queryStartRequest()
.queryId(executionId.queryId())
@@ -937,6 +973,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.operationTime(ctx.operationTime())
.timestamp(clockService.now())
.username(ctx.userName())
+ .topologyVersion(topologyVersion)
.build();
return messageService.send(targetNodeName, request);
@@ -968,7 +1005,11 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
});
}
- private void onNodeLeft(String nodeName) {
+ private void onNodeLeft(String nodeName, long topologyVersion) {
+ Long mappingTopologyVersion = this.topologyVersion;
+ if (mappingTopologyVersion != null && mappingTopologyVersion >
topologyVersion) {
+ return; // Ignore outdated event.
+ }
remoteFragmentInitCompletion.entrySet().stream()
.filter(e -> nodeName.equals(e.getKey().nodeName()))
.forEach(e -> e.getValue().completeExceptionally(new
NodeLeftException(nodeName)));
@@ -1018,7 +1059,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private ExecutionContext<RowT> createContext(
InternalClusterNode initiatorNode,
FragmentDescription desc,
- TxAttributes txAttributes
+ TxAttributes txAttributes,
+ @Nullable Long topologyVersion
) {
return new ExecutionContext<>(
expressionFactory,
@@ -1034,7 +1076,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
ctx.timeZoneId(),
-1,
Clock.systemUTC(),
- ctx.userName()
+ ctx.userName(),
+ topologyVersion
);
}
@@ -1043,10 +1086,11 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
int catalogVersion,
String fragmentString,
FragmentDescription desc,
- TxAttributes txAttributes
+ TxAttributes txAttributes,
+ @Nullable Long topologyVersion
) {
try {
- ExecutionContext<RowT> context = createContext(initiatorNode,
desc, txAttributes);
+ ExecutionContext<RowT> context = createContext(initiatorNode,
desc, txAttributes, topologyVersion);
IgniteRel treeRoot =
relationalTreeFromJsonString(catalogVersion, fragmentString);
ResolvedDependencies resolvedDependencies =
dependencyResolver.resolveDependencies(List.of(treeRoot), catalogVersion);
@@ -1099,8 +1143,13 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(
InternalTransaction tx,
MultiStepPlan multiStepPlan,
- List<MappedFragment> mappedFragments
+ MappedFragments mappedFragmentList
) {
+ long topologyVersion = mappedFragmentList.topologyVersion();
+ this.topologyVersion = topologyVersion;
+
+ List<MappedFragment> mappedFragments =
mappedFragmentList.fragments();
+
// we rely on the fact that the very first fragment is a root.
Otherwise we need to handle
// the case when a non-root fragment will fail before the root is
processed.
assert !nullOrEmpty(mappedFragments) &&
mappedFragments.get(0).fragment().rootFragment()
@@ -1161,7 +1210,12 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
for (String nodeName : mappedFragment.nodes()) {
CompletableFuture<Void> resultOfSending =
- sendFragment(nodeName, fragment.serialized(),
fragmentDesc, attributes, multiStepPlan.catalogVersion());
+ sendFragment(nodeName,
+ fragment.serialized(),
+ fragmentDesc, attributes,
+ multiStepPlan.catalogVersion(),
+ topologyVersion
+ );
resultOfSending.whenComplete((ignored, t) -> {
if (t == null) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index fdc51ca4cfe..9d31088b645 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.sql.engine.exec;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.tostring.S;
@@ -32,7 +33,7 @@ import org.apache.ignite.internal.tostring.S;
* MailboxRegistryImpl.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-public class MailboxRegistryImpl implements MailboxRegistry,
TopologyEventHandler {
+public class MailboxRegistryImpl implements MailboxRegistry,
LogicalTopologyEventListener {
private static final IgniteLogger LOG =
Loggers.forClass(MailboxRegistryImpl.class);
private final Map<MailboxKey, CompletableFuture<Outbox<?>>> locals;
@@ -130,15 +131,9 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
- public void onAppeared(InternalClusterNode member) {
- // NO-OP
- }
-
- /** {@inheritDoc} */
- @Override
- public void onDisappeared(InternalClusterNode member) {
- locals.values().forEach(fut -> fut.thenAccept(n ->
n.onNodeLeft(member)));
- remotes.values().forEach(n -> n.onNodeLeft(member));
+ public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
+ locals.values().forEach(fut -> fut.thenAccept(n ->
n.onNodeLeft(leftNode, newTopology.version())));
+ remotes.values().forEach(n -> n.onNodeLeft(leftNode,
newTopology.version()));
}
private static class MailboxKey {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragments.java
similarity index 60%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragments.java
index dc778cab542..5b1bf07e52b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappedFragments.java
@@ -18,19 +18,29 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
/**
- * A service to map multi step plan to an actual topology.
+ * Mapped fragments.
*/
-@FunctionalInterface
-public interface MappingService {
- /**
- * Maps given plan to a cluster topology.
- *
- * @param multiStepPlan A plan to map.
- * @return A list of fragments with metadata related to a fragment
topology.
- */
- CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan,
MappingParameters parameters);
+public final class MappedFragments {
+
+ private final long topologyVersion;
+
+ private final List<MappedFragment> fragments;
+
+ /** Constructor. */
+ public MappedFragments(List<MappedFragment> fragments, long
topologyVersion) {
+ this.fragments = fragments;
+ this.topologyVersion = topologyVersion;
+ }
+
+ /** Cluster topology version these fragments were mapped on. */
+ public long topologyVersion() {
+ return topologyVersion;
+ }
+
+ /** Fragments. */
+ public List<MappedFragment> fragments() {
+ return fragments;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
index dc778cab542..e70f82be55e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingService.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
@@ -32,5 +31,5 @@ public interface MappingService {
* @param multiStepPlan A plan to map.
* @return A list of fragments with metadata related to a fragment
topology.
*/
- CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan,
MappingParameters parameters);
+ CompletableFuture<MappedFragments> map(MultiStepPlan multiStepPlan,
MappingParameters parameters);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 9ffa1119238..d66c71762dd 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -143,7 +143,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
}
@Override
- public CompletableFuture<List<MappedFragment>> map(MultiStepPlan
multiStepPlan, MappingParameters parameters) {
+ public CompletableFuture<MappedFragments> map(MultiStepPlan multiStepPlan,
MappingParameters parameters) {
if (initialTopologyFuture.isDone()) {
return map0(multiStepPlan, parameters);
}
@@ -151,7 +151,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
return initialTopologyFuture.thenComposeAsync(ignore ->
map0(multiStepPlan, parameters), taskExecutor);
}
- private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan
multiStepPlan, MappingParameters parameters) {
+ private CompletableFuture<MappedFragments> map0(MultiStepPlan
multiStepPlan, MappingParameters parameters) {
FragmentsTemplate template = getOrCreateTemplate(multiStepPlan);
boolean mapOnBackups = parameters.mapOnBackups();
@@ -160,7 +160,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
PartitionPruningMetadata partitionPruningMetadata =
multiStepPlan.partitionPruningMetadata();
TopologySnapshot topologySnapshot = topologyHolder.topology();
- CompletableFuture<MappedFragments> mappedFragments;
+ CompletableFuture<MappedFragmentsWithNodes> mappedFragments;
// TODO: https://issues.apache.org/jira/browse/IGNITE-26465 enable
cache
// if (nodeExclusionFilter != null) {
// mappedFragments = mapFragments(
@@ -176,7 +176,8 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
template, mapOnBackups,
composeNodeExclusionFilter(topologySnapshot, parameters)
);
- return mappedFragments.thenApply(frags ->
applyPartitionPruning(frags.fragments, parameters, partitionPruningMetadata));
+ return mappedFragments.thenApply(frags ->
applyPartitionPruning(frags.fragments, parameters, partitionPruningMetadata))
+ .thenApply(frags -> new MappedFragments(frags,
topologySnapshot.version));
}
private MappingsCacheValue computeMappingCacheKey(
@@ -265,7 +266,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
}
}
- private CompletableFuture<MappedFragments> mapFragments(
+ private CompletableFuture<MappedFragmentsWithNodes> mapFragments(
FragmentsTemplate template,
boolean mapOnBackups,
Predicate<String> nodeExclusionFilter
@@ -360,7 +361,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
targetNodes.addAll(mappedFragment.nodes());
}
- return new MappedFragments(mappedFragmentsList, targetNodes);
+ return new MappedFragmentsWithNodes(mappedFragmentsList,
targetNodes);
});
}
@@ -454,11 +455,11 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
}
/** Wraps list of mapped fragments with target node names. */
- private static class MappedFragments {
+ private static class MappedFragmentsWithNodes {
final List<MappedFragment> fragments;
final Set<String> nodes;
- MappedFragments(List<MappedFragment> fragments, Set<String> nodes) {
+ MappedFragmentsWithNodes(List<MappedFragment> fragments, Set<String>
nodes) {
this.fragments = fragments;
this.nodes = nodes;
}
@@ -470,7 +471,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
// TODO: https://issues.apache.org/jira/browse/IGNITE-26465 enable
cache
// private final CompletableFuture<MappedFragments> mappedFragments;
- MappingsCacheValue(long topologyVersion, IntSet tableOrZoneIds,
CompletableFuture<MappedFragments> mappedFragments) {
+ MappingsCacheValue(long topologyVersion, IntSet tableOrZoneIds,
CompletableFuture<MappedFragmentsWithNodes> mappedFragments) {
this.topologyVersion = topologyVersion;
this.tableOrZoneIds = tableOrZoneIds;
// TODO: https://issues.apache.org/jira/browse/IGNITE-26465 enable
cache
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 6fdb17cfdf1..3625db32d30 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -392,7 +392,12 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
}
/** Notifies the inbox that provided node has left the cluster. */
- public void onNodeLeft(InternalClusterNode node) {
+ public void onNodeLeft(InternalClusterNode node, long version) {
+ Long topologyVersion = context().topologyVersion();
+ if (topologyVersion != null && topologyVersion > version) {
+ return; // Ignore outdated event.
+ }
+
if (srcNodeNames.contains(node.name())) {
this.execute(() -> onNodeLeft0(node.name()));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 7410fadcafc..bd286a1e8aa 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -356,7 +356,12 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
}
/** Notifies the outbox that provided node has left the cluster. */
- public void onNodeLeft(InternalClusterNode node) {
+ public void onNodeLeft(InternalClusterNode node, long version) {
+ Long topologyVersion = context().topologyVersion();
+ if (topologyVersion != null && topologyVersion > version) {
+ return; // Ignore outdated event.
+ }
+
if (node.id().equals(context().originatingNodeId())) {
this.execute(this::close);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index a4d058529f2..9b0ae4c693c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -70,4 +70,8 @@ public interface QueryStartRequest extends TimestampAware,
ExecutionContextAware
/** Name of user who starts the query. */
@Nullable
String username();
+
+ /** The version of the cluster logical topology this query was mapped on.
*/
+ @Nullable
+ Long topologyVersion();
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 0e5a6c11ce0..ef1e716e09c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -78,7 +79,9 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.CatalogApplyResult;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureManager;
@@ -580,7 +583,18 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
// start response trigger
CountDownLatch startResponse = new CountDownLatch(1);
+ AtomicLong currentTopologyVersion = new AtomicLong();
+
nodeNames.stream().map(testCluster::node).forEach(node ->
node.interceptor((senderNode, msg, original) -> {
+ if (msg instanceof QueryStartRequest) {
+ QueryStartRequest startRequest = (QueryStartRequest) msg;
+ Long topologyVersion = startRequest.topologyVersion();
+ if (topologyVersion == null) {
+ throw new IllegalStateException("Topology version is
missing");
+ }
+ currentTopologyVersion.set(topologyVersion);
+ }
+
if (node.node.name().equals(nodeNames.get(0))) {
// On node_1, hang until an exception from another node fails
the query to make sure that the root fragment does not execute
// before other fragments.
@@ -613,7 +627,10 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
CompletableFuture<BatchedResult<InternalSqlRow>> resFut =
cursor.requestNextAsync(9);
startResponse.await();
- execService.onDisappeared(firstNode);
+ execService.onNodeLeft(
+ new LogicalNode(firstNode, Map.of()),
+ new LogicalTopologySnapshot(currentTopologyVersion.get(),
List.of(), randomUUID())
+ );
nodeFailedLatch.countDown();
@@ -999,13 +1016,13 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
* <p>The sequence of events on real cluster is as follow:<ul>
* <li>Given: cluster of 3 nodes, distribution zone spans all these
nodes.</li>
* <li>Node 1 has been restarted.</li>
- * <li>Notification of
org.apache.ignite.internal.network.TopologyEventHandler#onDisappeared handlers
are delayed on node 2 (due to
- * metastorage lagging or whatever reason).</li>
+ * <li>Notification of
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener#onNodeLeft
+ * handlers are delayed on node 2 (due to metastorage lagging or whatever
reason).</li>
* <li>Query started from node 1.</li>
* <li>Root fragment processed locally, QueryBatchRequest came to node 2
before QueryStartRequest. This step
* is crucial since it puts not completed future to mailbox registry
*
(org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl#locals).</li>
- * <li>TopologyEventHandler's are notified on node 2. This step
+ * <li>LogicalTopologyEventListener's are notified on node 2. This step
* causes onNodeLeft handler to be chained to the future from previous
step. QueryStartRequest came to node 2. Query fragment is created
* an immediately closed by onNodeLeft handler.</li>
* </ul>
@@ -1042,12 +1059,15 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
throw new RuntimeException(e);
}
- InternalClusterNode sameNameDifferentIdNode =
clusterNode(nodeNames.get(0));
- // Fire NODE_LEFT event on map-node in question. This event
contains
- // cluster node with consistent ID equals to ID of
node-initiator, but
- // different volatile id. This emulates situation, when
request prepared
- // on newer topology outruns event processing from previous
topology change.
-
testCluster.node(nodeNames.get(2)).notifyNodeLeft(sameNameDifferentIdNode);
+ QueryStartRequest queryStartRequest = (QueryStartRequest) msg;
+ Long topologyVersion = queryStartRequest.topologyVersion();
+ if (topologyVersion == null) {
+ throw new IllegalStateException("Topology version is
missing");
+ }
+
+ InternalClusterNode node = clusterNode(nodeNames.get(2));
+ // This emulates situation, when request prepared on newer
topology outruns event processing from previous topology change.
+ testCluster.node(nodeNames.get(2)).notifyNodeLeft(node,
topologyVersion - 1);
}
return nullCompletedFuture();
@@ -1060,6 +1080,61 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
await(await(cursorFuture).requestNextAsync(100));
}
+ /**
+ * Tests scenario when nodes receive a node left event from the previous
topology.
+ */
+ @Test
+ void outdatedNodeLeftEventDoesntCauseQueryToHangAllNodes() {
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", createContext());
+
+ AtomicLong currentTopologyVersion = new AtomicLong();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Triggers node left events with previous topology version for every
node in the cluster.
+ for (String nodeName : nodeNames) {
+ testCluster.node(nodeName).interceptor((senderNode, msg, original)
-> {
+ original.onMessage(senderNode, msg);
+
+ if (msg instanceof QueryStartRequest
+ // Fragment without target is a root.
+ && ((QueryStartRequest)
msg).fragmentDescription().target() == null) {
+
+ QueryStartRequest queryStartRequest = (QueryStartRequest)
msg;
+ Long topologyVersion = queryStartRequest.topologyVersion();
+ if (topologyVersion == null) {
+ throw new IllegalStateException("Topology version is
missing");
+ }
+
+ currentTopologyVersion.set(topologyVersion);
+ latch.countDown();
+ }
+
+ if (!(msg instanceof QueryStartRequest)) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Test was interrupted", e);
+ }
+ long previousVersion = currentTopologyVersion.get() - 1;
+
+ InternalClusterNode node = clusterNode(nodeName);
+ // This emulates situation, when request prepared on newer
topology receive an event from previous topology change.
+ testCluster.node(nodeName).notifyNodeLeft(node,
previousVersion);
+
+ return nullCompletedFuture();
+ } else {
+ return nullCompletedFuture();
+ }
+ });
+ }
+
+ SqlOperationContext ctx = createContext();
+
+ CompletableFuture<AsyncDataCursor<InternalSqlRow>> cursorFuture =
executionServices.get(0).executePlan(plan, ctx);
+ // Request must not hung.
+ await(await(cursorFuture).requestNextAsync(100));
+ }
+
@ParameterizedTest
@MethodSource("txTypes")
public void transactionRollbackOnError(NoOpTransaction tx) {
@@ -1311,8 +1386,9 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
this.mailboxRegistry = mailboxRegistry;
}
- public void notifyNodeLeft(InternalClusterNode node) {
- mailboxRegistry.onDisappeared(node);
+ public void notifyNodeLeft(InternalClusterNode node, long
topologyVersion) {
+ LogicalTopologySnapshot newTopology = new
LogicalTopologySnapshot(topologyVersion, Set.of(), randomUUID());
+ mailboxRegistry.onNodeLeft(new LogicalNode(node, Map.of()),
newTopology);
}
public void dataset(List<Object[]> dataset) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 1a335c42500..33809cd91e8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -131,7 +131,8 @@ public class RuntimeSortedIndexTest extends
IgniteAbstractTest {
SqlCommon.DEFAULT_TIME_ZONE_ID,
-1,
Clock.systemUTC(),
- null
+ null,
+ 1L
),
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
(o1, o2) -> {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index 3ad5c3ece74..cd59c69b8d1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -155,8 +155,8 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
mappingService.onTopologyLeap(logicalTopology.getLogicalTopology());
- List<MappedFragment> defaultMapping = await(mappingService.map(PLAN,
PARAMS));
- List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN,
MappingParameters.MAP_ON_BACKUPS));
+ MappedFragments defaultMapping = await(mappingService.map(PLAN,
PARAMS));
+ MappedFragments mappingOnBackups = await(mappingService.map(PLAN,
MappingParameters.MAP_ON_BACKUPS));
verify(execProvider, times(2)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
@@ -175,7 +175,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
MappingServiceImpl mappingService =
createMappingServiceNoCache(localNodeName, List.of(localNodeName));
- CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN, PARAMS);
+ CompletableFuture<MappedFragments> mappingFuture =
mappingService.map(PLAN, PARAMS);
assertThat(mappingFuture, willSucceedFast());
}
@@ -187,11 +187,11 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
MappingService service = createMappingService(localNodeName,
nodeNames, 100);
- List<MappedFragment> defaultMapping =
await(service.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+ MappedFragments defaultMapping =
await(service.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
- assertThat(defaultMapping, hasSize(2));
+ assertThat(defaultMapping.fragments(), hasSize(2));
- MappedFragment leafFragment = defaultMapping.stream()
+ MappedFragment leafFragment = defaultMapping.fragments().stream()
.filter(fragment -> !fragment.fragment().rootFragment())
.findFirst()
.orElseThrow();
@@ -201,11 +201,11 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
String nodeToExclude = leafFragment.nodes().get(0);
MappingParameters params = MappingParameters.create(new Object[0],
false, nodeToExclude::equals);
- List<MappedFragment> mappingWithExclusion =
await(service.map(PLAN_WITH_SYSTEM_VIEW, params));
+ MappedFragments mappingWithExclusion =
await(service.map(PLAN_WITH_SYSTEM_VIEW, params));
assertNotSame(defaultMapping, mappingWithExclusion);
- for (MappedFragment fragment : mappingWithExclusion) {
+ for (MappedFragment fragment : mappingWithExclusion.fragments()) {
assertThat(nodeToExclude, not(in(fragment.nodes())));
}
}
@@ -233,8 +233,8 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
mappingService.onTopologyLeap(logicalTopology.getLogicalTopology());
- List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN,
PARAMS));
- List<MappedFragment> sysViewMapping =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
+ MappedFragments tableOnlyMapping = await(mappingService.map(PLAN,
PARAMS));
+ MappedFragments sysViewMapping =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
verify(execProvider, times(1)).forSystemView(any());
@@ -297,7 +297,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
Runnable::run
));
- List<MappedFragment> mappedFragments = await(mappingService.map(PLAN,
PARAMS));
+ MappedFragments mappedFragments = await(mappingService.map(PLAN,
PARAMS));
verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
// Simulate expiration of the primary replica for non-mapped table -
the cache entry should not be invalidated.
@@ -349,7 +349,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
mappingService.onTopologyLeap(logicalTopology.getLogicalTopology());
- List<MappedFragment> mappedFragments = await(mappingService.map(PLAN,
PARAMS));
+ MappedFragments mappedFragments = await(mappingService.map(PLAN,
PARAMS));
verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
// Simulate expiration of the primary replica for non-mapped table -
the cache entry should not be invalidated.
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index 1c54e7fa6f3..a42c974f784 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -237,7 +237,7 @@ final class MappingTestRunner {
mappingService.onTopologyLeap(snapshot);
- List<MappedFragment> mappedFragments;
+ MappedFragments mappedFragments;
try {
mappedFragments = await(mappingService.map(plan,
readFromPrimaryOnly
@@ -256,7 +256,7 @@ final class MappingTestRunner {
throw new IllegalStateException("Mapped fragments");
}
- return FragmentPrinter.fragmentsToString(true, mappedFragments);
+ return FragmentPrinter.fragmentsToString(true,
mappedFragments.fragments());
}
static class TestCaseDef {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index ed8a167f6e2..bba84a5eab4 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -158,7 +158,8 @@ public abstract class AbstractExecutionTest<T> extends
IgniteAbstractTest {
SqlCommon.DEFAULT_TIME_ZONE_ID,
bufferSize,
Clock.systemUTC(),
- null
+ null,
+ 1L
);
contexts.add(executionContext);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 2db3f33f24c..318839e179a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -590,7 +590,8 @@ public class TestBuilders {
zoneId,
-1,
clock,
- null
+ null,
+ 1L
);
}
}