This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd63f817e2 IGNITE-12692 SQL Calcite: Distributed table modify - Fixes 
#12593.
8dd63f817e2 is described below

commit 8dd63f817e2076273176836b029037c3d0d60a3c
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Dec 30 17:24:03 2025 +0300

    IGNITE-12692 SQL Calcite: Distributed table modify - Fixes #12593.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/metadata/IgniteMdRowCount.java   |   8 +
 .../query/calcite/prepare/PlannerPhase.java        |   6 +-
 .../query/calcite/rel/IgniteTableModify.java       |  56 +++++-
 .../calcite/rule/AbstractIgniteConverterRule.java  |   5 +-
 .../calcite/rule/HashAggregateConverterRule.java   |  21 +-
 .../query/calcite/rule/ProjectConverterRule.java   |   4 +-
 .../rule/TableModifyDistributedConverterRule.java  | 150 ++++++++++++++
 ...ava => TableModifySingleNodeConverterRule.java} |  14 +-
 .../processors/query/calcite/type/OtherType.java   |  11 ++
 .../query/calcite/planner/AbstractPlannerTest.java | 186 +-----------------
 .../query/calcite/planner/TableDmlPlannerTest.java | 215 +++++++++++++++++++++
 .../query/calcite/planner/TestTable.java           | 187 +++++++++++++++++-
 12 files changed, 651 insertions(+), 212 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 59c81cb3694..756e4b4e0c6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -36,6 +36,7 @@ import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
@@ -149,4 +150,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
     public double getRowCount(IgniteLimit rel, RelMetadataQuery mq) {
         return rel.estimateRowCount(mq);
     }
+
+    /**
+     * Estimation of row count for Table modify operator.
+     */
+    public double getRowCount(IgniteTableModify rel, RelMetadataQuery mq) {
+        return rel.estimateRowCount(mq);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index bbc5217773d..5fc738cbc91 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -64,7 +64,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.rule.SetOpConverterRu
 import 
org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
 import 
org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
 import 
org.apache.ignite.internal.processors.query.calcite.rule.TableFunctionScanConverterRule;
-import 
org.apache.ignite.internal.processors.query.calcite.rule.TableModifyConverterRule;
+import 
org.apache.ignite.internal.processors.query.calcite.rule.TableModifyDistributedConverterRule;
+import 
org.apache.ignite.internal.processors.query.calcite.rule.TableModifySingleNodeConverterRule;
 import 
org.apache.ignite.internal.processors.query.calcite.rule.UncollectConverterRule;
 import 
org.apache.ignite.internal.processors.query.calcite.rule.UnionConverterRule;
 import 
org.apache.ignite.internal.processors.query.calcite.rule.ValuesConverterRule;
@@ -292,7 +293,8 @@ public enum PlannerPhase {
                     SetOpConverterRule.MAP_REDUCE_INTERSECT,
                     ProjectConverterRule.INSTANCE,
                     FilterConverterRule.INSTANCE,
-                    TableModifyConverterRule.INSTANCE,
+                    TableModifySingleNodeConverterRule.INSTANCE,
+                    TableModifyDistributedConverterRule.INSTANCE,
                     UnionConverterRule.INSTANCE,
                     SortConverterRule.INSTANCE,
                     TableFunctionScanConverterRule.INSTANCE
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
index 2ca7dfac1d3..7168caadc90 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
@@ -18,17 +18,25 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /** */
 public class IgniteTableModify extends TableModify implements IgniteRel {
+    /** If table modify can affect data source. */
+    private final boolean affectsSrc;
+
     /**
      * Creates a {@code TableModify}.
      *
@@ -43,8 +51,9 @@ public class IgniteTableModify extends TableModify implements 
IgniteRel {
      * @param input Sub-query or filter condition.
      * @param operation Modify operation (INSERT, UPDATE, DELETE, MERGE).
      * @param updateColumnList List of column identifiers to be updated (e.g. 
ident1, ident2); null if not UPDATE.
-     * @param sourceExpressionList List of value expressions to be set (e.g. 
exp1, exp2); null if not UPDATE.
+     * @param srcExpressionList List of value expressions to be set (e.g. 
exp1, exp2); null if not UPDATE.
      * @param flattened Whether set flattens the input row type.
+     * @param affectsSrc If table modify can affect data source.
      */
     public IgniteTableModify(
         RelOptCluster cluster,
@@ -53,12 +62,15 @@ public class IgniteTableModify extends TableModify 
implements IgniteRel {
         RelNode input,
         Operation operation,
         List<String> updateColumnList,
-        List<RexNode> sourceExpressionList,
-        boolean flattened
+        List<RexNode> srcExpressionList,
+        boolean flattened,
+        boolean affectsSrc
     ) {
         super(cluster, traitSet, table, 
Commons.context(cluster).catalogReader(),
             input, operation, updateColumnList,
-            sourceExpressionList, flattened);
+            srcExpressionList, flattened);
+
+        this.affectsSrc = affectsSrc;
     }
 
     /**
@@ -75,7 +87,8 @@ public class IgniteTableModify extends TableModify implements 
IgniteRel {
             input.getEnum("operation", Operation.class),
             input.getStringList("updateColumnList"),
             input.getExpressionList("sourceExpressionList"),
-            input.getBoolean("flattened", true)
+            input.getBoolean("flattened", true),
+            false // Field only for planning.
         );
     }
 
@@ -89,7 +102,8 @@ public class IgniteTableModify extends TableModify 
implements IgniteRel {
             getOperation(),
             getUpdateColumnList(),
             getSourceExpressionList(),
-            isFlattened());
+            isFlattened(),
+            affectsSrc);
     }
 
     /** {@inheritDoc} */
@@ -100,6 +114,34 @@ public class IgniteTableModify extends TableModify 
implements IgniteRel {
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
         return new IgniteTableModify(cluster, getTraitSet(), getTable(), 
sole(inputs),
-            getOperation(), getUpdateColumnList(), getSourceExpressionList(), 
isFlattened());
+            getOperation(), getUpdateColumnList(), getSourceExpressionList(), 
isFlattened(), affectsSrc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        return 1.0D;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        // Don't derive traits for single-node table modify.
+        if (TraitUtils.distribution(traitSet) == IgniteDistributions.single())
+            return null;
+
+        assert childId == 0;
+
+        if (childTraits.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        // If modify can affect data source (for example, INSERT contains self 
table as source) only
+        // modified table affinity distribution is possible, otherwise 
inconsistency is possible on remote nodes.
+        if (affectsSrc)
+            return null;
+
+        // Any distributed (random/hash) trait is accepted if data source is 
not affected by modify.
+        if 
(!TraitUtils.distribution(childTraits).satisfies(IgniteDistributions.random()))
+            return null;
+
+        return Pair.of(traitSet, ImmutableList.of(childTraits));
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
index edd656ab16e..935d4af371b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteConverterRule.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.jetbrains.annotations.Nullable;
 
 /** */
 public abstract class AbstractIgniteConverterRule<T extends RelNode> extends 
ConverterRule {
@@ -39,7 +40,7 @@ public abstract class AbstractIgniteConverterRule<T extends 
RelNode> extends Con
     }
 
     /** {@inheritDoc} */
-    @Override public final RelNode convert(RelNode rel) {
+    @Override public final @Nullable RelNode convert(RelNode rel) {
         return convert(rel.getCluster().getPlanner(), 
rel.getCluster().getMetadataQuery(), (T)rel);
     }
 
@@ -51,5 +52,5 @@ public abstract class AbstractIgniteConverterRule<T extends 
RelNode> extends Con
      * @param rel Rel node.
      * @return Physical rel.
      */
-    protected abstract PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, T rel);
+    protected abstract @Nullable PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, T rel);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
index 38f5b41fc14..65cca0e7759 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashAggregateConverterRule.java
@@ -19,10 +19,10 @@ package 
org.apache.ignite.internal.processors.query.calcite.rule;
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.hint.HintUtils;
@@ -31,16 +31,17 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteColocat
 import 
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public class HashAggregateConverterRule {
     /** */
-    public static final RelOptRule COLOCATED = new 
ColocatedHashAggregateConverterRule();
+    public static final ConverterRule COLOCATED = new 
ColocatedHashAggregateConverterRule();
 
     /** */
-    public static final RelOptRule MAP_REDUCE = new 
MapReduceHashAggregateConverterRule();
+    public static final ConverterRule MAP_REDUCE = new 
MapReduceHashAggregateConverterRule();
 
     /** */
     private HashAggregateConverterRule() {
@@ -55,8 +56,11 @@ public class HashAggregateConverterRule {
         }
 
         /** {@inheritDoc} */
-        @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq,
-            LogicalAggregate agg) {
+        @Override protected @Nullable PhysicalNode convert(
+            RelOptPlanner planner,
+            RelMetadataQuery mq,
+            LogicalAggregate agg
+        ) {
             if (HintUtils.isExpandDistinctAggregate(agg))
                 return null;
 
@@ -84,8 +88,11 @@ public class HashAggregateConverterRule {
         }
 
         /** {@inheritDoc} */
-        @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq,
-            LogicalAggregate agg) {
+        @Override protected @Nullable PhysicalNode convert(
+            RelOptPlanner planner,
+            RelMetadataQuery mq,
+            LogicalAggregate agg
+        ) {
             if (HintUtils.isExpandDistinctAggregate(agg))
                 return null;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
index 427398ef264..4be79e6ddbf 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/ProjectConverterRule.java
@@ -20,10 +20,10 @@ package 
org.apache.ignite.internal.processors.query.calcite.rule;
 import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -39,7 +39,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
  */
 public class ProjectConverterRule extends 
AbstractIgniteConverterRule<LogicalProject> {
     /** */
-    public static final RelOptRule INSTANCE = new ProjectConverterRule();
+    public static final ConverterRule INSTANCE = new ProjectConverterRule();
 
     /** */
     public ProjectConverterRule() {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java
new file mode 100644
index 00000000000..eaa3fa3db90
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyDistributedConverterRule.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converts LogicalTableModify to distributed IgniteTableModify (Perform table 
modify on remote nodes,
+ * aggregate affected rows count and send result to the initiator node).
+ */
+public class TableModifyDistributedConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+    /** */
+    public static final RelOptRule INSTANCE = new 
TableModifyDistributedConverterRule();
+
+    /**
+     * Creates a ConverterRule.
+     */
+    public TableModifyDistributedConverterRule() {
+        super(LogicalTableModify.class, 
TableModifyDistributedConverterRule.class.getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable PhysicalNode convert(
+        RelOptPlanner planner,
+        RelMetadataQuery mq,
+        LogicalTableModify rel
+    ) {
+        // If transaction is explicitly started it's only allowed to perform 
table modify on initiator node.
+        if (Commons.queryTransactionVersion(planner.getContext()) != null)
+            return null;
+
+        RelDataType rowType = rel.getRowType();
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+        IgniteDistribution inputDistribution = table.distribution();
+        boolean affectsSrc = false;
+
+        // Single distribution table modify is preferred in this case.
+        if (inputDistribution == IgniteDistributions.single())
+            return null;
+
+        RelOptCluster cluster = rel.getCluster();
+
+        switch (rel.getOperation()) {
+            case MERGE:
+                // Merge contains insert fields as well as update fields. It's 
impossible to check input distribution
+                // over these two field sets in the general case. Only corner 
cases can be implemented. Skip it for now.
+                return null;
+
+            case INSERT:
+                affectsSrc = 
RelOptUtil.findTables(rel).contains(rel.getTable());
+
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED) {
+                    if (affectsSrc)
+                        return null;
+                    else
+                        inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.range(0,
+                            
table.getRowType(cluster.getTypeFactory()).getFieldCount()));
+                }
+
+                break;
+
+            case UPDATE:
+                if (inputDistribution.getType() != 
RelDistribution.Type.HASH_DISTRIBUTED)
+                    inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            case DELETE:
+                inputDistribution = 
IgniteDistributions.hash(ImmutableIntList.of(0));
+
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown operation type: " + 
rel.getOperation());
+        }
+
+        // Create distributed table modify.
+        RelTraitSet outputTraits = 
cluster.traitSetOf(IgniteConvention.INSTANCE)
+            .replace(IgniteDistributions.random())
+            .replace(RewindabilityTrait.ONE_WAY)
+            .replace(RelCollations.EMPTY);
+
+        RelTraitSet inputTraits = outputTraits.replace(inputDistribution);
+
+        RelNode input = convert(rel.getInput(), inputTraits);
+
+        RelNode tableModify = new IgniteTableModify(cluster, outputTraits, 
rel.getTable(), input, rel.getOperation(),
+            rel.getUpdateColumnList(), rel.getSourceExpressionList(), 
rel.isFlattened(), affectsSrc);
+
+        assert rowType.getFieldCount() == 1 : "Unexpected field count: " + 
rowType.getFieldCount();
+
+        // Create aggregate to pass affected rows count to initiator node.
+        RelDataTypeField outFld = rowType.getFieldList().get(0);
+
+        RelBuilder relBuilder = relBuilderFactory.create(cluster, null);
+
+        relBuilder.push(tableModify);
+        relBuilder.aggregate(relBuilder.groupKey(),
+            relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, 
relBuilder.field(0)).as(outFld.getName()));
+
+        PhysicalNode agg = 
(PhysicalNode)HashAggregateConverterRule.MAP_REDUCE.convert(relBuilder.build());
+
+        if (agg == null)
+            return null;
+
+        // Create cast to original data type, since SUM aggregate extends type 
(i.e. sum(INT) -> BIGINT).
+        relBuilder.push(agg);
+        relBuilder.project(relBuilder.cast(relBuilder.fields().get(0), 
outFld.getType().getSqlTypeName()));
+
+        return 
(PhysicalNode)ProjectConverterRule.INSTANCE.convert(relBuilder.build());
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java
similarity index 79%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java
index 49fda24c0a5..e6ae0c3c88e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifyConverterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/TableModifySingleNodeConverterRule.java
@@ -32,17 +32,17 @@ import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
 import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 
 /**
- *
+ * Converts LogicalTableModify to single distribution IgniteTableModify 
(perform table modify on initiator node).
  */
-public class TableModifyConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
+public class TableModifySingleNodeConverterRule extends 
AbstractIgniteConverterRule<LogicalTableModify> {
     /** */
-    public static final RelOptRule INSTANCE = new TableModifyConverterRule();
+    public static final RelOptRule INSTANCE = new 
TableModifySingleNodeConverterRule();
 
     /**
      * Creates a ConverterRule.
      */
-    public TableModifyConverterRule() {
-        super(LogicalTableModify.class, "TableModifyConverterRule");
+    public TableModifySingleNodeConverterRule() {
+        super(LogicalTableModify.class, 
TableModifySingleNodeConverterRule.class.getSimpleName());
     }
 
     /** {@inheritDoc} */
@@ -54,7 +54,7 @@ public class TableModifyConverterRule extends 
AbstractIgniteConverterRule<Logica
             .replace(RelCollations.EMPTY);
         RelNode input = convert(rel.getInput(), traits);
 
-        return new IgniteTableModify(cluster, traits, rel.getTable(), input,
-                rel.getOperation(), rel.getUpdateColumnList(), 
rel.getSourceExpressionList(), rel.isFlattened());
+        return new IgniteTableModify(cluster, traits, rel.getTable(), input, 
rel.getOperation(),
+            rel.getUpdateColumnList(), rel.getSourceExpressionList(), 
rel.isFlattened(), false);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
index fbb0ce8600a..1a87a117ccd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/type/OtherType.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.type;
 
 import java.lang.reflect.Type;
+import org.jetbrains.annotations.Nullable;
 
 /** OTHER SQL type for any value. */
 public class OtherType extends IgniteCustomType {
@@ -35,4 +36,14 @@ public class OtherType extends IgniteCustomType {
     @Override public Type storageType() {
         return Object.class;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(@Nullable Object obj) {
+        // Digest is the same for built-in Calcite's OTHER type, make sure we 
get instance of correct class during
+        // canonization.
+        if (obj == null || obj.getClass() != getClass())
+            return false;
+
+        return super.equals(obj);
+    }
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 4dd43a9c807..b0310c799ce 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -24,14 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.BiFunction;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptListener;
-import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
@@ -39,34 +36,21 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ColumnStrategy;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
 import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
@@ -84,15 +68,10 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
-import 
org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
@@ -102,7 +81,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.Before;
-import org.mockito.Mockito;
 
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
 import static 
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
@@ -703,8 +681,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
             if (!(fields[i + 1] instanceof Class) && !(fields[i + 1] instanceof SqlTypeName))
                 throw new IllegalArgumentException("'fields[" + i + "]' should be a class or a SqlTypeName");
 
-            RelDataType type = fields[i + 1] instanceof Class ? TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]) :
-                TYPE_FACTORY.createSqlType((SqlTypeName)fields[i + 1]);
+            RelDataType type = fields[i + 1] instanceof Class
+                ? TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1])
+                : TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType((SqlTypeName)fields[i + 1]), true);
 
             b.add((String)fields[i], type);
         }
@@ -749,165 +728,6 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
             .build();
     }
 
-    /** */
-    static class TestTableDescriptor implements CacheTableDescriptor {
-        /** */
-        private final Supplier<IgniteDistribution> distributionSupp;
-
-        /** */
-        private final RelDataType rowType;
-
-        /** */
-        private final GridCacheContextInfo<?, ?> cacheInfo;
-
-        /** */
-        public TestTableDescriptor(Supplier<IgniteDistribution> distribution, 
RelDataType rowType) {
-            this.distributionSupp = distribution;
-            this.rowType = rowType;
-            cacheInfo = Mockito.mock(GridCacheContextInfo.class);
-
-            CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
-            Mockito.when(cfg.isEagerTtl()).thenReturn(true);
-
-            Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
-            Mockito.when(cacheInfo.config()).thenReturn(cfg);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheContextInfo cacheInfo() {
-            return cacheInfo;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheContext cacheContext() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteDistribution distribution() {
-            return distributionSupp.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public ColocationGroup colocationGroup(MappingQueryContext 
ctx) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public RelDataType rowType(IgniteTypeFactory factory, 
ImmutableBitSet usedColumns) {
-            return rowType;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean match(CacheDataRow row) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public <Row> Row toRow(ExecutionContext<Row> ectx, 
CacheDataRow row, RowHandler.RowFactory<Row> factory,
-            @Nullable ImmutableBitSet requiredColumns) throws 
IgniteCheckedException {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public <Row> ModifyTuple toTuple(ExecutionContext<Row> ectx, 
Row row, TableModify.Operation op,
-            @Nullable Object arg) throws IgniteCheckedException {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public ColumnDescriptor columnDescriptor(String fieldName) {
-            RelDataTypeField field = rowType.getField(fieldName, false, false);
-            return new TestColumnDescriptor(field.getIndex(), fieldName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<ColumnDescriptor> columnDescriptors() {
-            return Commons.transform(rowType.getFieldList(), f -> new 
TestColumnDescriptor(f.getIndex(), f.getName()));
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridQueryTypeDescriptor typeDescription() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isGeneratedAlways(RelOptTable table, int 
iColumn) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public ColumnStrategy generationStrategy(RelOptTable table, 
int iColumn) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public RexNode newColumnDefaultValue(RelOptTable table, int 
iColumn, InitializerContext context) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public BiFunction<InitializerContext, RelNode, RelNode> 
postExpressionConversionHook() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public RexNode newAttributeInitializer(RelDataType type, 
SqlFunction constructor, int iAttribute,
-            List<RexNode> constructorArgs, InitializerContext context) {
-            throw new AssertionError();
-        }
-    }
-
-    /** */
-    static class TestColumnDescriptor implements ColumnDescriptor {
-        /** */
-        private final int idx;
-
-        /** */
-        private final String name;
-
-        /** */
-        public TestColumnDescriptor(int idx, String name) {
-            this.idx = idx;
-            this.name = name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasDefaultValue() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int fieldIndex() {
-            return idx;
-        }
-
-        /** {@inheritDoc} */
-        @Override public RelDataType logicalType(IgniteTypeFactory f) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Class<?> storageType() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object defaultValue() {
-            throw new AssertionError();
-        }
-    }
-
     /** */
     static class TestMessageServiceImpl extends MessageServiceImpl {
         /** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
index c71d56a7357..1694b8d7773 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
@@ -20,14 +20,22 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Spool;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import 
org.apache.ignite.internal.processors.query.calcite.rule.TableModifySingleNodeConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
+import static org.apache.calcite.sql.type.SqlTypeName.OTHER;
 import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -190,4 +198,211 @@ public class TableDmlPlannerTest extends AbstractPlannerTest {
         assertPlan("SELECT CITY.NAME, STREET.NAME FROM STREET JOIN CITY ON STREET.CITY_ID = CITY.ID ORDER BY STREET.ID",
             schema, hasColumns("NAME", "NAME1"));
     }
+
+    /** Tests that table modify can be executed on remote nodes. */
+    @Test
+    public void testDistributedTableModify() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST_PART", IgniteDistributions.affinity(3, "test", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ).addIndex("VAL_IDX", 4),
+            createTable("TEST_PART2", IgniteDistributions.affinity(2, "test2", 
"hash"),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_RND", IgniteDistributions.random(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            ),
+            createTable("TEST_REPL2", IgniteDistributions.broadcast(),
+                QueryUtils.KEY_FIELD_NAME, OTHER,
+                QueryUtils.VAL_FIELD_NAME, OTHER,
+                "ID", INTEGER,
+                "AFF_ID", INTEGER,
+                "VAL", INTEGER
+            )
+        );
+
+        // Check MERGE statement.
+
+        // Merge doesn't support distributed TableModify.
+        assertPlan("MERGE INTO test_part dst USING test_part src ON dst.id = src.id " +
+            "WHEN MATCHED THEN UPDATE SET val = dst.val + 1 " +
+            "WHEN NOT MATCHED THEN INSERT (id, aff_id, val) VALUES (src.id, src.aff_id, src.val)", schema,
+            isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // Check INSERT statements.
+
+        // partitioned <- values (broadcast).
+        assertPlan("INSERT INTO test_part VALUES (?, ?, ?)", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single())));
+
+        // partitioned <- partitioned (same).
+        assertPlan("INSERT INTO test_part SELECT * FROM test_part", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_PART")))))));
+
+        // partitioned <- partitioned (same, affinity key change).
+        assertPlan("INSERT INTO test_part SELECT id, aff_id + 1, val FROM test_part", schema,
+            isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isInstanceOf(IgniteExchange.class)
+                        .and(input(isTableScan("TEST_PART"))))))));
+
+        // partitioned <- partitioned (another affinity).
+        assertPlan("INSERT INTO test_part SELECT * FROM test_part2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_PART2")))));
+
+        // partitioned <- partitioned (another affinity, affinity key change).
+        assertPlan("INSERT INTO test_part SELECT id, aff_id + 1, val FROM test_part2", schema,
+            hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_PART2")))));
+
+        // partitioned <- random.
+        assertPlan("INSERT INTO test_part SELECT * FROM test_rnd", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_RND")))));
+
+        // partitioned <- broadcast.
+        assertPlan("INSERT INTO test_part SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // partitioned <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_part SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- partitioned.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_part", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_PART")))));
+
+        // broadcast <- random.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_rnd", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_RND")))));
+
+        // broadcast <- broadcast.
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL2"))));
+
+        // broadcast <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl2", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL2")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // broadcast <- broadcast (same).
+        assertPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isTableScan("TEST_REPL"))))));
+
+        // broadcast <- broadcast (same, force distributed).
+        GridTestUtils.assertThrows(null, () -> {
+                physicalPlan("INSERT INTO test_repl SELECT * FROM test_repl", schema,
+                    TableModifySingleNodeConverterRule.class.getSimpleName());
+            }, IgniteException.class, ""
+        );
+
+        // random <- partitioned.
+        assertPlan("INSERT INTO test_rnd SELECT * FROM test_part", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_PART")))));
+
+        // random <- broadcast.
+        assertPlan("INSERT INTO test_rnd SELECT * FROM test_repl", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // random <- broadcast (force distributed).
+        assertPlan("INSERT INTO test_rnd SELECT * FROM test_repl", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // Check UPDATE statements.
+
+        // partitioned.
+        assertPlan("UPDATE test_part SET val = val + 1", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_PART")))));
+
+        // partitioned (change indexed column for index scan).
+        assertPlan("UPDATE test_part SET val = val + 1 WHERE val = 10", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTableSpool.class)
+                    .and(input(isIndexScan("TEST_PART", "VAL_IDX")))))));
+
+        // broadcast.
+        assertPlan("UPDATE test_repl SET val = val + 1", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // broadcast (force distributed).
+        assertPlan("UPDATE test_repl SET val = val + 1", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // random.
+        assertPlan("UPDATE test_rnd SET val = val + 1", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_RND")))));
+
+        // Check DELETE statements.
+
+        // partitioned.
+        assertPlan("DELETE FROM test_part", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_PART")))));
+
+        // broadcast.
+        assertPlan("DELETE FROM test_repl WHERE val = 10", schema,
+            
isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.single()))
+                .and(input(isTableScan("TEST_REPL"))));
+
+        // broadcast (force distributed).
+        assertPlan("DELETE FROM test_repl WHERE val = 10", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isInstanceOf(IgniteTrimExchange.class)
+                    .and(input(isTableScan("TEST_REPL")))))),
+            TableModifySingleNodeConverterRule.class.getSimpleName()
+        );
+
+        // random.
+        assertPlan("DELETE FROM test_rnd WHERE val = 10", schema,
+            
hasChildThat(isInstanceOf(IgniteTableModify.class).and(hasDistribution(IgniteDistributions.random()))
+                .and(input(isTableScan("TEST_RND")))));
+    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index 45bcb3a6d76..744c351ddf6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -19,18 +19,21 @@ package org.apache.ignite.internal.processors.query.calcite.planner;
 
 import java.lang.reflect.Type;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -40,35 +43,48 @@ import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Wrapper;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.cache.query.index.IndexDefinition;
 import org.apache.ignite.internal.cache.query.index.IndexName;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
 import org.apache.ignite.internal.cache.query.index.sorted.client.ClientIndex;
 import 
org.apache.ignite.internal.cache.query.index.sorted.client.ClientIndexDefinition;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteStatisticsImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.planner.AbstractPlannerTest.DEFAULT_SCHEMA;
 
 /** */
-public class TestTable implements IgniteCacheTable {
+public class TestTable implements IgniteCacheTable, Wrapper {
     /** */
     private final String name;
 
@@ -109,7 +125,7 @@ public class TestTable implements IgniteCacheTable {
         statistics = new IgniteStatisticsImpl(new 
ObjectStatisticsImpl((long)rowCnt, Collections.emptyMap()));
         this.name = name;
 
-        desc = new AbstractPlannerTest.TestTableDescriptor(this::distribution, 
type);
+        desc = new TestTableDescriptor(this::distribution, type);
     }
 
     /**
@@ -314,4 +330,171 @@ public class TestTable implements IgniteCacheTable {
     @Override public void authorize(Operation op) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public <C> C unwrap(Class<C> cls) {
+        if (cls.isInstance(this))
+            return cls.cast(this);
+
+        if (cls.isInstance(desc))
+            return cls.cast(desc);
+
+        return null;
+    }
+
+    /** */
+    static class TestTableDescriptor extends NullInitializerExpressionFactory implements CacheTableDescriptor {
+        /** */
+        private final Supplier<IgniteDistribution> distributionSupp;
+
+        /** */
+        private final RelDataType rowType;
+
+        /** */
+        private final GridCacheContextInfo<?, ?> cacheInfo;
+
+        /** */
+        public TestTableDescriptor(Supplier<IgniteDistribution> distribution, 
RelDataType rowType) {
+            distributionSupp = distribution;
+            this.rowType = rowType;
+            cacheInfo = Mockito.mock(GridCacheContextInfo.class);
+
+            CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class);
+            Mockito.when(cfg.isEagerTtl()).thenReturn(true);
+
+            Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST"));
+            Mockito.when(cacheInfo.config()).thenReturn(cfg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContextInfo<?, ?> cacheInfo() {
+            return cacheInfo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContext<?, ?> cacheContext() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteDistribution distribution() {
+            return distributionSupp.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColocationGroup colocationGroup(MappingQueryContext 
ctx) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType rowType(IgniteTypeFactory factory, 
@Nullable ImmutableBitSet usedColumns) {
+            if (usedColumns == null)
+                return rowType;
+            else {
+                RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(factory);
+
+                for (int i = usedColumns.nextSetBit(0); i != -1; i = usedColumns.nextSetBit(i + 1))
+                    b.add(rowType.getFieldList().get(i));
+
+                return b.build();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean match(CacheDataRow row) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> Row toRow(ExecutionContext<Row> ectx, 
CacheDataRow row, RowHandler.RowFactory<Row> factory,
+            @Nullable ImmutableBitSet requiredColumns) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> ModifyTuple toTuple(ExecutionContext<Row> ectx, 
Row row, TableModify.Operation op,
+            @Nullable Object arg) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColumnDescriptor columnDescriptor(String fieldName) {
+            RelDataTypeField field = rowType.getField(fieldName, false, false);
+            return new TestColumnDescriptor(field.getIndex(), fieldName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ColumnDescriptor> columnDescriptors() {
+            return Commons.transform(rowType.getFieldList(), f -> new 
TestColumnDescriptor(f.getIndex(), f.getName()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridQueryTypeDescriptor typeDescription() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType insertRowType(IgniteTypeFactory factory) {
+            ImmutableBitSet.Builder bitSetBuilder = ImmutableBitSet.builder();
+
+            for (int i = 0; i < rowType.getFieldCount(); i++) {
+                String fldName = rowType.getFieldList().get(i).getName();
+
+                if (!QueryUtils.KEY_FIELD_NAME.equals(fldName) && !QueryUtils.VAL_FIELD_NAME.equals(fldName))
+                    bitSetBuilder.set(i);
+            }
+
+            return rowType(factory, bitSetBuilder.build());
+        }
+    }
+
+    /** */
+    static class TestColumnDescriptor implements ColumnDescriptor {
+        /** */
+        private final int idx;
+
+        /** */
+        private final String name;
+
+        /** */
+        public TestColumnDescriptor(int idx, String name) {
+            this.idx = idx;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasDefaultValue() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int fieldIndex() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType logicalType(IgniteTypeFactory f) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Class<?> storageType() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object defaultValue() {
+            throw new AssertionError();
+        }
+    }
 }

Reply via email to