This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 732cfddb0e IGNITE-22466 Sql. Support mapping to non-primary replicas
(#3972)
732cfddb0e is described below
commit 732cfddb0eaae071e7af44aa147d28cfe551de7f
Author: korlov42 <[email protected]>
AuthorDate: Tue Jun 25 17:06:27 2024 +0300
IGNITE-22466 Sql. Support mapping to non-primary replicas (#3972)
---
modules/sql-engine/build.gradle | 1 +
.../sql/engine/ExecutionTargetProviderImpl.java | 173 +++++++++++
.../internal/sql/engine/SqlQueryProcessor.java | 94 +-----
.../sql/engine/exec/ExecutionServiceImpl.java | 3 +-
.../sql/engine/exec/mapping/ExecutionTarget.java | 10 +
.../exec/mapping/ExecutionTargetFactory.java | 5 +-
.../exec/mapping/ExecutionTargetProvider.java | 10 +-
.../sql/engine/exec/mapping/FragmentMapper.java | 317 ++++++++++++++++++++-
.../sql/engine/exec/mapping/FragmentMapping.java | 11 +-
.../sql/engine/exec/mapping/FragmentSplitter.java | 2 +-
.../sql/engine/exec/mapping/MappingParameters.java | 20 +-
.../engine/exec/mapping/MappingServiceImpl.java | 217 ++++++--------
.../exec/mapping/smallcluster/AllOfTarget.java | 5 +
.../exec/mapping/smallcluster/OneOfTarget.java | 15 +
.../mapping/smallcluster/PartitionedTarget.java | 51 ++++
.../mapping/smallcluster/SmallClusterFactory.java | 29 +-
.../exec/mapping/smallcluster/SomeOfTarget.java | 15 +
.../internal/sql/engine/prepare/Fragment.java | 10 +
.../sql/engine/exec/ExecutionServiceImplTest.java | 50 ++--
.../engine/exec/mapping/FragmentMappingTest.java | 97 +++++--
.../exec/mapping/MappingServiceImplTest.java | 31 +-
.../sql/engine/exec/mapping/MappingTestRunner.java | 12 +-
.../sql/engine/framework/TestBuilders.java | 48 ++--
.../src/test/resources/mapping/correlated.test | 136 ++++-----
.../src/test/resources/mapping/hash_join.test | 31 +-
.../src/test/resources/mapping/merge_join.test | 66 +----
.../src/test/resources/mapping/set_ops.test | 18 +-
.../src/test/resources/mapping/table_identity.test | 68 ++---
.../resources/mapping/table_identity_single.test | 60 ++--
.../src/test/resources/mapping/table_single.test | 80 +++---
.../resources/mapping/test_backup_mapping.test | 162 +++++++++++
.../resources/mapping/test_partition_pruning.test | 72 ++---
.../src/test/resources/mapping/union.test | 30 +-
33 files changed, 1337 insertions(+), 612 deletions(-)
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 0d7dab1fbd..8f0804e08c 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -50,6 +50,7 @@ dependencies {
implementation project(':ignite-failure-handler')
implementation project(':ignite-placement-driver-api')
implementation project(':ignite-partition-replicator')
+ implementation project(':ignite-affinity')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.caffeine
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
new file mode 100644
index 0000000000..3992941bb3
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
+import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
+
+/**
+ * Implementation of {@link ExecutionTargetProvider} which takes assignments
from {@link PlacementDriver} and {@link SystemViewManager}.
+ */
+public class ExecutionTargetProviderImpl implements ExecutionTargetProvider {
+ private static final IgniteLogger LOG =
Loggers.forClass(ExecutionTargetProviderImpl.class);
+
+ private final PlacementDriver placementDriver;
+ private final SystemViewManager systemViewManager;
+
+ ExecutionTargetProviderImpl(
+ PlacementDriver placementDriver, SystemViewManager
systemViewManager
+ ) {
+ this.placementDriver = placementDriver;
+ this.systemViewManager = systemViewManager;
+ }
+
+ @Override
+ public CompletableFuture<ExecutionTarget> forTable(
+ HybridTimestamp operationTime,
+ ExecutionTargetFactory factory,
+ IgniteTable table,
+ boolean includeBackups
+ ) {
+ return collectAssignments(table, operationTime, includeBackups)
+ .thenApply(factory::partitioned);
+ }
+
+ @Override
+ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+ List<String> nodes = systemViewManager.owningNodes(view.name());
+
+ if (nullOrEmpty(nodes)) {
+ return failedFuture(
+ new SqlException(Sql.MAPPING_ERR, format("The view with
name '{}' could not be found on"
+ + " any active nodes in the cluster", view.name()))
+ );
+ }
+
+ return completedFuture(
+ view.distribution() == IgniteDistributions.single()
+ ? factory.oneOf(nodes)
+ : factory.allOf(nodes)
+ );
+ }
+
+ // needs to be refactored after TODO:
https://issues.apache.org/jira/browse/IGNITE-20925
+ /** Get primary replicas. */
+ private CompletableFuture<List<TokenizedAssignments>> collectAssignments(
+ IgniteTable table, HybridTimestamp operationTime, boolean
includeBackups
+ ) {
+ int partitions = table.partitions();
+
+ List<CompletableFuture<TokenizedAssignments>> result = new
ArrayList<>(partitions);
+
+ // no need to wait for all partitions after pruning is implemented.
+ for (int partId = 0; partId < partitions; ++partId) {
+ ReplicationGroupId partGroupId = new TablePartitionId(table.id(),
partId);
+
+ CompletableFuture<TokenizedAssignments> partitionAssignment =
includeBackups
+ ? allReplicas(partGroupId, operationTime)
+ : primaryReplica(partGroupId, operationTime);
+
+ result.add(partitionAssignment);
+ }
+
+ CompletableFuture<Void> all =
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
+
+ return all.thenApply(v -> result.stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toList())
+ );
+ }
+
+ private CompletableFuture<TokenizedAssignments> primaryReplica(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp operationTime
+ ) {
+ CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica(
+ replicationGroupId,
+ operationTime,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
+
+ return f.handle((primaryReplica, e) -> {
+ if (e != null) {
+ LOG.debug("Failed to retrieve primary replica for partition
{}", e, replicationGroupId);
+
+ throw withCause(IgniteInternalException::new,
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
+ + " [tablePartitionId=" + replicationGroupId + ']', e);
+ } else {
+ String holder = primaryReplica.getLeaseholder();
+
+ assert holder != null : "Unable to map query, nothing holds
the lease";
+
+ return new
TokenizedAssignmentsImpl(Set.of(Assignment.forPeer(holder)),
primaryReplica.getStartTime().longValue());
+ }
+ });
+ }
+
+ private CompletableFuture<TokenizedAssignments> allReplicas(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp operationTime
+ ) {
+ CompletableFuture<TokenizedAssignments> f =
placementDriver.getAssignments(
+ replicationGroupId,
+ operationTime
+ );
+
+ return f.thenCompose(assignments -> {
+ if (assignments == null) {
+ // assignments are not ready yet, let's fall back to primary
replicas
+ return primaryReplica(replicationGroupId, operationTime);
+ }
+
+ return completedFuture(assignments);
+ });
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 8daf6897dd..477496eb63 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -19,16 +19,11 @@ package org.apache.ignite.internal.sql.engine;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
-import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
-import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
@@ -48,7 +43,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -57,17 +51,12 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.sql.ResultSetMetadataImpl;
import org.apache.ignite.internal.sql.SqlCommon;
@@ -81,15 +70,11 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.KeyValueGetPlan;
@@ -102,14 +87,11 @@ import
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPrunerImpl;
import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
@@ -131,7 +113,6 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
@@ -144,9 +125,6 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Default time-zone ID. */
public static final ZoneId DEFAULT_TIME_ZONE_ID = ZoneId.of("UTC");
- /** The logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(SqlQueryProcessor.class);
-
private static final int PARSED_RESULT_CACHE_SIZE = 10_000;
/** Size of the table access cache. */
@@ -330,36 +308,13 @@ public class SqlQueryProcessor implements QueryProcessor {
view -> () -> systemViewManager.scanView(view.name())
);
- var executionTargetProvider = new ExecutionTargetProvider() {
- @Override
- public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
- return primaryReplicas(table)
- .thenApply(factory::partitioned);
- }
-
- @Override
- public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
- List<String> nodes =
systemViewManager.owningNodes(view.name());
-
- if (nullOrEmpty(nodes)) {
- return failedFuture(
- new SqlException(Sql.MAPPING_ERR, format("The view
with name '{}' could not be found on"
- + " any active nodes in the cluster",
view.name()))
- );
- }
-
- return completedFuture(
- view.distribution() == IgniteDistributions.single()
- ? factory.oneOf(nodes)
- : factory.allOf(nodes)
- );
- }
- };
+ var executionTargetProvider = new
ExecutionTargetProviderImpl(placementDriver, systemViewManager);
var partitionPruner = new PartitionPrunerImpl();
var mappingService = new MappingServiceImpl(
nodeName,
+ clockService,
executionTargetProvider,
CACHE_FACTORY,
clusterCfg.planner().estimatedNumberOfQueries().value(),
@@ -397,51 +352,6 @@ public class SqlQueryProcessor implements QueryProcessor {
return nullCompletedFuture();
}
- // need to be refactored after TODO:
https://issues.apache.org/jira/browse/IGNITE-20925
- /** Get primary replicas. */
- private CompletableFuture<List<NodeWithConsistencyToken>>
primaryReplicas(IgniteTable table) {
- int partitions = table.partitions();
-
- List<CompletableFuture<NodeWithConsistencyToken>> result = new
ArrayList<>(partitions);
-
- HybridTimestamp clockNow = clockService.now();
-
- // no need to wait all partitions after pruning was implemented.
- for (int partId = 0; partId < partitions; ++partId) {
- int partitionId = partId;
- ReplicationGroupId partGroupId = new TablePartitionId(table.id(),
partitionId);
-
- CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(
- partGroupId,
- clockNow,
- AWAIT_PRIMARY_REPLICA_TIMEOUT,
- SECONDS
- );
-
- result.add(f.handle((primaryReplica, e) -> {
- if (e != null) {
- LOG.debug("Failed to retrieve primary replica for
partition {}", e, partitionId);
-
- throw withCause(IgniteInternalException::new,
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
- + " [tablePartitionId=" + partGroupId + ']', e);
- } else {
- String holder = primaryReplica.getLeaseholder();
-
- assert holder != null : "Unable to map query, nothing
holds the lease";
-
- return new NodeWithConsistencyToken(holder,
primaryReplica.getStartTime().longValue());
- }
- }));
- }
-
- CompletableFuture<Void> all =
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
-
- return all.thenApply(v -> result.stream()
- .map(CompletableFuture::join)
- .collect(Collectors.toList())
- );
- }
-
/** {@inheritDoc} */
@Override
public synchronized CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index a87817d0dc..be4d7705f6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -930,7 +930,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx,
MultiStepPlan multiStepPlan) {
assert root != null;
- MappingParameters mappingParameters =
MappingParameters.create(ctx.parameters());
+ boolean mapOnBackups = tx.isReadOnly();
+ MappingParameters mappingParameters =
MappingParameters.create(ctx.parameters(), mapOnBackups);
mappingService.map(multiStepPlan,
mappingParameters).whenCompleteAsync((mappedFragments, mappingErr) -> {
if (mappingErr != null) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
index fbc24eca92..47f629fc2c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
@@ -45,6 +45,16 @@ public interface ExecutionTarget {
*/
ExecutionTarget colocateWith(ExecutionTarget other) throws
ColocationMappingException;
+ /**
+ * Removes options from the current target which are not colocated with the
other target.
+ *
+ * <p>If the target has several options, remove those that are not present in
the given target to improve colocation.
+ *
+ * @param other Target with which we need to colocate current target.
+ * @return Returns new target in case current has been adjusted, return
{@code this} instance otherwise.
+ */
+ ExecutionTarget trimTo(ExecutionTarget other);
+
/**
* Finalises target by choosing exactly one node for targets with multiple
options.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
index 08b216725b..e3a02547ef 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.util.List;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
/**
@@ -28,10 +29,10 @@ public interface ExecutionTargetFactory {
/**
* Creates target from list of primary partitions.
*
- * @param nodes List of partitions.
+ * @param assignments List of assignments.
* @return An execution target.
*/
- ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes);
+ ExecutionTarget partitioned(List<TokenizedAssignments> assignments);
/**
* Creates target from list of required nodes.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
index fa6f3a343c..d131664cf1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
@@ -29,11 +30,18 @@ public interface ExecutionTargetProvider {
/**
* Returns an execution target for a given table.
*
+ * @param operationTime Time of the operation to get consistent results
among different calls.
* @param factory A factory to create target for given table.
* @param table A table to create execution target for.
+ * @param includeBackups Flag denoting whether to include non-primary
replicas in the target.
* @return A future representing the result.
*/
- CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory
factory, IgniteTable table);
+ CompletableFuture<ExecutionTarget> forTable(
+ HybridTimestamp operationTime,
+ ExecutionTargetFactory factory,
+ IgniteTable table,
+ boolean includeBackups
+ );
/**
* Returns an execution target for a given view.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index d466d9801d..271a7055bb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -17,18 +17,31 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import it.unimi.dsi.fastutil.longs.LongSets;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -69,11 +82,15 @@ import
org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
+import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.jetbrains.annotations.Nullable;
/**
* A mapper that traverse a fragment tree and calculates a mapping.
*/
class FragmentMapper {
+ private static final int MAPPING_ATTEMPTS = 3;
private final RelMetadataQuery mq;
@@ -89,13 +106,7 @@ class FragmentMapper {
this.targets = targets;
}
- /**
- * Computes mapping for the given fragment.
- *
- * @param fragment A fragment to compute mapping for.
- * @return Fragment meta information.
- */
- public FragmentMapping map(Fragment fragment) throws
FragmentMappingException {
+ private Mapping map(Fragment fragment) throws FragmentMappingException {
Mapping mapping = fragment.root().accept(new MapperVisitor());
if (fragment.single()) {
@@ -125,7 +136,187 @@ class FragmentMapper {
}
}
- return new FragmentMapping(mapping.createColocationGroups());
+ mapping.validate();
+
+ return mapping;
+ }
+
+ /**
+ * Computes mapping for the given list of fragments.
+ *
+ * @param fragments Fragments to compute mapping for.
+ * @param idGenerator Identity generator used to get ids for new
fragments which were split from the original one in case the mapper
+ * considers the fragment impossible to colocate.
+ * @return Fragment meta information.
+ */
+ public List<FragmentMapping> map(List<Fragment> fragments, IdGenerator
idGenerator) {
+ Exception ex = null;
+ boolean lastAttemptSucceed = false;
+ Long2ObjectMap<Pair<Fragment, Mapping>> mappingByFragmentId = new
Long2ObjectOpenHashMap<>();
+ for (int attempt = 0; attempt < MAPPING_ATTEMPTS &&
!lastAttemptSucceed; attempt++) {
+ Fragment currentFragment = null;
+ try {
+ for (Fragment fragment : fragments) {
+ currentFragment = fragment;
+
+ if
(mappingByFragmentId.containsKey(fragment.fragmentId())) {
+ continue;
+ }
+
+ mappingByFragmentId.put(fragment.fragmentId(), new
Pair<>(fragment, map(fragment)));
+ }
+
+ lastAttemptSucceed = true;
+ } catch (FragmentMappingException mappingException) {
+ if (ex == null) {
+ ex = mappingException;
+ } else {
+ ex.addSuppressed(mappingException);
+ }
+
+ fragments = replace(
+ fragments,
+ currentFragment,
+ new FragmentSplitter(idGenerator,
mappingException.node()).go(currentFragment)
+ );
+ }
+ }
+
+ if (!lastAttemptSucceed) {
+ throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map
query: " + ex.getMessage(), ex);
+ }
+
+ return adjustMapping(mappingByFragmentId).stream()
+ .map(pair -> new FragmentMapping(pair.getFirst(),
pair.getSecond().createColocationGroups()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Attempt to colocate different fragments with each other.
+ *
+ * <p>Current implementation assumes that intermediate results become
smaller on every step, thus it prioritizes colocation of children
+ * over colocation of parents. This assumption is not always correct, but
it will be addressed later by more sophisticated algorithm.
+ *
+ * @param mappingByFragmentId Fragments to map.
+ * @return List of pairs of fragments to their mapping metadata.
+ */
+ private static List<Pair<Fragment, Mapping>> adjustMapping(
+ Long2ObjectMap<Pair<Fragment, Mapping>> mappingByFragmentId
+ ) {
+ LongList fragmentIds = collectFragmentIds(mappingByFragmentId);
+
+ /*
+ Current implementation adjusts colocation in two passes. First pass
is bottom-up, here we
+ start from leaf nodes and adjust execution targets pairwise up to the
root. Second pass is
+ top-down, here we try to improve colocation of an entire plan with
root fragment.
+ */
+
+ // first pass is bottom-up
+ // the very first fragment is root, which doesn't have any parents, so
let's just skip it
+ for (int i = fragmentIds.size() - 1; i > 0; i--) {
+ Pair<Fragment, Mapping> currentPair =
mappingByFragmentId.get(fragmentIds.getLong(i));
+
+ Mapping currentMapping = currentPair.getSecond();
+
+ Long targetFragmentId = currentPair.getFirst().targetFragmentId();
+
+ assert targetFragmentId != null;
+
+ Pair<Fragment, Mapping> parentPair =
mappingByFragmentId.get((long) targetFragmentId);
+
+ Mapping parentMapping = parentPair.getSecond();
+
+ { // to reduce visibility scope of `newCurrentMapping`
+ Mapping newCurrentMapping =
currentMapping.bestEffortColocate(parentMapping);
+ if (newCurrentMapping != null) {
+ Fragment currentFragment = currentPair.getFirst();
+
+ mappingByFragmentId.put(currentFragment.fragmentId(), new
Pair<>(currentFragment, newCurrentMapping));
+ currentMapping = newCurrentMapping;
+ }
+ }
+
+ { // to reduce visibility scope of `newParentMapping`
+ Mapping newParentMapping =
parentMapping.bestEffortColocate(currentMapping);
+ if (newParentMapping != null) {
+ Fragment parentFragment = parentPair.getFirst();
+
+ mappingByFragmentId.put(parentFragment.fragmentId(), new
Pair<>(parentFragment, newParentMapping));
+ }
+ }
+ }
+
+ // second pass is top-down
+ // root fragment has been already processed on previous pass, thus
let's skip it
+ for (int i = 1; i < fragmentIds.size(); i++) {
+ Pair<Fragment, Mapping> currentPair =
mappingByFragmentId.get(fragmentIds.getLong(i));
+
+ Mapping currentMapping = currentPair.getSecond();
+
+ for (IgniteReceiver receiver : currentPair.getFirst().remotes()) {
+ Pair<Fragment, Mapping> childPair =
mappingByFragmentId.get(receiver.sourceFragmentId());
+
+ Mapping childMapping = childPair.getSecond();
+
+ { // to reduce visibility scope of `newCurrentMapping`
+ Mapping newCurrentMapping =
currentMapping.bestEffortColocate(childMapping);
+ if (newCurrentMapping != null) {
+ Fragment currentFragment = currentPair.getFirst();
+
+ mappingByFragmentId.put(currentFragment.fragmentId(),
new Pair<>(currentFragment, newCurrentMapping));
+ currentMapping = newCurrentMapping;
+ }
+ }
+
+ { // to reduce visibility scope of `newChildMapping`
+ Mapping newChildMapping =
childMapping.bestEffortColocate(currentMapping);
+ if (newChildMapping != null) {
+ Fragment childFragment = childPair.getFirst();
+
+ mappingByFragmentId.put(childFragment.fragmentId(),
new Pair<>(childFragment, newChildMapping));
+ }
+ }
+ }
+ }
+
+ return fragmentIds.longStream()
+ .mapToObj(mappingByFragmentId::get)
+ .collect(Collectors.toList());
+ }
+
+ private static LongList collectFragmentIds(Long2ObjectMap<Pair<Fragment,
Mapping>> mappingByFragmentId) {
+ LongList fragmentIds = new LongArrayList();
+ Queue<Fragment> toProcess = new LinkedList<>();
+
+ Fragment root = findRootFragment(mappingByFragmentId.values());
+
+ toProcess.add(root);
+
+ while (!toProcess.isEmpty()) {
+ Fragment current = toProcess.poll();
+
+ fragmentIds.add(current.fragmentId());
+
+ for (IgniteReceiver receiver : current.remotes()) {
+
toProcess.add(mappingByFragmentId.get(receiver.sourceFragmentId()).getFirst());
+ }
+ }
+ return fragmentIds;
+ }
+
+ private static Fragment findRootFragment(Collection<Pair<Fragment,
Mapping>> fragments) {
+ Fragment root = null;
+ for (Pair<Fragment, Mapping> pair : fragments) {
+ if (pair.getFirst().rootFragment()) {
+ assert root == null;
+
+ root = pair.getFirst();
+ }
+ }
+
+ assert root != null;
+
+ return root;
}
private class MapperVisitor implements IgniteRelVisitor<Mapping> {
@@ -339,6 +530,9 @@ class FragmentMapper {
RelOptCost leftVarCost = mq.getCumulativeCost(leftVar);
RelOptCost rightVarCost = mq.getCumulativeCost(rightVar);
+ assert leftVarCost != null;
+ assert rightVarCost != null;
+
if (leftVarCost.isLt(rightVarCost)) {
return new FailedMapping(new
FragmentMappingException(e.getMessage(), left, e));
} else {
@@ -351,10 +545,9 @@ class FragmentMapper {
* See {@link MapperVisitor#mapSingleRel(SingleRel)}
*
* <p>{@link ColocationMappingException} may be thrown on two children
nodes locations merge. This means that the
- * fragment (which part the parent node is) cannot be executed on any
node and additional exchange is needed.
- * This case we throw {@link FragmentMappingException} with an edge,
where we need the additional exchange.
- * After the exchange is put into the fragment and the fragment is
split into two ones, fragment meta information
- * will be recalculated for all fragments.
+ * fragment (which part the parent node is) cannot be executed on any
node and additional exchange is needed. This case we throw
+ * {@link FragmentMappingException} with an edge, where we need the
additional exchange. After the exchange is put into the fragment
+ * and the fragment is split into two ones, fragment meta information
will be recalculated for all fragments.
*/
private Mapping mapSetOp(SetOp rel) {
if (TraitUtils.distribution(rel) == IgniteDistributions.random()) {
@@ -387,6 +580,8 @@ class FragmentMapper {
}
}
+ assert res != null : "SetOp without inputs";
+
return res;
}
}
@@ -455,7 +650,11 @@ class FragmentMapper {
Mapping colocate(Mapping other) throws ColocationMappingException;
- List<ColocationGroup> createColocationGroups() throws
FragmentMappingException;
+ @Nullable Mapping bestEffortColocate(Mapping other);
+
+ void validate() throws FragmentMappingException;
+
+ List<ColocationGroup> createColocationGroups();
}
private static class FailedMapping implements Mapping {
@@ -481,9 +680,19 @@ class FragmentMapper {
}
@Override
- public List<ColocationGroup> createColocationGroups() throws
FragmentMappingException {
+ public @Nullable Mapping bestEffortColocate(Mapping other) {
+ return null;
+ }
+
+ @Override
+ public void validate() throws FragmentMappingException {
throw exception;
}
+
+ @Override
+ public List<ColocationGroup> createColocationGroups() {
+ throw new AssertionError("Should not be called");
+ }
}
private static class CombinedMapping implements Mapping {
@@ -509,7 +718,19 @@ class FragmentMapper {
}
@Override
- public List<ColocationGroup> createColocationGroups() throws
FragmentMappingException {
+ public @Nullable Mapping bestEffortColocate(Mapping other) {
+ return null;
+ }
+
+ @Override
+ public void validate() throws FragmentMappingException {
+ for (Mapping mapping : mappings) {
+ mapping.validate();
+ }
+ }
+
+ @Override
+ public List<ColocationGroup> createColocationGroups() {
List<ColocationGroup> groups = new ArrayList<>();
for (Mapping mapping : mappings) {
@@ -557,6 +778,30 @@ class FragmentMapper {
return new ColocatedMapping(LongSets.unmodifiable(sourceIds),
colocatedTarget);
}
+ @Override
+ public @Nullable Mapping bestEffortColocate(Mapping other) {
+ if (!other.colocated()) {
+ return null;
+ }
+
+ assert other instanceof ColocatedMapping :
other.getClass().getCanonicalName();
+
+ ColocatedMapping colocatedMapping = (ColocatedMapping) other;
+
+ ExecutionTarget newTarget = target.trimTo(colocatedMapping.target);
+
+ if (newTarget == target) {
+ return null;
+ }
+
+ return new ColocatedMapping(this.sourceIds, newTarget);
+ }
+
+ @Override
+ public void validate() {
+ // a colocated mapping is always valid
+ }
+
@Override
public List<ColocationGroup> createColocationGroups() {
ExecutionTarget finalised = target.finalise();
@@ -573,4 +818,46 @@ class FragmentMapper {
);
}
}
+
+ private static List<Fragment> replace(
+ List<Fragment> originalFragments,
+ Fragment fragmentToReplace,
+ List<Fragment> replacement
+ ) {
+ assert !nullOrEmpty(replacement);
+
+ Long2LongOpenHashMap newTargets = new Long2LongOpenHashMap();
+ for (Fragment fragment0 : replacement) {
+ for (IgniteReceiver remote : fragment0.remotes()) {
+ newTargets.put(remote.exchangeId(), fragment0.fragmentId());
+ }
+ }
+
+ List<Fragment> newFragments = new ArrayList<>(originalFragments.size()
+ replacement.size() - 1);
+ for (Fragment fragment : originalFragments) {
+ if (fragment == fragmentToReplace) {
+ //noinspection AssignmentToForLoopParameter
+ fragment = first(replacement);
+ } else if (!fragment.rootFragment()) {
+ IgniteSender sender = (IgniteSender) fragment.root();
+
+ long newTargetId =
newTargets.getOrDefault(sender.exchangeId(), Long.MIN_VALUE);
+
+ if (newTargetId != Long.MIN_VALUE) {
+ sender = new IgniteSender(sender.getCluster(),
sender.getTraitSet(),
+ sender.getInput(), sender.exchangeId(),
newTargetId, sender.distribution());
+
+ //noinspection AssignmentToForLoopParameter
+ fragment = new Fragment(fragment.fragmentId(),
fragment.correlated(), sender,
+ fragment.remotes(), fragment.tables(),
fragment.systemViews());
+ }
+ }
+
+ newFragments.add(fragment);
+ }
+
+ newFragments.addAll(replacement.subList(1, replacement.size()));
+
+ return newFragments;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
index dba2b4967c..16e20ea5e5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapping.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
import java.util.List;
+import org.apache.ignite.internal.sql.engine.prepare.Fragment;
/**
* Intermediate result returned by {@link FragmentMapper}.
@@ -30,13 +31,19 @@ import java.util.List;
* <p>That's why we need additional container here.
*/
class FragmentMapping {
+ private final Fragment fragment;
private final List<ColocationGroup> groups;
- FragmentMapping(List<ColocationGroup> groups) {
+ FragmentMapping(Fragment fragment, List<ColocationGroup> groups) {
+ this.fragment = fragment;
this.groups = groups;
}
- public List<ColocationGroup> groups() {
+ List<ColocationGroup> groups() {
return groups;
}
+
+ Fragment fragment() {
+ return fragment;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
index becf16d557..a8105ec9eb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
@@ -68,7 +68,7 @@ class FragmentSplitter extends IgniteRelShuttle {
public List<Fragment> go(Fragment fragment) {
ArrayList<Fragment> res = new ArrayList<>();
- stack.push(new FragmentProto(idGenerator.nextId(),
fragment.correlated(), fragment.root()));
+ stack.push(new FragmentProto(fragment.fragmentId(),
fragment.correlated(), fragment.root()));
while (!stack.isEmpty()) {
curr = stack.pop();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
index 014cd4add1..1f586c4541 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
@@ -23,31 +23,41 @@ import org.apache.ignite.internal.util.ArrayUtils;
public class MappingParameters {
/** Empty mapping parameters. */
- public static final MappingParameters EMPTY = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY);
+ public static final MappingParameters EMPTY = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false);
+ private final boolean mapOnBackups;
private final Object[] dynamicParameters;
/**
* Creates mapping parameters.
*
* @param dynamicParameters Dynamic parameters.
+ * @param mapOnBackups Whether to use non-primary replicas for mapping or
not.
*
* @return Mapping parameters.
*/
- public static MappingParameters create(Object[] dynamicParameters) {
+ public static MappingParameters create(Object[] dynamicParameters, boolean
mapOnBackups) {
if (dynamicParameters.length == 0) {
return EMPTY;
} else {
- return new MappingParameters(dynamicParameters);
+ return new MappingParameters(dynamicParameters, mapOnBackups);
}
}
- private MappingParameters(Object[] dynamicParameters) {
+ private MappingParameters(
+ Object[] dynamicParameters,
+ boolean mapOnBackups
+ ) {
this.dynamicParameters = dynamicParameters;
+ this.mapOnBackups = mapOnBackups;
}
/** Values of dynamic parameters. */
- public Object[] dynamicParameters() {
+ Object[] dynamicParameters() {
return dynamicParameters;
}
+
+ boolean mapOnBackups() {
+ return mapOnBackups;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 7c28ac249e..bcdf364850 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -18,20 +18,18 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
import static java.util.concurrent.CompletableFuture.allOf;
-import static org.apache.ignite.internal.util.CollectionUtils.first;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntObjectPair;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -41,7 +39,8 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl.LogicalTopologyHolder.TopologySnapshot;
@@ -55,7 +54,6 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.util.CompletableFutures;
-import org.apache.ignite.lang.ErrorGroups.Sql;
/**
* An implementation of {@link MappingService}.
@@ -64,15 +62,14 @@ import org.apache.ignite.lang.ErrorGroups.Sql;
* Always uses latest topology snapshot to map query.
*/
public class MappingServiceImpl implements MappingService,
LogicalTopologyEventListener {
- private static final int MAPPING_ATTEMPTS = 3;
-
private final LogicalTopologyHolder topologyHolder = new
LogicalTopologyHolder();
private final CompletableFuture<Void> initialTopologyFuture = new
CompletableFuture<>();
private final String localNodeName;
+ private final ClockService clock;
private final ExecutionTargetProvider targetProvider;
private final Cache<PlanId, FragmentsTemplate> templatesCache;
- private final Cache<PlanId, MappingsCacheValue> mappingsCache;
+ private final Cache<MappingsCacheKey, MappingsCacheValue> mappingsCache;
private final Executor taskExecutor;
private final PartitionPruner partitionPruner;
@@ -80,6 +77,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
* Constructor.
*
* @param localNodeName Name of the current Ignite node.
+ * @param clock Clock service to get actual time.
* @param targetProvider Execution target provider.
* @param cacheFactory A factory to create cache of fragments.
* @param cacheSize Size of the cache of query plans. Should be non
negative.
@@ -88,6 +86,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
*/
public MappingServiceImpl(
String localNodeName,
+ ClockService clock,
ExecutionTargetProvider targetProvider,
CacheFactory cacheFactory,
int cacheSize,
@@ -95,6 +94,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
Executor taskExecutor
) {
this.localNodeName = localNodeName;
+ this.clock = clock;
this.targetProvider = targetProvider;
this.templatesCache = cacheFactory.create(cacheSize);
this.mappingsCache = cacheFactory.create(cacheSize);
@@ -130,41 +130,50 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
FragmentsTemplate template = getOrCreateTemplate(multiStepPlan,
context);
- MappingsCacheValue cacheValue =
mappingsCache.compute(multiStepPlan.id(), (key, val) -> {
- if (val == null) {
- IntSet tableIds = new IntOpenHashSet();
- boolean topologyAware = false;
-
- for (Fragment fragment : template.fragments) {
- topologyAware = topologyAware ||
!fragment.systemViews().isEmpty();
- for (IgniteDataSource source : fragment.tables().values())
{
- tableIds.add(source.id());
- }
- }
+ boolean mapOnBackups = parameters.mapOnBackups();
+ MappingsCacheValue cacheValue = mappingsCache.compute(
+ new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
+ (key, val) -> {
+ if (val == null) {
+ IntSet tableIds = new IntOpenHashSet();
+ boolean topologyAware = false;
+
+ for (Fragment fragment : template.fragments) {
+ topologyAware = topologyAware ||
!fragment.systemViews().isEmpty();
+ for (IgniteDataSource source :
fragment.tables().values()) {
+ tableIds.add(source.id());
+ }
+ }
- long topVer = topologyAware ? topology.version() :
Long.MAX_VALUE;
+ long topVer = topologyAware ? topology.version() :
Long.MAX_VALUE;
- return new MappingsCacheValue(topVer, tableIds,
mapFragments(context, template));
- }
+ return new MappingsCacheValue(topVer, tableIds,
mapFragments(context, template, key.mapOnBackups));
+ }
- if (val.topVer < topology.version()) {
- return new MappingsCacheValue(topology.version(),
val.tableIds, mapFragments(context, template));
- }
+ if (val.topVer < topology.version()) {
+ return new MappingsCacheValue(topology.version(),
val.tableIds, mapFragments(context, template, key.mapOnBackups));
+ }
- return val;
- });
+ return val;
+ }
+ );
return cacheValue.mappedFragments.thenApply(mappedFragments ->
applyPartitionPruning(mappedFragments.fragments, parameters));
}
- private CompletableFuture<MappedFragments> mapFragments(MappingContext
context, FragmentsTemplate template) {
+ private CompletableFuture<MappedFragments> mapFragments(
+ MappingContext context,
+ FragmentsTemplate template,
+ boolean mapOnBackups
+ ) {
IdGenerator idGenerator = new IdGenerator(template.nextId);
List<Fragment> fragments = new ArrayList<>(template.fragments);
+ HybridTimestamp mappingTime = clock.now();
List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
fragments.stream().flatMap(fragment -> Stream.concat(
fragment.tables().values().stream()
- .map(table ->
targetProvider.forTable(context.targetFactory(), table)
+ .map(table ->
targetProvider.forTable(mappingTime, context.targetFactory(), table,
mapOnBackups)
.thenApply(target ->
IntObjectPair.of(table.id(), target))
),
fragment.systemViews().stream()
@@ -187,67 +196,35 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
FragmentMapper mapper = new
FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
- Long2ObjectMap<FragmentMapping> mappingByFragmentId = new
Long2ObjectOpenHashMap<>();
+ List<FragmentMapping> mappings = mapper.map(fragments,
idGenerator);
+
Long2ObjectMap<ColocationGroup> groupsBySourceId = new
Long2ObjectOpenHashMap<>();
Long2ObjectMap<List<String>> allSourcesByExchangeId = new
Long2ObjectOpenHashMap<>();
- Exception ex = null;
- boolean lastAttemptSucceed = false;
- List<Fragment> fragmentsToMap = fragments;
- for (int attempt = 0; attempt < MAPPING_ATTEMPTS &&
!lastAttemptSucceed; attempt++) {
- Fragment currentFragment = null;
- try {
- for (Fragment fragment : fragmentsToMap) {
- currentFragment = fragment;
-
- if
(mappingByFragmentId.containsKey(fragment.fragmentId())) {
- continue;
- }
-
- FragmentMapping mapping = mapper.map(fragment);
-
- mappingByFragmentId.put(fragment.fragmentId(),
mapping);
- for (ColocationGroup group : mapping.groups())
{
- for (long sourceId : group.sourceIds()) {
- groupsBySourceId.put(sourceId, group);
- }
- }
-
- if (!fragment.rootFragment()) {
- IgniteSender sender = (IgniteSender)
fragment.root();
-
- List<String> nodeNames =
mapping.groups().stream()
- .flatMap(g ->
g.nodeNames().stream())
-
.distinct().collect(Collectors.toList());
-
-
allSourcesByExchangeId.put(sender.exchangeId(), nodeNames);
- }
- }
+ for (FragmentMapping mapping : mappings) {
+ Fragment fragment = mapping.fragment();
- lastAttemptSucceed = true;
- } catch (FragmentMappingException mappingException) {
- if (ex == null) {
- ex = mappingException;
- } else {
- ex.addSuppressed(mappingException);
+ for (ColocationGroup group : mapping.groups()) {
+ for (long sourceId : group.sourceIds()) {
+ groupsBySourceId.put(sourceId, group);
}
-
- fragmentsToMap = replace(
- fragmentsToMap,
- currentFragment,
- new FragmentSplitter(idGenerator,
mappingException.node()).go(currentFragment)
- );
}
- }
- if (!lastAttemptSucceed) {
- throw new IgniteInternalException(Sql.MAPPING_ERR,
"Unable to map query: " + ex.getMessage(), ex);
+ if (!fragment.rootFragment()) {
+ IgniteSender sender = (IgniteSender)
fragment.root();
+
+ List<String> nodeNames = mapping.groups().stream()
+ .flatMap(g -> g.nodeNames().stream())
+ .distinct().collect(Collectors.toList());
+
+ allSourcesByExchangeId.put(sender.exchangeId(),
nodeNames);
+ }
}
- List<MappedFragment> mappedFragmentsList = new
ArrayList<>(fragmentsToMap.size());
+ List<MappedFragment> mappedFragmentsList = new
ArrayList<>(mappings.size());
Set<String> targetNodes = new HashSet<>();
- for (Fragment fragment : fragmentsToMap) {
- FragmentMapping mapping =
mappingByFragmentId.get(fragment.fragmentId());
+ for (FragmentMapping mapping : mappings) {
+ Fragment fragment = mapping.fragment();
ColocationGroup targetGroup = null;
if (!fragment.rootFragment()) {
@@ -293,14 +270,9 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
topologyHolder.update(newTopology);
- mappingsCache.removeIfValue(value -> {
- if (value.mappedFragments.isDone()) {
- return
value.mappedFragments.join().nodes.contains(leftNode.name());
- }
-
- // Invalidate non-completed mappings to reduce the chance of
getting stale value
- return true;
- });
+ mappingsCache.removeIfValue(value ->
+ !value.mappedFragments.isDone() // Invalidate non-completed
mappings to reduce the chance of getting stale value
+ ||
value.mappedFragments.join().nodes.contains(leftNode.name()));
}
@Override
@@ -308,46 +280,6 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
topologyHolder.update(newTopology);
}
- private static List<Fragment> replace(
- List<Fragment> originalFragments,
- Fragment fragmentToReplace,
- List<Fragment> replacement
- ) {
- assert !nullOrEmpty(replacement);
-
- Long2LongOpenHashMap newTargets = new Long2LongOpenHashMap();
- for (Fragment fragment0 : replacement) {
- for (IgniteReceiver remote : fragment0.remotes()) {
- newTargets.put(remote.exchangeId(), fragment0.fragmentId());
- }
- }
-
- List<Fragment> newFragments = new ArrayList<>(originalFragments.size()
+ replacement.size() - 1);
- for (Fragment fragment : originalFragments) {
- if (fragment == fragmentToReplace) {
- fragment = first(replacement);
- } else if (!fragment.rootFragment()) {
- IgniteSender sender = (IgniteSender) fragment.root();
-
- long newTargetId =
newTargets.getOrDefault(sender.exchangeId(), Long.MIN_VALUE);
-
- if (newTargetId != Long.MIN_VALUE) {
- sender = new IgniteSender(sender.getCluster(),
sender.getTraitSet(),
- sender.getInput(), sender.exchangeId(),
newTargetId, sender.distribution());
-
- fragment = new Fragment(fragment.fragmentId(),
fragment.correlated(), sender,
- fragment.remotes(), fragment.tables(),
fragment.systemViews());
- }
- }
-
- newFragments.add(fragment);
- }
-
- newFragments.addAll(replacement.subList(1, replacement.size()));
-
- return newFragments;
- }
-
private List<MappedFragment> applyPartitionPruning(List<MappedFragment>
mappedFragments, MappingParameters parameters) {
return partitionPruner.apply(mappedFragments,
parameters.dynamicParameters());
}
@@ -448,4 +380,33 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
this.mappedFragments = mappedFragments;
}
}
+
+ private static class MappingsCacheKey {
+ private final PlanId planId;
+ private final boolean mapOnBackups;
+
+ MappingsCacheKey(PlanId planId, boolean mapOnBackups) {
+ this.planId = planId;
+ this.mapOnBackups = mapOnBackups;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MappingsCacheKey that = (MappingsCacheKey) o;
+ return mapOnBackups == that.mapOnBackups && Objects.equals(planId,
that.planId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(planId, mapOnBackups);
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
index 8435366979..d2bae3afcf 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
@@ -49,6 +49,11 @@ class AllOfTarget extends AbstractTarget {
return ((AbstractTarget) other).colocate(this);
}
+ @Override
+ public ExecutionTarget trimTo(ExecutionTarget other) {
+ return this;
+ }
+
@Override
ExecutionTarget colocate(AllOfTarget other) throws
ColocationMappingException {
return colocate(this, other);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
index 500a5026c3..3098c7dd97 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
@@ -56,6 +56,21 @@ class OneOfTarget extends AbstractTarget {
return ((AbstractTarget) other).colocate(this);
}
+ @Override
+ public ExecutionTarget trimTo(ExecutionTarget other) {
+ assert other instanceof AbstractTarget : other == null ? "<null>" :
other.getClass().getCanonicalName();
+
+ long otherNodes = ((AbstractTarget) other).nodes;
+
+ long newNodes = nodes & otherNodes;
+
+ if (newNodes == nodes || newNodes == 0) {
+ return this;
+ }
+
+ return new OneOfTarget(newNodes);
+ }
+
@Override
public ExecutionTarget colocate(AllOfTarget other) throws
ColocationMappingException {
return colocate(other, this);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
index aee148f04e..9a0680eb3e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+import it.unimi.dsi.fastutil.ints.Int2LongFunction;
import java.util.List;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -67,6 +68,56 @@ class PartitionedTarget extends AbstractTarget {
return ((AbstractTarget) other).colocate(this);
}
+ @Override
+ public ExecutionTarget trimTo(ExecutionTarget other) {
+ assert other instanceof AbstractTarget : other == null ? "<null>" :
other.getClass().getCanonicalName();
+
+ if (finalised) {
+ return this;
+ }
+
+ long[] newPartitionsNodes = new long[partitionsNodes.length];
+ boolean changed = false;
+ Int2LongFunction partitionNodesResolver = partitionNodeResolver(other);
+
+ for (int i = 0; i < partitionsNodes.length; i++) {
+ long newNodes = partitionsNodes[i] & partitionNodesResolver.get(i);
+
+ if (newNodes == 0) {
+ newNodes = partitionsNodes[i];
+ }
+
+ if (newNodes != partitionsNodes[i]) {
+ changed = true;
+ }
+
+ newPartitionsNodes[i] = newNodes;
+ }
+
+ if (changed) {
+ return new PartitionedTarget(false, newPartitionsNodes,
enlistmentConsistencyTokens);
+ }
+
+ return this;
+ }
+
+ private Int2LongFunction partitionNodeResolver(ExecutionTarget other) {
+ Int2LongFunction partitionNodesResolver;
+
+ if (other instanceof PartitionedTarget
+ && ((PartitionedTarget) other).partitionsNodes.length ==
partitionsNodes.length) {
+ PartitionedTarget otherPartitioned = (PartitionedTarget) other;
+
+ partitionNodesResolver = partId ->
otherPartitioned.partitionsNodes[partId];
+ } else {
+ long otherNodes = ((AbstractTarget) other).nodes;
+
+ partitionNodesResolver = partId -> otherNodes;
+ }
+
+ return partitionNodesResolver;
+ }
+
@Override
ExecutionTarget colocate(AllOfTarget other) throws
ColocationMappingException {
return colocate(other, this);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 9a1eadae31..e369c311d3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -21,6 +21,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
@@ -38,12 +41,13 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
throw new IllegalArgumentException("Supported up to 64 nodes, but
was " + nodes.size());
}
- this.nodes = nodes;
+ // sort node names so the node-to-id assignment is deterministic, making mapping stable across invocations
+ this.nodes = nodes.stream().sorted().collect(Collectors.toList());
nodeNameToId = new Object2LongOpenHashMap<>(nodes.size());
int idx = 0;
- for (String name : nodes) {
+ for (String name : this.nodes) {
nodeNameToId.putIfAbsent(name, 1L << idx++);
}
}
@@ -64,17 +68,24 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
}
@Override
- public ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes) {
- long[] partitionNodes = new long[nodes.size()];
- long[] enlistmentConsistencyTokens = new long[nodes.size()];
+ public ExecutionTarget partitioned(List<TokenizedAssignments> assignments)
{
+ long[] partitionNodes = new long[assignments.size()];
+ long[] enlistmentConsistencyTokens = new long[assignments.size()];
int idx = 0;
- for (NodeWithConsistencyToken e : nodes) {
- partitionNodes[idx] = nodeNameToId.getOrDefault(e.name(), 0);
- enlistmentConsistencyTokens[idx++] =
e.enlistmentConsistencyToken();
+ boolean finalised = true;
+ for (TokenizedAssignments assignment : assignments) {
+ finalised = finalised && assignment.nodes().size() < 2;
+
+ for (Assignment a : assignment.nodes()) {
+ partitionNodes[idx] |=
nodeNameToId.getOrDefault(a.consistentId(), 0);
+ enlistmentConsistencyTokens[idx] = assignment.token();
+ }
+
+ idx++;
}
- return new PartitionedTarget(true, partitionNodes,
enlistmentConsistencyTokens);
+ return new PartitionedTarget(finalised, partitionNodes,
enlistmentConsistencyTokens);
}
@Override
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
index 199c8758ef..8810657f31 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
@@ -49,6 +49,21 @@ class SomeOfTarget extends AbstractTarget {
return ((AbstractTarget) other).colocate(this);
}
+ @Override
+ public ExecutionTarget trimTo(ExecutionTarget other) {
+ assert other instanceof AbstractTarget : other == null ? "<null>" :
other.getClass().getCanonicalName();
+
+ long otherNodes = ((AbstractTarget) other).nodes;
+
+ long newNodes = nodes & otherNodes;
+
+ if (newNodes == nodes || newNodes == 0) {
+ return this;
+ }
+
+ return new SomeOfTarget(newNodes);
+ }
+
@Override
ExecutionTarget colocate(AllOfTarget other) throws
ColocationMappingException {
return colocate(other, this);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index f04c563144..653bd6aa60 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
/**
* Fragment of distributed query.
@@ -123,6 +124,15 @@ public class Fragment {
return !(root instanceof IgniteSender);
}
+ /** Returns the id of the target fragment for non-root fragments, or {@code
null} otherwise. */
+ public @Nullable Long targetFragmentId() {
+ if (root instanceof IgniteSender) {
+ return ((IgniteSender) root).targetFragmentId();
+ }
+
+ return null;
+ }
+
public boolean single() {
return rootFragment() || (root instanceof IgniteSender
&& ((IgniteSender)
root).sourceDistribution().satisfies(IgniteDistributions.single()));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 5926253c80..feac21c78a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -73,6 +73,7 @@ import
org.apache.ignite.internal.failure.handlers.StopNodeFailureHandler;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.RunnableX;
@@ -873,24 +874,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
ExecutionDependencyResolver dependencyResolver = new
ExecutionDependencyResolverImpl(executableTableRegistry, null);
- var targetProvider = new ExecutionTargetProvider() {
- @Override
- public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
- if (mappingException != null) {
- return CompletableFuture.failedFuture(mappingException);
- }
-
- return completedFuture(factory.allOf(nodeNames));
- }
-
- @Override
- public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
- return CompletableFuture.failedFuture(new AssertionError("Not
supported"));
- }
- };
-
- var partitionPruner = new PartitionPrunerImpl();
- var mappingService = new MappingServiceImpl(nodeName, targetProvider,
EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+ var mappingService = createMappingService(nodeName, clockService,
taskExecutor);
var tableFunctionRegistry = new TableFunctionRegistryImpl();
List<LogicalNode> logicalNodes = nodeNames.stream()
@@ -921,6 +905,36 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
return executionService;
}
+ private MappingServiceImpl createMappingService(
+ String nodeName,
+ ClockService clock,
+ QueryTaskExecutorImpl taskExecutor
+ ) {
+ var targetProvider = new ExecutionTargetProvider() {
+ @Override
+ public CompletableFuture<ExecutionTarget> forTable(
+ HybridTimestamp operationTime,
+ ExecutionTargetFactory factory,
+ IgniteTable table,
+ boolean includeBackups
+ ) {
+ if (mappingException != null) {
+ return CompletableFuture.failedFuture(mappingException);
+ }
+
+ return completedFuture(factory.allOf(nodeNames));
+ }
+
+ @Override
+ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+ return CompletableFuture.failedFuture(new AssertionError("Not
supported"));
+ }
+ };
+
+ var partitionPruner = new PartitionPrunerImpl();
+ return new MappingServiceImpl(nodeName, clock, targetProvider,
EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+ }
+
private SqlOperationContext createContext() {
return createContext(null);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 27078417d2..d5fa8bf400 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -32,6 +32,8 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.sql.SqlCommon;
@@ -93,7 +95,7 @@ public class FragmentMappingTest extends AbstractPlannerTest {
private final TreeSet<String> nodeNames = new TreeSet<>();
- private final TreeMap<String, Pair<IgniteDistribution, List<String>>>
tables = new TreeMap<>();
+ private final TreeMap<String, Pair<IgniteDistribution,
List<List<String>>>> tables = new TreeMap<>();
private final Map<String, Integer> tableRows = new HashMap<>();
@@ -258,6 +260,17 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
testRunner.runTest(this::initSchema, "test_partition_pruning.test");
}
+ @Test
+ void backupPartitionsMapping() {
+ addNodes("N0", "N1", "N2", "N3");
+
+ addTable("T1", List.of(List.of("N0", "N1"), List.of("N1", "N2")));
+ addTable("T2", List.of(List.of("N0", "N1", "N2")));
+ addTable("T3", List.of(List.of("N1", "N2")));
+
+ testRunner.runTest(this::initSchema, "test_backup_mapping.test");
+ }
+
private void addNodes(String node, String... otherNodes) {
this.nodeNames.add(node);
this.nodeNames.addAll(Arrays.asList(otherNodes));
@@ -269,14 +282,38 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
nodeNames.addAll(Arrays.asList(otherNodes));
String tableName = formatName(name, nodeNames);
- tables.put(tableName, new Pair<>(IgniteDistributions.affinity(-1, -1,
-1), new ArrayList<>(nodeNames)));
+ tables.put(
+ tableName,
+ new Pair<>(
+ IgniteDistributions.affinity(-1, -1, -1),
+ nodeNames.stream()
+ .map(List::of)
+ .collect(Collectors.toList())
+ )
+ );
+ }
+
+ private void addTable(String name, List<List<String>> assignments) {
+ String tableName = name;
+
+ for (List<String> partitionNodes : assignments) {
+ tableName = formatName(tableName, new TreeSet<>(partitionNodes));
+ }
+
+ tables.put(
+ tableName,
+ new Pair<>(
+ IgniteDistributions.affinity(-1, -1, -1),
+ assignments
+ )
+ );
}
private void addTableSingle(String name, String... nodes) {
TreeSet<String> nodeNames = new TreeSet<>(Arrays.asList(nodes));
String tableName = formatName(name, nodeNames);
- tables.put(tableName, new Pair<>(IgniteDistributions.single(), new
ArrayList<>(nodeNames)));
+ tables.put(tableName, new Pair<>(IgniteDistributions.single(),
List.of(new ArrayList<>(nodeNames))));
}
private void addTableIdent(String name, String node, String... otherNodes)
{
@@ -285,7 +322,15 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
nodeNames.addAll(Arrays.asList(otherNodes));
String tableName = formatName(name, nodeNames);
- tables.put(tableName, new Pair<>(IgniteDistributions.identity(0), new
ArrayList<>(nodeNames)));
+ tables.put(
+ tableName,
+ new Pair<>(
+ IgniteDistributions.identity(0),
+ nodeNames.stream()
+ .map(List::of)
+ .collect(Collectors.toList())
+ )
+ );
}
private void setRowCount(String tableName, int rowCount) {
@@ -304,6 +349,10 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
try (IgnitePlanner planner = ctx.planner()) {
assertNotNull(planner);
+ var plan = physicalPlan(planner, ctx.query());
+
+ System.out.println(RelOptUtil.toString(plan,
SqlExplainLevel.ALL_ATTRIBUTES));
+
return physicalPlan(planner, ctx.query());
}
} catch (Exception e) {
@@ -316,25 +365,17 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
List<IgniteDataSource> dataSources = new ArrayList<>();
int objectId = 1;
- Map<String, List<String>> table2NodeNames = new HashMap<>();
+ Map<String, List<List<String>>> table2Assignments = new HashMap<>();
- for (Map.Entry<String, Pair<IgniteDistribution, List<String>>> e :
tables.entrySet()) {
+ for (Map.Entry<String, Pair<IgniteDistribution, List<List<String>>>> e
: tables.entrySet()) {
String tableName = e.getKey();
String tableShortName = tableName.substring(0,
tableName.indexOf('_'));
// Generate distinct row counts for each table to ensure that the
optimizer produces the same results.
- int tableSize = tableRows.getOrDefault(tableShortName, 100 +
objectId);
+ int tableSize = tableRows.getOrDefault(tableShortName, 100_000 +
objectId);
- List<String> tableNodeNames = e.getValue().getSecond();
+ List<List<String>> assignments = e.getValue().getSecond();
- for (String tableNodeName : tableNodeNames) {
- if (!nodeNames.contains(tableNodeName)) {
- String message = format(
- "Expected node {} for table {}. Registered nodes:
{}",
- tableNodeName, tableShortName, nodeNames
- );
- throw new IllegalArgumentException(message);
- }
- }
+ validateAssignments(tableShortName, assignments);
IgniteDistribution distribution = e.getValue().getFirst();
@@ -350,7 +391,7 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
IgniteDistribution distributionToUse;
if (distribution.function() instanceof AffinityDistribution) {
- distributionToUse = IgniteDistributions.affinity(0, objectId,
1);
+ distributionToUse = IgniteDistributions.affinity(0, objectId,
objectId);
} else {
distributionToUse = distribution;
}
@@ -361,12 +402,12 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
.size(tableSize)
.tableId(objectId)
.distribution(distributionToUse)
- .partitions(tableNodeNames.size())
+ .partitions(assignments.size())
.build();
dataSources.add(testTable);
- table2NodeNames.put(tableName, tableNodeNames);
+ table2Assignments.put(tableName, assignments);
objectId += 1;
}
@@ -374,13 +415,27 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
IgniteSchema schema = new IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME,
1, dataSources);
ExecutionTargetProvider executionTargetProvider =
TestBuilders.executionTargetProviderBuilder()
.useTablePartitions(true)
- .addTables(table2NodeNames)
+ .addTables(table2Assignments)
.build();
LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
return new TestSetup(executionTargetProvider, schema,
logicalTopologySnapshot);
}
+ private void validateAssignments(String tableName, List<List<String>>
assignments) {
+ for (List<String> partitionAssignments : assignments) {
+ for (String tableNodeName : partitionAssignments) {
+ if (!nodeNames.contains(tableNodeName)) {
+ String message = format(
+ "Expected node {} for table {}. Registered nodes:
{}",
+ tableNodeName, tableName, nodeNames
+ );
+ throw new IllegalArgumentException(message);
+ }
+ }
+ }
+ }
+
private static String formatName(String name, TreeSet<String> nodeNames) {
return name + "_" + String.join("", nodeNames);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index 4979f14057..727eb7b023 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -38,12 +39,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -71,6 +75,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
private static final MultiStepPlan PLAN;
private static final MultiStepPlan PLAN_WITH_SYSTEM_VIEW;
private static final TestCluster cluster;
+ private static final ClockService CLOCK_SERVICE = new TestClockService(new
TestHybridClock(System::currentTimeMillis));
private static final MappingParameters PARAMS = MappingParameters.EMPTY;
private static final PartitionPruner PARTITION_PRUNER = (fragments,
dynParams) -> fragments;
@@ -179,8 +184,9 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
// Initialize mapping service.
ExecutionTargetProvider targetProvider =
Mockito.spy(createTargetProvider(nodeNames));
- MappingServiceImpl mappingService =
- new MappingServiceImpl(localNodeName, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run);
+ MappingServiceImpl mappingService = new MappingServiceImpl(
+ localNodeName, CLOCK_SERVICE, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
+ );
mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
new LogicalTopologySnapshot(1,
logicalNodes(nodeNames.toArray(new String[0]))));
@@ -188,7 +194,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN,
PARAMS));
List<MappedFragment> sysViewMapping =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
- verify(targetProvider, times(2)).forTable(any(), any());
+ verify(targetProvider, times(2)).forTable(any(), any(), any(),
anyBoolean());
verify(targetProvider, times(1)).forSystemView(any(), any());
assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
@@ -222,7 +228,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
// Plan with tables that include left node must be invalidated.
assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN,
PARAMS)));
- verify(targetProvider, times(4)).forTable(any(), any());
+ verify(targetProvider, times(4)).forTable(any(), any(), any(),
anyBoolean());
verify(targetProvider, times(2)).forSystemView(any(), any());
}
@@ -248,14 +254,15 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
// Initialize mapping service.
ExecutionTargetProvider targetProvider =
Mockito.spy(createTargetProvider(nodeNames));
- MappingServiceImpl mappingService =
- new MappingServiceImpl(localNodeName, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run);
+ MappingServiceImpl mappingService = new MappingServiceImpl(
+ localNodeName, CLOCK_SERVICE, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
+ );
mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
new LogicalTopologySnapshot(1,
logicalNodes(nodeNames.toArray(new String[0]))));
List<MappedFragment> mappedFragments =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
- verify(targetProvider, times(1)).forTable(any(), any());
+ verify(targetProvider, times(1)).forTable(any(), any(), any(),
anyBoolean());
verify(targetProvider, times(1)).forSystemView(any(), any());
// Simulate expiration of the primary replica for non-mapped table -
the cache entry should not be invalidated.
@@ -266,7 +273,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
// Simulate expiration of the primary replica for mapped table - the
cache entry should be invalidated.
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
assertNotSame(mappedFragments,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- verify(targetProvider, times(2)).forTable(any(), any());
+ verify(targetProvider, times(2)).forTable(any(), any(), any(),
anyBoolean());
verify(targetProvider, times(2)).forSystemView(any(), any());
}
@@ -279,6 +286,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
private static MappingServiceImpl createMappingServiceNoCache(String
localNodeName, List<String> nodeNames) {
return new MappingServiceImpl(
localNodeName,
+ CLOCK_SERVICE,
createTargetProvider(nodeNames),
EmptyCacheFactory.INSTANCE,
0,
@@ -290,7 +298,12 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
private static ExecutionTargetProvider createTargetProvider(List<String>
nodeNames) {
return new ExecutionTargetProvider() {
@Override
- public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
+ public CompletableFuture<ExecutionTarget> forTable(
+ HybridTimestamp operationTime,
+ ExecutionTargetFactory factory,
+ IgniteTable table,
+ boolean includeBackups
+ ) {
return
CompletableFuture.completedFuture(factory.allOf(nodeNames));
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index 94d1518959..5ea7359095 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -36,7 +36,9 @@ import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.ignite.internal.TestHybridClock;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.sql.ResultSetMetadataImpl;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
@@ -188,13 +190,17 @@ final class MappingTestRunner {
}
}
- private String produceMapping(String nodeName,
+ private String produceMapping(
+ String nodeName,
ExecutionTargetProvider targetProvider,
LogicalTopologySnapshot snapshot,
- MultiStepPlan plan) {
+ MultiStepPlan plan
+ ) {
PartitionPruner partitionPruner = new PartitionPrunerImpl();
- MappingServiceImpl mappingService = new MappingServiceImpl(nodeName,
+ MappingServiceImpl mappingService = new MappingServiceImpl(
+ nodeName,
+ new TestClockService(new
TestHybridClock(System::currentTimeMillis)),
targetProvider,
EmptyCacheFactory.INSTANCE,
0,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 02493459c8..c3a921a731 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -48,6 +48,9 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -83,7 +87,6 @@ import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
@@ -629,10 +632,10 @@ public class TestBuilders {
new DdlSqlToCommandConverter(), PLANNING_TIMEOUT,
PLANNING_THREAD_COUNT,
mock(MetricManagerImpl.class), schemaManager);
- Map<String, List<String>> owningNodesByTableName = new HashMap<>();
+ Map<String, List<List<String>>> owningNodesByTableName = new
HashMap<>();
for (Entry<String, Map<String, ScannableTable>> entry :
nodeName2tableName2table.entrySet()) {
for (String tableName : entry.getValue().keySet()) {
- owningNodesByTableName.computeIfAbsent(tableName, key ->
new ArrayList<>()).add(entry.getKey());
+ owningNodesByTableName.computeIfAbsent(tableName, key ->
new ArrayList<>()).add(List.of(entry.getKey()));
}
}
@@ -683,6 +686,7 @@ public class TestBuilders {
var partitionPruner = new PartitionPrunerImpl();
var mappingService = new MappingServiceImpl(
name,
+ new TestClockService(clock, clockWaiter),
targetProvider,
EmptyCacheFactory.INSTANCE,
0,
@@ -1469,7 +1473,7 @@ public class TestBuilders {
/** A builder to create instances of {@link ExecutionTargetProvider}. */
public static final class ExecutionTargetProviderBuilder {
- private final Map<String, List<String>> owningNodesByTableName = new
HashMap<>();
+ private final Map<String, List<List<String>>> owningNodesByTableName =
new HashMap<>();
private Function<String, List<String>> owningNodesBySystemViewName =
(n) -> null;
@@ -1480,7 +1484,7 @@ public class TestBuilders {
}
/** Adds tables to list of nodes mapping. */
- public ExecutionTargetProviderBuilder addTables(Map<String,
List<String>> tables) {
+ public ExecutionTargetProviderBuilder addTables(Map<String,
List<List<String>>> tables) {
this.owningNodesByTableName.putAll(tables);
return this;
}
@@ -1514,13 +1518,13 @@ public class TestBuilders {
final Function<String, List<String>> owningNodesBySystemViewName;
- final Map<String, List<String>> owningNodesByTableName;
+ final Map<String, List<List<String>>> owningNodesByTableName;
final boolean useTablePartitions;
private TestNodeExecutionTargetProvider(
Function<String, List<String>> owningNodesBySystemViewName,
- Map<String, List<String>> owningNodesByTableName,
+ Map<String, List<List<String>>> owningNodesByTableName,
boolean useTablePartitions
) {
this.owningNodesBySystemViewName = owningNodesBySystemViewName;
@@ -1529,33 +1533,45 @@ public class TestBuilders {
}
@Override
- public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
- List<String> owningNodes =
owningNodesByTableName.get(table.name());
+ public CompletableFuture<ExecutionTarget> forTable(
+ HybridTimestamp operationTime,
+ ExecutionTargetFactory factory,
+ IgniteTable table,
+ boolean includeBackups
+ ) {
+ List<List<String>> owningNodes =
owningNodesByTableName.get(table.name());
if (nullOrEmpty(owningNodes)) {
throw new AssertionError("DataProvider is not configured for
table " + table.name());
}
- List<NodeWithConsistencyToken> nodes;
+ List<TokenizedAssignments> assignments;
if (useTablePartitions) {
int p = table.partitions();
- nodes = IntStream.range(0, p).mapToObj(n -> {
- String nodeName = owningNodes.get(n % owningNodes.size());
- return new NodeWithConsistencyToken(nodeName, p);
+ assignments = IntStream.range(0, p).mapToObj(n -> {
+ List<String> nodes = owningNodes.get(n %
owningNodes.size());
+ return partitionNodesToAssignment(nodes, p);
}).collect(Collectors.toList());
} else {
- nodes = owningNodes.stream()
- .map(name -> new NodeWithConsistencyToken(name, 1))
+ assignments = owningNodes.stream()
+ .map(nodes -> partitionNodesToAssignment(nodes, 1))
.collect(Collectors.toList());
}
- ExecutionTarget target = factory.partitioned(nodes);
+ ExecutionTarget target = factory.partitioned(assignments);
return CompletableFuture.completedFuture(target);
}
+ private static TokenizedAssignments
partitionNodesToAssignment(List<String> nodes, long token) {
+ return new TokenizedAssignmentsImpl(
+
nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet()),
+ token
+ );
+ }
+
@Override
public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
List<String> nodes =
owningNodesBySystemViewName.apply(view.name());
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 9dd930b7d4..b05ec49597 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -1,23 +1,14 @@
N0
SELECT (SELECT count(*) FROM ct_n1) FROM t_n1
---
-Fragment#4 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [5]
- exchangeSourceNodes: {5=[N1]}
- tree:
- Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#1
- targetNodes: [N1]
- executionNodes: [N1]
- tables: [T_N1]
- partitions: {N1=[0:1]}
+ remoteFragments: [4]
+ exchangeSourceNodes: {4=[N1]}
tree:
- Sender(targetFragment=5, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
+ Receiver(sourceFragment=4, exchange=4, distribution=single)
-Fragment#5
+Fragment#4
targetNodes: [N0]
executionNodes: [N1]
remoteFragments: [1]
@@ -25,12 +16,21 @@ Fragment#5
tables: [CT_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=4, exchange=5, distribution=single)
+ Sender(targetFragment=0, exchange=4, distribution=single)
Project
NestedLoopJoin
Receiver(sourceFragment=1, exchange=1, distribution=single)
ColocatedHashAggregate
TableScan(name=PUBLIC.CT_N1, source=2, partitions=1,
distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T_N1, source=3, partitions=1, distribution=random)
---
N1
@@ -62,23 +62,14 @@ Fragment#1
N0
SELECT (SELECT count(*) FROM ct_n1) FROM t_n2
---
-Fragment#4 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [5]
- exchangeSourceNodes: {5=[N1]}
- tree:
- Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#1
- targetNodes: [N1]
- executionNodes: [N2]
- tables: [T_N2]
- partitions: {N2=[0:1]}
+ remoteFragments: [4]
+ exchangeSourceNodes: {4=[N1]}
tree:
- Sender(targetFragment=5, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T_N2, source=3, partitions=1, distribution=random)
+ Receiver(sourceFragment=4, exchange=4, distribution=single)
-Fragment#5
+Fragment#4
targetNodes: [N0]
executionNodes: [N1]
remoteFragments: [1]
@@ -86,12 +77,21 @@ Fragment#5
tables: [CT_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=4, exchange=5, distribution=single)
+ Sender(targetFragment=0, exchange=4, distribution=single)
Project
NestedLoopJoin
Receiver(sourceFragment=1, exchange=1, distribution=single)
ColocatedHashAggregate
TableScan(name=PUBLIC.CT_N1, source=2, partitions=1,
distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T_N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T_N2, source=3, partitions=1, distribution=random)
---
N1
@@ -148,20 +148,20 @@ N0
SELECT t.c1 FROM ct_n1 t WHERE t.c1 < 5 AND
EXISTS (SELECT x FROM table(system_range(t.c1, t.c2)) WHERE mod(x, 2) = 0)
---
-Fragment#3 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [4]
- exchangeSourceNodes: {4=[N1]}
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
tree:
- Receiver(sourceFragment=4, exchange=4, distribution=single)
+ Receiver(sourceFragment=3, exchange=3, distribution=single)
-Fragment#4
+Fragment#3
targetNodes: [N0]
executionNodes: [N1]
tables: [CT_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=3, exchange=4, distribution=single)
+ Sender(targetFragment=0, exchange=3, distribution=single)
Project
CorrelatedNestedLoopJoin
TableScan(name=PUBLIC.CT_N1, source=1, partitions=1,
distribution=single)
@@ -212,25 +212,25 @@ Fragment#0 root
ReduceHashAggregate
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2 correlated
+Fragment#1
targetNodes: [N0]
executionNodes: [N0, N1, N2]
- tables: [T2_N0N1N2]
+ tables: [T1_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
- pruningMetadata: [3=[{0=$cor0.ID}]]
tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- MapHashAggregate
- TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3,
distribution=random)
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1N2, source=4, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
-Fragment#1
+Fragment#2 correlated
targetNodes: [N0]
executionNodes: [N0, N1, N2]
- tables: [T1_N0N1N2]
+ tables: [T2_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [3=[{0=$cor1.ID}]]
tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T1_N0N1N2, source=4, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N0N1N2, source=3, partitions=3,
distribution=random)
---
# Pass partition pruning metadata to correlated joins.
N0
@@ -252,35 +252,35 @@ Fragment#0 root
ReduceHashAggregate
Receiver(sourceFragment=3, exchange=3, distribution=single)
-Fragment#3 correlated
+Fragment#1
targetNodes: [N0]
executionNodes: [N0, N1, N2]
- tables: [T2_N0N1N2]
+ tables: [T3_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
- pruningMetadata: [4=[{0=$cor1.ID}]]
tree:
- Sender(targetFragment=0, exchange=3, distribution=single)
- MapHashAggregate
- TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3,
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
Fragment#2 correlated
targetNodes: [N0]
executionNodes: [N0, N1, N2]
tables: [T1_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
- pruningMetadata: [5=[{0=$cor0.ID}]]
+ pruningMetadata: [5=[{0=$cor2.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
-Fragment#1
+Fragment#3 correlated
targetNodes: [N0]
executionNodes: [N0, N1, N2]
- tables: [T3_N0N1N2]
+ tables: [T2_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [4=[{0=$cor3.ID}]]
tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3,
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+ Sender(targetFragment=0, exchange=3, distribution=single)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
---
# Pass partition pruning metadata to correlated joins one layer deep.
N0
@@ -305,33 +305,33 @@ Fragment#0 root
ReduceHashAggregate
Receiver(sourceFragment=3, exchange=3, distribution=single)
-Fragment#3 correlated
+Fragment#1
targetNodes: [N0]
executionNodes: [N0, N1, N2]
- tables: [T2_N0N1N2]
+ tables: [T3_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
- pruningMetadata: [4=[{0=$cor0.ID}, {0=$cor2.ID}]]
tree:
- Sender(targetFragment=0, exchange=3, distribution=single)
- MapHashAggregate
- TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3,
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
Fragment#2 correlated
targetNodes: [N0]
executionNodes: [N0, N1, N2]
tables: [T1_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
- pruningMetadata: [5=[{0=$cor0.ID}]]
+ pruningMetadata: [5=[{0=$cor3.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
TableScan(name=PUBLIC.T1_N0N1N2, source=5, partitions=3,
distribution=affinity[table: T1_N0N1N2, columns: [ID]])
-Fragment#1
+Fragment#3 correlated
targetNodes: [N0]
executionNodes: [N0, N1, N2]
- tables: [T3_N0N1N2]
+ tables: [T2_N0N1N2]
partitions: {N0=[0:3], N1=[1:3], N2=[2:3]}
+ pruningMetadata: [4=[{0=$cor3.ID}, {0=$cor5.ID}]]
tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T3_N0N1N2, source=6, partitions=3,
distribution=affinity[table: T3_N0N1N2, columns: [ID]])
+ Sender(targetFragment=0, exchange=3, distribution=single)
+ MapHashAggregate
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=3,
distribution=random)
---
diff --git a/modules/sql-engine/src/test/resources/mapping/hash_join.test
b/modules/sql-engine/src/test/resources/mapping/hash_join.test
index 7c4ae58c24..c559e6c88a 100644
--- a/modules/sql-engine/src/test/resources/mapping/hash_join.test
+++ b/modules/sql-engine/src/test/resources/mapping/hash_join.test
@@ -11,14 +11,25 @@ Fragment#0 root
Fragment#1
targetNodes: [N0]
executionNodes: [N1]
- tables: [T1_N1, T2_N1]
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
+ tables: [T1_N1]
partitions: {N1=[0:1]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
Project
HashJoin
TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
- TableScan(name=PUBLIC.T2_N1, source=3, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
+ Receiver(sourceFragment=3, exchange=3, distribution=affinity[table:
T1_N1, columns: [ID]])
+
+Fragment#3
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T2_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=1, exchange=3, distribution=affinity[table: T1_N1,
columns: [ID]])
+ TableScan(name=PUBLIC.T2_N1, source=4, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
---
N0
@@ -53,28 +64,28 @@ Fragment#0 root
tree:
Receiver(sourceFragment=1, exchange=1, distribution=single)
-Fragment#4
+Fragment#1
targetNodes: [N0]
executionNodes: [N1]
- remoteFragments: [5]
- exchangeSourceNodes: {5=[N2]}
- tables: [T1_N1, T2_N2]
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N2]}
+ tables: [T1_N1]
partitions: {N1=[0:1]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
Project
HashJoin
TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
- Receiver(sourceFragment=5, exchange=5, distribution=affinity[table:
T2_N2, columns: [ID]])
+ Receiver(sourceFragment=3, exchange=3, distribution=affinity[table:
T1_N1, columns: [ID]])
-Fragment#5
+Fragment#3
targetNodes: [N1]
executionNodes: [N2]
tables: [T2_N2]
partitions: {N2=[0:1]}
tree:
- Sender(targetFragment=4, exchange=5, distribution=affinity[table: T2_N2,
columns: [ID]])
- TableScan(name=PUBLIC.T2_N2, source=3, partitions=1,
distribution=affinity[table: T2_N2, columns: [ID]])
+ Sender(targetFragment=1, exchange=3, distribution=affinity[table: T1_N1,
columns: [ID]])
+ TableScan(name=PUBLIC.T2_N2, source=4, partitions=1,
distribution=affinity[table: T2_N2, columns: [ID]])
---
N0
diff --git a/modules/sql-engine/src/test/resources/mapping/merge_join.test
b/modules/sql-engine/src/test/resources/mapping/merge_join.test
index c1d88b63bf..fd9ae5f287 100644
--- a/modules/sql-engine/src/test/resources/mapping/merge_join.test
+++ b/modules/sql-engine/src/test/resources/mapping/merge_join.test
@@ -1,53 +1,3 @@
-N0
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter',
'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN
t2_n1 USING (id)
----
-Fragment#0 root
- executionNodes: [N0]
- remoteFragments: [1]
- exchangeSourceNodes: {1=[N1]}
- tree:
- Receiver(sourceFragment=1, exchange=1, distribution=single)
-
-Fragment#1
- targetNodes: [N0]
- executionNodes: [N1]
- tables: [T1_N1, T2_N1]
- partitions: {N1=[0:1]}
- tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- Project
- MergeJoin
- Sort
- TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
- Sort
- TableScan(name=PUBLIC.T2_N1, source=3, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
----
-
-N1
-SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter',
'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN
t2_n1 USING (id)
----
-Fragment#0 root
- executionNodes: [N1]
- remoteFragments: [1]
- exchangeSourceNodes: {1=[N1]}
- tree:
- Receiver(sourceFragment=1, exchange=1, distribution=single)
-
-Fragment#1
- targetNodes: [N1]
- executionNodes: [N1]
- tables: [T1_N1, T2_N1]
- partitions: {N1=[0:1]}
- tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- Project
- MergeJoin
- Sort
- TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
- Sort
- TableScan(name=PUBLIC.T2_N1, source=3, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
----
-
N0
SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('HashJoinConverter',
'NestedLoopJoinConverter', 'CorrelatedNestedLoopJoin') */ * FROM t1_n1 JOIN
t2_n2 USING (id)
---
@@ -58,11 +8,11 @@ Fragment#0 root
tree:
Receiver(sourceFragment=1, exchange=1, distribution=single)
-Fragment#4
+Fragment#1
targetNodes: [N0]
executionNodes: [N1]
- remoteFragments: [5]
- exchangeSourceNodes: {5=[N2]}
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N2]}
tables: [T1_N1]
partitions: {N1=[0:1]}
tree:
@@ -71,15 +21,15 @@ Fragment#4
MergeJoin
Sort
TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
- Receiver(sourceFragment=5, exchange=5, distribution=affinity[table:
T2_N2, columns: [ID]])
+ Sort
+ Receiver(sourceFragment=3, exchange=3,
distribution=affinity[table: T1_N1, columns: [ID]])
-Fragment#5
+Fragment#3
targetNodes: [N1]
executionNodes: [N2]
tables: [T2_N2]
partitions: {N2=[0:1]}
tree:
- Sender(targetFragment=4, exchange=5, distribution=affinity[table: T2_N2,
columns: [ID]])
- Sort
- TableScan(name=PUBLIC.T2_N2, source=3, partitions=1,
distribution=affinity[table: T2_N2, columns: [ID]])
+ Sender(targetFragment=1, exchange=3, distribution=affinity[table: T1_N1,
columns: [ID]])
+ TableScan(name=PUBLIC.T2_N2, source=4, partitions=1,
distribution=affinity[table: T2_N2, columns: [ID]])
---
diff --git a/modules/sql-engine/src/test/resources/mapping/set_ops.test
b/modules/sql-engine/src/test/resources/mapping/set_ops.test
index 377a4d2bbd..42aa6ec7f9 100644
--- a/modules/sql-engine/src/test/resources/mapping/set_ops.test
+++ b/modules/sql-engine/src/test/resources/mapping/set_ops.test
@@ -10,15 +10,6 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N2]
- tables: [T2_N2]
- partitions: {N2=[0:1]}
- tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, distribution=random)
-
Fragment#1
targetNodes: [N1]
executionNodes: [N1]
@@ -27,6 +18,15 @@ Fragment#1
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
TableScan(name=PUBLIC.T1_N1, source=4, partitions=1, distribution=random)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T2_N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.T2_N2, source=3, partitions=1, distribution=random)
---
N1
diff --git a/modules/sql-engine/src/test/resources/mapping/table_identity.test
b/modules/sql-engine/src/test/resources/mapping/table_identity.test
index d7f03a6109..e13befad79 100644
--- a/modules/sql-engine/src/test/resources/mapping/table_identity.test
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity.test
@@ -10,15 +10,6 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
- targetNodes: [N0]
- executionNodes: [N2]
- tables: [NT2_N2]
- partitions: {N2=[0:1]}
- tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1,
distribution=identity[0])
-
Fragment#1
targetNodes: [N0]
executionNodes: [N1]
@@ -27,6 +18,15 @@ Fragment#1
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1,
distribution=identity[0])
+
+Fragment#2
+ targetNodes: [N0]
+ executionNodes: [N2]
+ tables: [NT2_N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1,
distribution=identity[0])
---
N1
@@ -41,15 +41,6 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N2]
- tables: [NT2_N2]
- partitions: {N2=[0:1]}
- tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1,
distribution=identity[0])
-
Fragment#1
targetNodes: [N1]
executionNodes: [N1]
@@ -58,6 +49,15 @@ Fragment#1
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1,
distribution=identity[0])
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [NT2_N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.NT2_N2, source=3, partitions=1,
distribution=identity[0])
---
N0
@@ -72,23 +72,23 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
+Fragment#1
targetNodes: [N0]
executionNodes: [N1]
- tables: [NT2_N1]
+ tables: [NT1_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1,
distribution=identity[0])
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1,
distribution=identity[0])
-Fragment#1
+Fragment#2
targetNodes: [N0]
executionNodes: [N1]
- tables: [NT1_N1]
+ tables: [NT2_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1,
distribution=identity[0])
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1,
distribution=identity[0])
---
N1
@@ -103,21 +103,21 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
+Fragment#1
targetNodes: [N1]
executionNodes: [N1]
- tables: [NT2_N1]
+ tables: [NT1_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1,
distribution=identity[0])
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1,
distribution=identity[0])
-Fragment#1
+Fragment#2
targetNodes: [N1]
executionNodes: [N1]
- tables: [NT1_N1]
+ tables: [NT2_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.NT1_N1, source=4, partitions=1,
distribution=identity[0])
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.NT2_N1, source=3, partitions=1,
distribution=identity[0])
---
diff --git
a/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
index 006200b299..ce1d33459b 100644
--- a/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
@@ -1,23 +1,14 @@
N0
SELECT * FROM CT_n1, NT_n1
---
-Fragment#4 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [5]
- exchangeSourceNodes: {5=[N1]}
+ remoteFragments: [4]
+ exchangeSourceNodes: {4=[N1]}
tree:
- Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N1]
- tables: [NT_N1]
- partitions: {N1=[0:1]}
- tree:
- Sender(targetFragment=5, exchange=2, distribution=single)
- TableScan(name=PUBLIC.NT_N1, source=3, partitions=1,
distribution=identity[0])
+ Receiver(sourceFragment=4, exchange=4, distribution=single)
-Fragment#5
+Fragment#4
targetNodes: [N0]
executionNodes: [N1]
remoteFragments: [2]
@@ -25,10 +16,19 @@ Fragment#5
tables: [CT_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=4, exchange=5, distribution=single)
+ Sender(targetFragment=0, exchange=4, distribution=single)
NestedLoopJoin
TableScan(name=PUBLIC.CT_N1, source=1, partitions=1,
distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [NT_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.NT_N1, source=3, partitions=1,
distribution=identity[0])
---
N1
@@ -58,23 +58,14 @@ Fragment#2
N0
SELECT * FROM CT_n1, NT_n2
---
-Fragment#4 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [5]
- exchangeSourceNodes: {5=[N1]}
+ remoteFragments: [4]
+ exchangeSourceNodes: {4=[N1]}
tree:
- Receiver(sourceFragment=5, exchange=5, distribution=single)
-
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N2]
- tables: [NT_N2]
- partitions: {N2=[0:1]}
- tree:
- Sender(targetFragment=5, exchange=2, distribution=single)
- TableScan(name=PUBLIC.NT_N2, source=3, partitions=1,
distribution=identity[0])
+ Receiver(sourceFragment=4, exchange=4, distribution=single)
-Fragment#5
+Fragment#4
targetNodes: [N0]
executionNodes: [N1]
remoteFragments: [2]
@@ -82,10 +73,19 @@ Fragment#5
tables: [CT_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=4, exchange=5, distribution=single)
+ Sender(targetFragment=0, exchange=4, distribution=single)
NestedLoopJoin
TableScan(name=PUBLIC.CT_N1, source=1, partitions=1,
distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [NT_N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=2, distribution=single)
+ TableScan(name=PUBLIC.NT_N2, source=3, partitions=1,
distribution=identity[0])
---
N1
diff --git a/modules/sql-engine/src/test/resources/mapping/table_single.test
b/modules/sql-engine/src/test/resources/mapping/table_single.test
index 12d7342b82..7a2aa7ea1d 100644
--- a/modules/sql-engine/src/test/resources/mapping/table_single.test
+++ b/modules/sql-engine/src/test/resources/mapping/table_single.test
@@ -1,20 +1,20 @@
N0
SELECT * FROM ct1_n1, ct2_n1
---
-Fragment#3 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [4]
- exchangeSourceNodes: {4=[N1]}
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
tree:
- Receiver(sourceFragment=4, exchange=4, distribution=single)
+ Receiver(sourceFragment=3, exchange=3, distribution=single)
-Fragment#4
+Fragment#3
targetNodes: [N0]
executionNodes: [N1]
tables: [CT1_N1, CT2_N1]
partitions: {N1=[0:1]}
tree:
- Sender(targetFragment=3, exchange=4, distribution=single)
+ Sender(targetFragment=0, exchange=3, distribution=single)
NestedLoopJoin
TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1,
distribution=single)
TableScan(name=PUBLIC.CT2_N1, source=2, partitions=1,
distribution=single)
@@ -36,65 +36,65 @@ Fragment#0 root
N0
SELECT * FROM ct1_n1, ct2_n2
---
-Fragment#5 root
+Fragment#0 root
executionNodes: [N0]
- remoteFragments: [6]
- exchangeSourceNodes: {6=[N2]}
+ remoteFragments: [4]
+ exchangeSourceNodes: {4=[N2]}
tree:
- Receiver(sourceFragment=6, exchange=6, distribution=single)
+ Receiver(sourceFragment=4, exchange=4, distribution=single)
Fragment#4
- targetNodes: [N2]
- executionNodes: [N1]
- tables: [CT1_N1]
- partitions: {N1=[0:1]}
- tree:
- Sender(targetFragment=6, exchange=4, distribution=single)
- TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1,
distribution=single)
-
-Fragment#6
targetNodes: [N0]
executionNodes: [N2]
- remoteFragments: [4]
- exchangeSourceNodes: {4=[N1]}
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
tables: [CT2_N2]
partitions: {N2=[0:1]}
tree:
- Sender(targetFragment=5, exchange=6, distribution=single)
+ Sender(targetFragment=0, exchange=4, distribution=single)
NestedLoopJoin
- Receiver(sourceFragment=4, exchange=4, distribution=single)
+ Receiver(sourceFragment=3, exchange=3, distribution=single)
TableScan(name=PUBLIC.CT2_N2, source=2, partitions=1,
distribution=single)
+
+Fragment#3
+ targetNodes: [N2]
+ executionNodes: [N1]
+ tables: [CT1_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=3, distribution=single)
+ TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1,
distribution=single)
---
N1
SELECT * FROM ct1_n1, ct2_n2
---
-Fragment#5 root
+Fragment#0 root
executionNodes: [N1]
- remoteFragments: [6]
- exchangeSourceNodes: {6=[N2]}
+ remoteFragments: [4]
+ exchangeSourceNodes: {4=[N2]}
tree:
- Receiver(sourceFragment=6, exchange=6, distribution=single)
+ Receiver(sourceFragment=4, exchange=4, distribution=single)
Fragment#4
- targetNodes: [N2]
- executionNodes: [N1]
- tables: [CT1_N1]
- partitions: {N1=[0:1]}
- tree:
- Sender(targetFragment=6, exchange=4, distribution=single)
- TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1,
distribution=single)
-
-Fragment#6
targetNodes: [N1]
executionNodes: [N2]
- remoteFragments: [4]
- exchangeSourceNodes: {4=[N1]}
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
tables: [CT2_N2]
partitions: {N2=[0:1]}
tree:
- Sender(targetFragment=5, exchange=6, distribution=single)
+ Sender(targetFragment=0, exchange=4, distribution=single)
NestedLoopJoin
- Receiver(sourceFragment=4, exchange=4, distribution=single)
+ Receiver(sourceFragment=3, exchange=3, distribution=single)
TableScan(name=PUBLIC.CT2_N2, source=2, partitions=1,
distribution=single)
+
+Fragment#3
+ targetNodes: [N2]
+ executionNodes: [N1]
+ tables: [CT1_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=4, exchange=3, distribution=single)
+ TableScan(name=PUBLIC.CT1_N1, source=1, partitions=1,
distribution=single)
---
diff --git
a/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
b/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
new file mode 100644
index 0000000000..e3e833b50f
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
@@ -0,0 +1,162 @@
+# root of the query is colocated with the first partition
+N0
+SELECT * FROM t1_n0n1_n1n2
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N0, N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N0, N1]
+ tables: [T1_N0N1_N1N2]
+ partitions: {N0=[0:2], N1=[1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
+# root of the query is colocated with both partitions
+N1
+SELECT * FROM t1_n0n1_n1n2
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N0N1_N1N2]
+ partitions: {N1=[0:2, 1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
+# root of the query is colocated with the second partition
+N2
+SELECT * FROM t1_n0n1_n1n2
+---
+Fragment#0 root
+ executionNodes: [N2]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N0, N2]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N2]
+ executionNodes: [N0, N2]
+ tables: [T1_N0N1_N1N2]
+ partitions: {N0=[0:2], N2=[1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
+# although the root is colocated with one of the partitions, the algorithm prefers to
colocate the join stage
+N0
+SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
+ tables: [T3_N1N2]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ Project
+ HashJoin
+ TableScan(name=PUBLIC.T3_N1N2, source=2, partitions=1,
distribution=affinity[table: T3_N1N2, columns: [ID]])
+ Receiver(sourceFragment=3, exchange=3, distribution=affinity[table:
T3_N1N2, columns: [ID]])
+
+Fragment#3
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T2_N0N1N2]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=1, exchange=3, distribution=affinity[table: T3_N1N2,
columns: [ID]])
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=1,
distribution=affinity[table: T2_N0N1N2, columns: [ID]])
+---
+# everything is colocated
+N1
+SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N1]}
+ tables: [T3_N1N2]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ Project
+ HashJoin
+ TableScan(name=PUBLIC.T3_N1N2, source=2, partitions=1,
distribution=affinity[table: T3_N1N2, columns: [ID]])
+ Receiver(sourceFragment=3, exchange=3, distribution=affinity[table:
T3_N1N2, columns: [ID]])
+
+Fragment#3
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T2_N0N1N2]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=1, exchange=3, distribution=affinity[table: T3_N1N2,
columns: [ID]])
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=1,
distribution=affinity[table: T2_N0N1N2, columns: [ID]])
+---
+# everything is colocated, but from a different root
+N2
+SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id
+---
+Fragment#0 root
+ executionNodes: [N2]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N2]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N2]
+ executionNodes: [N2]
+ remoteFragments: [3]
+ exchangeSourceNodes: {3=[N2]}
+ tables: [T3_N1N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ Project
+ HashJoin
+ TableScan(name=PUBLIC.T3_N1N2, source=2, partitions=1,
distribution=affinity[table: T3_N1N2, columns: [ID]])
+ Receiver(sourceFragment=3, exchange=3, distribution=affinity[table:
T3_N1N2, columns: [ID]])
+
+Fragment#3
+ targetNodes: [N2]
+ executionNodes: [N2]
+ tables: [T2_N0N1N2]
+ partitions: {N2=[0:1]}
+ tree:
+ Sender(targetFragment=1, exchange=3, distribution=affinity[table: T3_N1N2,
columns: [ID]])
+ TableScan(name=PUBLIC.T2_N0N1N2, source=4, partitions=1,
distribution=affinity[table: T2_N0N1N2, columns: [ID]])
+---
diff --git
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index 26ee11a006..371bfcaaae 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -31,16 +31,6 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
- targetNodes: [N1]
- executionNodes: [N4]
- tables: [T2_N4N5]
- partitions: {N4=[0:2]}
- tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
- Sort
- TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2,
distribution=affinity[table: T2_N4N5, columns: [ID]])
-
Fragment#1
targetNodes: [N1]
executionNodes: [N2]
@@ -50,6 +40,16 @@ Fragment#1
Sender(targetFragment=0, exchange=1, distribution=single)
Sort
TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N4]
+ tables: [T2_N4N5]
+ partitions: {N4=[0:2]}
+ tree:
+ Sender(targetFragment=0, exchange=2, distribution=single)
+ Sort
+ TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2,
distribution=affinity[table: T2_N4N5, columns: [ID]])
---
# Self join, different predicates that produce same set of partitions
N1
@@ -64,25 +64,25 @@ Fragment#0 root
Receiver(sourceFragment=1, exchange=1, distribution=single)
Receiver(sourceFragment=2, exchange=2, distribution=single)
-Fragment#2
+Fragment#1
targetNodes: [N1]
executionNodes: [N2]
tables: [T1_N1N2N3]
partitions: {N2=[1:3]}
tree:
- Sender(targetFragment=0, exchange=2, distribution=single)
+ Sender(targetFragment=0, exchange=1, distribution=single)
Sort
- TableScan(name=PUBLIC.T1_N1N2N3, source=3, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+ TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
-Fragment#1
+Fragment#2
targetNodes: [N1]
executionNodes: [N2]
tables: [T1_N1N2N3]
partitions: {N2=[1:3]}
tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
+ Sender(targetFragment=0, exchange=2, distribution=single)
Sort
- TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+ TableScan(name=PUBLIC.T1_N1N2N3, source=3, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
---
# Self join, different predicates that produce disjoint set of partitions
N1
@@ -125,25 +125,25 @@ Fragment#0 root
ReduceHashAggregate
Receiver(sourceFragment=2, exchange=2, distribution=single)
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N3]
+ tables: [T1_N1N2N3]
+ partitions: {N3=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+
Fragment#2 correlated
targetNodes: [N0]
executionNodes: [N1, N2, N3]
tables: [T3_N1N2N3]
partitions: {N1=[0:3], N2=[1:3], N3=[2:3]}
- pruningMetadata: [3=[{0=$cor0.ID}]]
+ pruningMetadata: [3=[{0=$cor1.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
MapHashAggregate
TableScan(name=PUBLIC.T3_N1N2N3, source=3, partitions=3,
distribution=random)
-
-Fragment#1
- targetNodes: [N0]
- executionNodes: [N3]
- tables: [T1_N1N2N3]
- partitions: {N3=[2:3]}
- tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
---
# Correlated.
# Prune partitions from left arm statically, and pass meta to the right arm.
@@ -162,23 +162,23 @@ Fragment#0 root
ReduceHashAggregate
Receiver(sourceFragment=2, exchange=2, distribution=single)
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N3]
+ tables: [T1_N1N2N3]
+ partitions: {N3=[2:3]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
+
Fragment#2 correlated
targetNodes: [N0]
executionNodes: [N4, N5]
tables: [T2_N4N5]
partitions: {N4=[0:2], N5=[1:2]}
- pruningMetadata: [3=[{0=$cor0.ID}]]
+ pruningMetadata: [3=[{0=$cor1.ID}]]
tree:
Sender(targetFragment=0, exchange=2, distribution=single)
MapHashAggregate
TableScan(name=PUBLIC.T2_N4N5, source=3, partitions=2,
distribution=random)
-
-Fragment#1
- targetNodes: [N0]
- executionNodes: [N3]
- tables: [T1_N1N2N3]
- partitions: {N3=[2:3]}
- tree:
- Sender(targetFragment=0, exchange=1, distribution=single)
- TableScan(name=PUBLIC.T1_N1N2N3, source=4, partitions=3,
distribution=affinity[table: T1_N1N2N3, columns: [ID]])
---
diff --git a/modules/sql-engine/src/test/resources/mapping/union.test
b/modules/sql-engine/src/test/resources/mapping/union.test
index 1ab8915466..f364a29779 100644
--- a/modules/sql-engine/src/test/resources/mapping/union.test
+++ b/modules/sql-engine/src/test/resources/mapping/union.test
@@ -37,14 +37,25 @@ Fragment#0 root
Fragment#1
targetNodes: [N1]
executionNodes: [N1]
- tables: [T1_N1, T2_N1]
+ remoteFragments: [2]
+ exchangeSourceNodes: {2=[N1]}
+ tables: [T2_N1]
partitions: {N1=[0:1]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
ColocatedHashAggregate
UnionAll
- TableScan(name=PUBLIC.T1_N1, source=2, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
+ Receiver(sourceFragment=2, exchange=2, distribution=affinity[table:
T2_N1, columns: [ID]])
TableScan(name=PUBLIC.T2_N1, source=3, partitions=1,
distribution=affinity[table: T2_N1, columns: [ID]])
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ partitions: {N1=[0:1]}
+ tree:
+ Sender(targetFragment=1, exchange=2, distribution=affinity[table: T2_N1,
columns: [ID]])
+ TableScan(name=PUBLIC.T1_N1, source=4, partitions=1,
distribution=affinity[table: T1_N1, columns: [ID]])
---
N1
@@ -86,12 +97,23 @@ Fragment#0 root
Fragment#1
targetNodes: [N1]
executionNodes: [N1, N2]
- tables: [T1_N1N2, T2_N1N2]
+ remoteFragments: [2]
+ exchangeSourceNodes: {2=[N1, N2]}
+ tables: [T2_N1N2]
partitions: {N1=[0:2], N2=[1:2]}
tree:
Sender(targetFragment=0, exchange=1, distribution=single)
ColocatedHashAggregate
UnionAll
- TableScan(name=PUBLIC.T1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N1N2, columns: [ID]])
+ Receiver(sourceFragment=2, exchange=2, distribution=affinity[table:
T2_N1N2, columns: [ID]])
TableScan(name=PUBLIC.T2_N1N2, source=3, partitions=2,
distribution=affinity[table: T2_N1N2, columns: [ID]])
+
+Fragment#2
+ targetNodes: [N1, N2]
+ executionNodes: [N1, N2]
+ tables: [T1_N1N2]
+ partitions: {N1=[0:2], N2=[1:2]}
+ tree:
+ Sender(targetFragment=1, exchange=2, distribution=affinity[table: T2_N1N2,
columns: [ID]])
+ TableScan(name=PUBLIC.T1_N1N2, source=4, partitions=2,
distribution=affinity[table: T1_N1N2, columns: [ID]])
---