This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a041b6c644 [IGNITE-18225] Sql. Pushdown MODIFY to data node. (#1798)
a041b6c644 is described below

commit a041b6c644380c130e8f9b19226cfff865dd7df1
Author: Max Zhuravkov <[email protected]>
AuthorDate: Tue Mar 28 15:35:01 2023 +0400

    [IGNITE-18225] Sql. Pushdown MODIFY to data node. (#1798)
---
 .../apache/ignite/jdbc/ItJdbcBatchSelfTest.java    |   6 +-
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  15 ++
 .../runner/app/ItIgniteNodeRestartTest.java        |  10 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |  28 +++-
 .../internal/sql/engine/ItCorrelatesTest.java      |  20 ++-
 .../internal/sql/engine/ItCreateTableDdlTest.java  |  11 +-
 .../ignite/internal/sql/engine/ItDmlTest.java      |  39 +++++
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |  53 ++++++-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   7 +-
 .../sql/engine/exec/exp/IgniteSqlFunctions.java    |   8 +
 .../internal/sql/engine/exec/exp/RexImpTable.java  |  24 ++-
 .../internal/sql/engine/exec/rel/ModifyNode.java   |  26 ++--
 .../sql/engine/message/MessageServiceImpl.java     |   4 +-
 .../engine/metadata/IgniteMdFragmentMapping.java   |  14 +-
 .../prepare/ddl/DdlSqlToCommandConverter.java      |   4 +
 .../rule/TableFunctionScanConverterRule.java       |   4 +-
 .../sql/engine/rule/TableModifyConverterRule.java  |  74 ++++++++-
 .../sql/engine/rule/ValuesConverterRule.java       |   4 +-
 .../sql/engine/schema/IgniteTableImpl.java         |   1 +
 .../sql/engine/schema/TableDescriptorImpl.java     |  20 ++-
 .../sql/engine/sql/fun/IgniteSqlOperatorTable.java |  25 ++++
 .../sql/engine/util/HashFunctionFactoryImpl.java   |  17 ++-
 .../internal/sql/engine/util/IgniteMethod.java     |   6 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   2 +-
 .../sql/engine/planner/DmlPlannerTest.java         | 166 +++++++++++++++++++++
 25 files changed, 522 insertions(+), 66 deletions(-)

diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
index 2dc6a7c588..cc581ac178 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
@@ -329,11 +329,7 @@ public class ItJdbcBatchSelfTest extends 
AbstractJdbcSelfTest {
                 assertEquals(i + 1, updCnts[i], "Invalid update count: " + i);
             }
 
-            if (!e.getMessage().contains("PK unique constraint is violated")) {
-                log.error("Invalid exception: ", e);
-
-                fail();
-            }
+            assertThat(e.toString(), e.getMessage(), containsString("PK unique 
constraint is violated"));
 
             assertEquals(SqlStateCode.INTERNAL_ERROR, e.getSQLState(), 
"Invalid SQL state.");
             assertEquals(IgniteQueryErrorCode.UNKNOWN, e.getErrorCode(), 
"Invalid error code.");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 64f7b35fc1..69de6b5912 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raftsnapshot;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static 
org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest.waitForIndex;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
@@ -237,6 +238,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
      * to knock-out the follower to make it require a snapshot installation).
      */
     @Test
+    // Hangs at 
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
     void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws 
Exception {
         
testLeaderFeedsFollowerWithSnapshot(Cluster.NodeKnockout.PARTITION_NETWORK, 
DEFAULT_STORAGE_ENGINE);
     }
@@ -327,6 +330,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
                 + (DEFAULT_STORAGE_ENGINE.equals(storageEngine) ? "" : " 
engine " + storageEngine)
                 + " with partitions=1, replicas=3";
 
+        waitForIndex("test_PK");
+
         cluster.doInSession(0, session -> {
             executeUpdate(sql, session);
         });
@@ -563,6 +568,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
             PersistentPageMemoryStorageEngine.ENGINE_NAME,
             VolatilePageMemoryStorageEngine.ENGINE_NAME
     })
+    // Hangs at 
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
     void leaderFeedsFollowerWithSnapshot(String storageEngine) throws 
Exception {
         testLeaderFeedsFollowerWithSnapshot(DEFAULT_KNOCKOUT, storageEngine);
     }
@@ -663,6 +670,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
      * Tests that, if a snapshot installation fails for some reason, a 
subsequent retry due to a timeout happens successfully.
      */
     @Test
+    // Hangs at 
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
     void snapshotInstallationRepeatsOnTimeout() throws Exception {
         prepareClusterForInstallingSnapshotToNode2(DEFAULT_KNOCKOUT, 
DEFAULT_STORAGE_ENGINE, theCluster -> {
             theCluster.node(0).dropMessages(dropFirstSnapshotMetaResponse());
@@ -716,6 +725,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
      * stuck because one 'download' task will remain unfinished forever.
      */
     @Test
+    // Hangs at 
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
     void 
snapshotInstallTimeoutDoesNotBreakSubsequentInstallsWhenSecondAttemptIsIdenticalToFirst()
 throws Exception {
         AtomicBoolean snapshotInstallFailedDueToIdenticalRetry = new 
AtomicBoolean(false);
 
@@ -754,6 +765,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
     }
 
     @Test
+    // Hangs at 
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
     void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception {
         CompletableFuture<Void> sentSnapshotMetaResponseFormNode1Future = new 
CompletableFuture<>();
 
@@ -828,6 +841,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
      * </ol>
      */
     @Test
+    // Hangs at 
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
     void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() 
throws Exception {
         CompletableFuture<Void> sentSnapshotMetaResponseFormNode0Future = new 
CompletableFuture<>();
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 32345e60a7..8c20e3e15b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -690,6 +690,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * </ol>
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19091")
     public void testQueryCorrectnessAfterNodeRestart() throws 
InterruptedException {
         IgniteImpl ignite1 = startNode(0);
 
@@ -878,6 +879,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * checks that the table created before node stop, is not available when 
majority if lost.
      */
     @Test
+    // No sql engine
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
     public void testOneNodeRestartWithGap() throws InterruptedException {
         IgniteImpl ignite = startNode(0);
 
@@ -909,6 +912,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * Checks that the table created in cluster of 2 nodes, is recovered on a 
node after restart of this node.
      */
     @Test
+    // No SQL engine
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
     public void testRecoveryOnOneNode() throws InterruptedException {
         IgniteImpl ignite = startNode(0);
 
@@ -931,7 +936,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * Checks that a cluster is able to restart when some changes were made in 
configuration.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19079")
+    // No sql engine
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
     public void testRestartDiffConfig() throws InterruptedException {
         List<IgniteImpl> ignites = startNodes(2);
 
@@ -959,6 +965,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * The test for node restart when there is a gap between the node local 
configuration and distributed configuration.
      */
     @Test
+    // No SQL engine
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
     @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
value = "0")
     public void testCfgGapWithoutData() throws InterruptedException {
         List<IgniteImpl> nodes = startNodes(3);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index ba44e1d023..3f187ed126 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -228,6 +228,20 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
         tearDownBase(testInfo);
     }
 
+    /**
+     * Returns table index configuration of the given index at the given node, 
or {@code null} if no such index exists.
+     *
+     * @param node  A node.
+     * @param indexName  An index.
+     * @return  An index configuration.
+     */
+    public static @Nullable TableIndexConfiguration 
getIndexConfiguration(Ignite node, String indexName) {
+        return ((IgniteImpl) node).clusterConfiguration()
+                .getConfiguration(TablesConfiguration.KEY)
+                .indexes()
+                .get(indexName.toUpperCase());
+    }
+
     /**
      * Executes the query and validates any asserts passed to the builder.
      *
@@ -438,7 +452,12 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
         );
     }
 
-    protected static void waitForIndex(String indexName) throws 
InterruptedException {
+    /**
+     * Waits for all nodes in the cluster to have the given index in the 
configuration.
+     *
+     * @param indexName  An index.
+     */
+    public static void waitForIndex(String indexName) throws 
InterruptedException {
         // FIXME: Wait for the index to be created on all nodes,
         //  this is a workaround for 
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to 
the index.
         assertTrue(waitForCondition(
@@ -446,11 +465,4 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
                 10_000)
         );
     }
-
-    private static @Nullable TableIndexConfiguration 
getIndexConfiguration(Ignite node, String indexName) {
-        return ((IgniteImpl) node).clusterConfiguration()
-                .getConfiguration(TablesConfiguration.KEY)
-                .indexes()
-                .get(indexName.toUpperCase());
-    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index aabf891d91..c9b9ceca1f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -58,10 +59,10 @@ public class ItCorrelatesTest extends 
ClusterPerClassIntegrationTest {
     }
 
     /**
-     * Tests resolving of collisions in correlates.
+     * Tests resolving of collisions in correlates with correlate variables in 
the left hand.
      */
     @Test
-    public void testCorrelatesCollision() throws InterruptedException {
+    public void testCorrelatesCollisionLeft() throws InterruptedException {
         sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
         sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
         sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
@@ -76,6 +77,21 @@ public class ItCorrelatesTest extends 
ClusterPerClassIntegrationTest {
                 + "AND NOT EXISTS(SELECT * FROM test2 WHERE test1.a=test2.a 
AND test1.b<test2.c)")
                 .returns(12, 2)
                 .check();
+    }
+
+    /**
+     * Tests resolving of collisions in correlates with correlate variables in 
both, left and right hands.
+     */
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19018")
+    public void testCorrelatesCollisionRight() throws InterruptedException {
+        sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
+        sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
+        sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
+        sql("INSERT INTO test2 VALUES (11, 1), (12, 1), (13, 4)");
+
+        waitForIndex("TEST1_PK");
+        waitForIndex("TEST2_PK");
 
         // Collision by correlate variables in both, left and right hands.
         assertQuery("SELECT * FROM test1 WHERE "
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index 8529b37e01..e7a54f0552 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -26,12 +26,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.SqlException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
 /**
- * Integration test for set op (EXCEPT, INTERSECT).
+ * Integration test for CREATE TABLE DDL command.
  */
 public class ItCreateTableDdlTest extends ClusterPerClassIntegrationTest {
     /**
@@ -146,4 +147,12 @@ public class ItCreateTableDdlTest extends 
ClusterPerClassIntegrationTest {
         assertEquals(1, colocationColumns.length);
         assertEquals("Id0", colocationColumns[0].name());
     }
+
+    @Test
+    public void doNotAllowFunctionsInNonPkColumns() {
+        SqlException t = assertThrows(SqlException.class,
+                () -> sql("create table t (id varchar primary key, val varchar 
default gen_random_uuid)"));
+
+        assertThat(t.getMessage(), containsString("Functional defaults are not 
supported for non-primary key columns"));
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 53eb5cf1a6..0b776d5dc6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -555,4 +555,43 @@ public class ItDmlTest extends 
ClusterPerClassIntegrationTest {
             this.expectedVal = expectedVal;
         }
     }
+
+    @Test
+    public void testInsertMultipleDefaults() {
+        sql("CREATE TABLE integers(i INTEGER PRIMARY KEY, j INTEGER DEFAULT 
2)");
+
+        sql("INSERT INTO integers VALUES (1, DEFAULT)");
+
+        assertQuery("SELECT i, j FROM integers").returns(1, 2).check();
+
+        sql("INSERT INTO integers VALUES (2, 3), (3, DEFAULT), (4, 4), (5, 
DEFAULT)");
+
+        assertQuery("SELECT i, j FROM integers ORDER BY i")
+                .returns(1, 2)
+                .returns(2, 3)
+                .returns(3, 2)
+                .returns(4, 4)
+                .returns(5, 2)
+                .check();
+    }
+
+    @Test
+    @WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
+    public void testInsertMultipleDefaultsWithImplicitPk() {
+        sql("CREATE TABLE integers(i INTEGER, j INTEGER DEFAULT 2)");
+
+        sql("INSERT INTO integers VALUES (1, DEFAULT)");
+
+        assertQuery("SELECT i, j FROM integers").returns(1, 2).check();
+
+        sql("INSERT INTO integers VALUES (2, 3), (3, DEFAULT), (4, 4), (5, 
DEFAULT)");
+
+        assertQuery("SELECT i, j FROM integers ORDER BY i")
+                .returns(1, 2)
+                .returns(2, 3)
+                .returns(3, 2)
+                .returns(4, 4)
+                .returns(5, 2)
+                .check();
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 4ce6b57681..65c6995b1c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -17,10 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import java.lang.reflect.Constructor;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -74,7 +79,7 @@ public class AsyncSqlCursorImpl<T> implements 
AsyncSqlCursor<T> {
                     implicitTx.rollback();
                 }
 
-                throw IgniteException.wrap(t);
+                throw wrapIfNecessary(t);
             }
 
             if (implicitTx != null && !batch.hasMore()) {
@@ -91,4 +96,50 @@ public class AsyncSqlCursorImpl<T> implements 
AsyncSqlCursor<T> {
     public CompletableFuture<Void> closeAsync() {
         return dataCursor.closeAsync();
     }
+
+    private static RuntimeException wrapIfNecessary(@NotNull Throwable t) {
+        Throwable cause = unwrapRemoteCause(t);
+
+        // If the cause is IgniteException then create
+        // an exception of the same type with the same properties
+        // and set its cause to the original exception.
+        if (cause instanceof IgniteException) {
+            return preserveExceptionType((IgniteException) cause, t);
+        } else {
+            // If the cause is not a subclass of IgniteException, wrap it in 
IgniteException.
+            return IgniteException.wrap(t);
+        }
+    }
+
+    private static Throwable unwrapRemoteCause(@NotNull Throwable t) {
+        Throwable err = t;
+
+        while (err != null) {
+            err = ExceptionUtils.unwrapCause(err);
+            // Unwrap RemoteExceptions because they are just wrappers.
+            if (err instanceof RemoteException) {
+                err = err.getCause();
+                continue;
+            }
+
+            return err;
+        }
+
+        return t;
+    }
+
+    private static IgniteException preserveExceptionType(IgniteException e, 
Throwable t) {
+        // Return IgniteException as is
+        if (e.getClass() == IgniteException.class) {
+            return e;
+        }
+
+        try {
+            Constructor<?> ctor = 
e.getClass().getDeclaredConstructor(UUID.class, int.class, String.class, 
Throwable.class);
+
+            return (IgniteException) ctor.newInstance(e.traceId(), e.code(), 
e.getMessage(), t);
+        } catch (Exception ex) {
+            throw new RuntimeException("IgniteException-derived class does not 
have required constructor: " + e.getClass().getName(), ex);
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 555da1183a..edf8afb225 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -375,11 +376,13 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
-        CompletableFuture.allOf(queryManagerMap.values().stream()
+        CompletableFuture<Void> f = 
CompletableFuture.allOf(queryManagerMap.values().stream()
                 .filter(mgr -> mgr.rootFragmentId != null)
                 .map(mgr -> mgr.close(true))
                 .toArray(CompletableFuture[]::new)
-        ).join();
+        );
+        // TODO Workaround for 
https://issues.apache.org/jira/browse/IGNITE-19088
+        f.get(1, TimeUnit.MINUTES);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
index a6abfaf79d..965b67942c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
 import java.time.LocalTime;
+import java.util.UUID;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.config.CalciteConnectionConfig;
@@ -28,6 +29,7 @@ import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.NonDeterministic;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.ScannableTable;
@@ -174,6 +176,12 @@ public class IgniteSqlFunctions {
         return leastOrGreatest(false, arg0, arg1);
     }
 
+    /** Generates a random UUID and converts it to string. **/
+    @NonDeterministic
+    public static String genRandomUuid() {
+        return UUID.randomUUID().toString();
+    }
+
     private static @Nullable Object leastOrGreatest(boolean least, Object 
arg0, Object arg1) {
         if (arg0 == null || arg1 == null) {
             return null;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
index f845837246..1016ea64e5 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
@@ -229,6 +229,7 @@ import 
org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
 import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
 import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
 
 /**
@@ -250,16 +251,11 @@ public class RexImpTable {
     private final Map<SqlOperator, RexCallImplementor> map = new HashMap<>();
 
     /** Placeholder for DEFAULT operator value. */
-    public static final Object DEFAULT_VALUE_PLACEHOLDER = new 
DefaultValuePlaceholder();
+    // TODO Remove this constant when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+    public static final Object DEFAULT_VALUE_PLACEHOLDER = 
Placeholder.DEFAULT_VALUE;
 
     /** Placeholder for values, which expressions are not specified. */
-    public static final Object UNSPECIFIED_VALUE_PLACEHOLDER = new Object() {
-        /** {@inheritDoc} */
-        @Override
-        public String toString() {
-            return "<unspecified_value>";
-        }
-    };
+    public static final Object UNSPECIFIED_VALUE_PLACEHOLDER = 
Placeholder.UNSPECIFIED_VALUE;
 
     /**
      * Constructor.
@@ -513,6 +509,7 @@ public class RexImpTable {
         defineMethod(IS_NOT_DISTINCT_FROM, 
IgniteMethod.IS_NOT_DISTINCT_FROM.method(), NullPolicy.NONE);
 
         defineMethod(RAND_UUID, IgniteMethod.RAND_UUID.method(), 
NullPolicy.NONE);
+        defineMethod(IgniteSqlOperatorTable.GEN_RANDOM_UUID, 
IgniteMethod.GEN_RANDOM_UUID.method(), NullPolicy.NONE);
     }
 
     private void defineMethod(SqlOperator operator, String functionName, 
NullPolicy nullPolicy) {
@@ -2491,6 +2488,7 @@ public class RexImpTable {
     }
 
     /** Implementor for the {@code DEFAULT} function. */
+    // TODO Remove this class when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
     private static class DefaultImplementor extends AbstractRexCallImplementor 
{
         DefaultImplementor() {
             super(NullPolicy.NONE, false);
@@ -2579,10 +2577,10 @@ public class RexImpTable {
         };
     }
 
-    private static class DefaultValuePlaceholder {
-        @Override
-        public String toString() {
-            return "DEFAULT";
-        }
+    // We use enums for placeholders because enum 
serialization/deserialization guarantees to preserve object's identity.
+    private enum Placeholder {
+        // TODO Remove this enum element when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+        DEFAULT_VALUE,
+        UNSPECIFIED_VALUE
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index eb0ba711a7..0a7d9e54e2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -412,6 +412,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         return mapping;
     }
 
+    // TODO Remove this method when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
     private static <RowT> void injectDefaults(
             TableDescriptor tableDescriptor,
             RowHandler<RowT> handler,
@@ -422,16 +423,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
                 ColumnDescriptor columnDescriptor = 
tableDescriptor.columnDescriptor(i);
 
                 Object oldValue = handler.get(columnDescriptor.logicalIndex(), 
row);
-
-                Object newValue;
-                if (columnDescriptor.key()
-                        && Commons.implicitPkEnabled()
-                        && 
Commons.IMPLICIT_PK_COL_NAME.equals(columnDescriptor.name())
-                ) {
-                    newValue = columnDescriptor.defaultValue();
-                } else {
-                    newValue = replaceDefaultValuePlaceholder(oldValue, 
columnDescriptor);
-                }
+                Object newValue = replaceDefaultValuePlaceholder(oldValue, 
columnDescriptor);
 
                 if (oldValue != newValue) {
                     handler.set(columnDescriptor.logicalIndex(), row, 
newValue);
@@ -440,8 +432,20 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         }
     }
 
+    // TODO Remove this method when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
     private static @Nullable Object replaceDefaultValuePlaceholder(@Nullable 
Object val, ColumnDescriptor desc) {
-        return val == RexImpTable.DEFAULT_VALUE_PLACEHOLDER ? 
desc.defaultValue() : val;
+        Object newValue;
+
+        if (desc.key() && Commons.implicitPkEnabled() && 
Commons.IMPLICIT_PK_COL_NAME.equals(desc.name())) {
+            assert val != RexImpTable.DEFAULT_VALUE_PLACEHOLDER  : "Implicit 
primary key value should have already been generated";
+            newValue = val;
+        } else {
+            newValue = val == RexImpTable.DEFAULT_VALUE_PLACEHOLDER ? 
desc.defaultValue() : val;
+        }
+
+        assert newValue != RexImpTable.DEFAULT_VALUE_PLACEHOLDER : 
"Placeholder should have been replaced. Column: " + desc.name();
+
+        return newValue;
     }
 
     private enum State {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index a0010e81b9..1fd3d591ff 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Sql.NODE_LEFT_ERR;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -95,7 +96,8 @@ public class MessageServiceImpl implements MessageService {
                 }
 
                 try {
-                    messagingSrvc.send(node, msg).join();
+                    // TODO Workaround for 
https://issues.apache.org/jira/browse/IGNITE-19088
+                    messagingSrvc.send(node, msg).get(1, TimeUnit.SECONDS);
                 } catch (Exception ex) {
                     if (ex instanceof IgniteInternalCheckedException) {
                         throw (IgniteInternalCheckedException) ex;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
index be167dc151..bb27607b4e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
@@ -225,17 +225,25 @@ public class IgniteMdFragmentMapping implements 
MetadataHandler<FragmentMappingM
      * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, 
RelMetadataQuery, MappingQueryContext)}.
      */
     public FragmentMapping fragmentMapping(IgniteTableModify rel, 
RelMetadataQuery mq, MappingQueryContext ctx) {
-        FragmentMapping mapping = 
fragmentMappingForMetadataQuery(rel.getInput(), mq, ctx);
+        RelNode input = rel.getInput();
+        FragmentMapping mapping = fragmentMappingForMetadataQuery(input, mq, 
ctx);
 
         // In case of the statement like UPDATE t SET a = a + 1
         // this will be the second call to the collation group, hence the 
result may differ.
         // But such query should be rejected during execution, since we will 
try to do RW read
         // from replica that is not primary anymore.
-        List<NodeWithTerm> assignments = 
rel.getTable().unwrap(IgniteTable.class)
-                .colocationGroup(ctx).assignments().stream()
+        ColocationGroup tableColocationGroup = 
rel.getTable().unwrap(IgniteTable.class).colocationGroup(ctx);
+        List<NodeWithTerm> assignments = 
tableColocationGroup.assignments().stream()
                 .map(CollectionUtils::first)
                 .collect(Collectors.toList());
 
+        FragmentMapping tableMapping = FragmentMapping.create(-1, 
tableColocationGroup);
+        try {
+            mapping = tableMapping.colocate(mapping);
+        } catch (ColocationMappingException e) {
+            throw new NodeMappingException("Failed to calculate physical 
distribution", input, e);
+        }
+
         mapping = mapping.updatingTableAssignments(assignments);
 
         return mapping;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
index 8f72c55136..b68df714b0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
@@ -345,6 +345,10 @@ public class DdlSqlToCommandConverter {
             dedupSetPk.remove(name);
 
             DefaultValueDefinition dflt = convertDefault(col.expression, 
relType);
+            if (dflt.type() == DefaultValueDefinition.Type.FUNCTION_CALL && 
!pkCols.contains(name)) {
+                throw new SqlException(QUERY_INVALID_ERR,
+                        "Functional defaults are not supported for non-primary 
key columns [col=" + name + "]");
+            }
 
             cols.add(new ColumnDefinition(name, relType, dflt));
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
index afa7c85cd1..0b0493b61d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.rule;
 
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import org.apache.calcite.plan.RelOptPlanner;
@@ -27,7 +28,6 @@ import 
org.apache.calcite.rel.logical.LogicalTableFunctionScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableFunctionScan;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 
 /**
  * Rule to convert a {@link LogicalTableFunctionScan} to an {@link 
IgniteTableFunctionScan}.
@@ -47,7 +47,7 @@ public class TableFunctionScanConverterRule extends 
AbstractIgniteConverterRule<
 
         RelTraitSet traitSet = rel.getTraitSet()
                 .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.broadcast());
+                .replace(single());
 
         return new IgniteTableFunctionScan(rel.getCluster(), traitSet, 
rel.getCall(), rel.getRowType());
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
index 443a63454e..05b32c8013 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
@@ -17,18 +17,37 @@
 
 package org.apache.ignite.internal.sql.engine.rule;
 
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
  * TableModifyConverterRule.
@@ -48,12 +67,63 @@ public class TableModifyConverterRule extends 
AbstractIgniteConverterRule<Logica
     @Override
     protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, 
LogicalTableModify rel) {
         RelOptCluster cluster = rel.getCluster();
+        RelOptTable relTable = rel.getTable();
+        IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
+        assert igniteTable != null;
+
         RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
+                .replace(igniteTable.distribution())
                 .replace(RelCollations.EMPTY);
+
         RelNode input = convert(rel.getInput(), traits);
 
-        return new IgniteTableModify(cluster, traits, rel.getTable(), input,
+        IgniteTableModify tableModify = new IgniteTableModify(cluster, traits, 
relTable, input,
                 rel.getOperation(), rel.getUpdateColumnList(), 
rel.getSourceExpressionList(), rel.isFlattened());
+
+        // Return IgniteTableModify w/o aggregation for tables with single 
distribution.
+        if (igniteTable.distribution().equals(IgniteDistributions.single())) {
+            return tableModify;
+        } else {
+            return createAggregate(tableModify, cluster);
+        }
+    }
+
+    private static PhysicalNode createAggregate(IgniteTableModify tableModify, 
RelOptCluster cluster) {
+        // We must aggregate the total number of modified rows returned by all 
instances of DML operations.
+
+        RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+        RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+        // 1) add a SUM0 aggregate on top of TableModify to compute the total 
number of modified rows.
+        RelDataType rowType = tableModify.getRowType();
+        RelDataTypeField modifiedRowsField = rowType.getFieldList().get(0);
+        RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+        RelDataType rowCountType = modifiedRowsField.getType();
+        RelDataType sumType = 
IgniteTypeSystem.INSTANCE.deriveSumType(typeFactory, rowCountType);
+
+        AggregateCall sum = AggregateCall.create(SqlStdOperatorTable.SUM0, 
false, false,
+                false, ImmutableList.of(0), -1, null, RelCollations.EMPTY, 0, 
tableModify,
+                sumType, null);
+
+        IgniteColocatedHashAggregate sumAgg = new IgniteColocatedHashAggregate(
+                cluster,
+                outTrait.replace(IgniteDistributions.single()),
+                convert(tableModify, 
inTrait.replace(IgniteDistributions.single())),
+                ImmutableBitSet.of(),
+                List.of(ImmutableBitSet.of()),
+                List.of(sum)
+        );
+
+        // 2) add a projection on top of the SUM0 aggregate that converts the 
result of SUM aggregate back to BIGINT
+        // (because the modified number of rows returned by DML operations is 
BIGINT)
+        RelDataType typeOfSum = modifiedRowsField.getType();
+        RelDataType convertedRowType = 
typeFactory.createStructType(List.of(Map.entry(modifiedRowsField.getName(), 
typeOfSum)));
+
+        RexBuilder rexBuilder = Commons.rexBuilder();
+        RexInputRef sumRef = rexBuilder.makeInputRef(sumAgg, 0);
+        RexNode rexNode = rexBuilder.makeCast(typeOfSum, sumRef);
+        List<RexNode> projections = Collections.singletonList(rexNode);
+
+        return new IgniteProject(cluster, 
outTrait.replace(IgniteDistributions.single()), sumAgg, projections, 
convertedRowType);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
index 6959ec13b9..bcade330b9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.rule;
 
-import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.broadcast;
+import static 
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -48,7 +48,7 @@ public class ValuesConverterRule extends 
AbstractIgniteConverterRule<LogicalValu
     protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, 
LogicalValues rel) {
         RelOptCluster cluster = rel.getCluster();
         RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
-                .replace(broadcast());
+                .replace(single());
 
         return new IgniteValues(cluster, rel.getRowType(), rel.getTuples(), 
traits);
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index f77e9d4e70..6ce54aface 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -492,6 +492,7 @@ public class IgniteTableImpl extends AbstractTable 
implements IgniteTable, Updat
 
             Object value = hnd.get(colDesc.logicalIndex(), row);
 
+            // TODO Remove this check when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
             assert value != RexImpTable.DEFAULT_VALUE_PLACEHOLDER;
 
             if (value == null) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
index e2148d1b96..37da6ea6c8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
@@ -22,17 +22,16 @@ import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.native2relati
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ColumnStrategy;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -111,7 +110,9 @@ public class TableDescriptorImpl extends 
NullInitializerExpressionFactory implem
                 desc.physicalIndex(),
                 desc.physicalType(),
                 DefaultValueStrategy.DEFAULT_COMPUTED,
-                () -> UUID.randomUUID().toString()
+                () -> {
+                    throw new AssertionError("Implicit primary key is 
generated by a function");
+                }
         );
     }
 
@@ -162,14 +163,17 @@ public class TableDescriptorImpl extends 
NullInitializerExpressionFactory implem
                 return rexBuilder.makeNullLiteral(fieldType);
             }
             case DEFAULT_CONSTANT: {
-                var typeFactory = (IgniteTypeFactory) 
rexBuilder.getTypeFactory();
+                Class<?> storageType = 
Commons.nativeTypeToClass(descriptor.physicalType());
+                Object defaultVal = descriptor.defaultValue();
+                Object internalValue = TypeUtils.toInternal(defaultVal, 
storageType);
+                RelDataType relDataType = 
deriveLogicalType(rexBuilder.getTypeFactory(), descriptor);
 
-                Object defaultValue = 
TypeUtils.toInternal(descriptor.defaultValue());
-
-                return rexBuilder.makeLiteral(defaultValue, 
deriveLogicalType(typeFactory, descriptor), false);
+                return rexBuilder.makeLiteral(internalValue, relDataType, 
false);
             }
             case DEFAULT_COMPUTED: {
-                return rexBuilder.makeCall(SqlStdOperatorTable.DEFAULT);
+                assert descriptor.key() : "DEFAULT_COMPUTED is only supported 
for primary key columns. Column: " + descriptor.name();
+
+                return 
rexBuilder.makeCall(IgniteSqlOperatorTable.GEN_RANDOM_UUID);
             }
             default:
                 throw new IllegalStateException("Unknown default strategy: " + 
descriptor.defaultStrategy());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
index 4b895b15d9..be5979a396 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
@@ -131,6 +131,30 @@ public class IgniteSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                 }
             };
 
+    /**
+     * This function is used to generate a value for implicit primary key.
+     */
+    // TODO This function should be removed when 
https://issues.apache.org/jira/browse/IGNITE-19103 is complete.
+    public static final SqlFunction GEN_RANDOM_UUID =
+            new SqlFunction(
+                    "GEN_RANDOM_UUID",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.explicit(SqlTypeName.VARCHAR),
+                    null,
+                    OperandTypes.NILADIC,
+                    SqlFunctionCategory.SYSTEM
+            ) {
+                @Override
+                public boolean isDynamicFunction() {
+                    return true;
+                }
+
+                @Override
+                public boolean isDeterministic() {
+                    return false;
+                }
+            };
+
     /** Singleton instance. */
     public static final IgniteSqlOperatorTable INSTANCE = new 
IgniteSqlOperatorTable();
 
@@ -399,5 +423,6 @@ public class IgniteSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
         register(GREATEST2);
         register(NULL_BOUND);
         register(RAND_UUID);
+        register(GEN_RANDOM_UUID);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
index 9cac261113..8a23c7ae9a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.util;
 
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
 import java.util.Objects;
 import java.util.UUID;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
@@ -110,8 +113,18 @@ public class HashFunctionFactoryImpl<T> implements 
HashFunctionFactory<T> {
 
             for (int i = 0; i < fields.length; i++) {
                 Object value = rowHandler.get(fields[i], row);
-
-                value = TypeUtils.fromInternal(value, 
NativeTypeSpec.toClass(fieldTypes[i].spec(), true));
+                NativeTypeSpec nativeTypeSpec = fieldTypes[i].spec();
+                Class<?> storageType = NativeTypeSpec.toClass(nativeTypeSpec, 
true);
+
+                // TODO Remove this check when 
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+                if (value == RexImpTable.DEFAULT_VALUE_PLACEHOLDER) {
+                    var error = format("Placeholder should have been replaced. 
field: {} nativeTypeSpec: {} row: {} ",
+                            fields[i], nativeTypeSpec, 
rowHandler.toString(row));
+
+                    throw new IllegalArgumentException(error);
+                } else {
+                    value = TypeUtils.fromInternal(value, storageType);
+                }
 
                 ColocationUtils.append(hashCalc, value, fieldTypes[i]);
             }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
index 34d4c8a4f7..a98f8b0b39 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
@@ -98,7 +98,11 @@ public enum IgniteMethod {
     IS_NOT_DISTINCT_FROM(Objects.class, "equals", Object.class, Object.class),
 
     /** See {@link UUID#randomUUID()}. */
-    RAND_UUID(UUID.class, "randomUUID");
+    RAND_UUID(UUID.class, "randomUUID"),
+
+    /** See {@link IgniteSqlFunctions#genRandomUuid()}. */
+    // TODO This function should be removed when 
https://issues.apache.org/jira/browse/IGNITE-19103 is complete.
+    GEN_RANDOM_UUID(IgniteSqlFunctions.class, "genRandomUuid");
 
     private final Method method;
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index e0a6bd511f..43716886b5 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -372,7 +372,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
                                 + "with 
partitions=1,replicas=1,primary_zone='%s'", tableName, zoneName)))
         );
 
-        assertInstanceOf(DistributionZoneNotFoundException.class, 
exception.getCause().getCause());
+        assertInstanceOf(DistributionZoneNotFoundException.class, 
exception.getCause());
     }
 
     /**
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
new file mode 100644
index 0000000000..8613120468
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify DML plans.
+ */
+public class DmlPlannerTest extends AbstractPlannerTest {
+
+    /**
+     * Test for INSERT .. VALUES when table has a single distribution.
+     */
+    @Test
+    public void testInsertIntoSingleDistributedTable() throws Exception {
+        IgniteTable test1 = newTestTable("TEST1", 
IgniteDistributions.single());
+        IgniteSchema schema = createSchema(test1);
+
+        // There should be no exchanges and other operations.
+        assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
+                
isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))));
+    }
+
+    /**
+     * Test for INSERT .. VALUES when table has a non-single distribution.
+     */
+    @ParameterizedTest
+    @MethodSource("nonSingleDistributions")
+    public void testInsert(IgniteDistribution distribution) throws Exception {
+        IgniteTable test1 = newTestTable("TEST1", distribution);
+
+        IgniteSchema schema = createSchema(test1);
+
+        assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(e -> 
e.distribution().equals(IgniteDistributions.single())))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+                                
.and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e -> 
distribution.equals(e.distribution())))))
+        );
+    }
+
+    private static Stream<IgniteDistribution> nonSingleDistributions() {
+        return distributions().filter(d -> 
!IgniteDistributions.single().equals(d));
+    }
+
+    /**
+     * Test for INSERT .. FROM SELECT when tables have different distributions.
+     */
+    @ParameterizedTest
+    @MethodSource("distributions")
+    public void testInsertSelectFrom(IgniteDistribution distribution) throws 
Exception {
+        IgniteDistribution anotherDistribution = 
IgniteDistributions.affinity(1, new UUID(1, 0), "0");
+
+        IgniteTable test1 = newTestTable("TEST1", distribution);
+        IgniteTable test2 = newTestTable("TEST2", anotherDistribution);
+
+        IgniteSchema schema = createSchema(test1, test2);
+
+        assertPlan("INSERT INTO TEST1 (C1, C2) SELECT C1, C2 FROM TEST2", 
schema,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(e -> 
e.distribution().equals(IgniteDistributions.single())))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+                                
.and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e -> 
distribution.equals(e.distribution())))))
+        );
+    }
+
+    /**
+     * Test for INSERT .. FROM SELECT when tables have the same distribution.
+     */
+    @ParameterizedTest
+    @MethodSource("distributions")
+    public void testInsertSelectFromSameDistribution(IgniteDistribution 
distribution) throws Exception {
+        IgniteTable test1 = newTestTable("TEST1", distribution);
+        IgniteTable test2 = newTestTable("TEST2", distribution);
+
+        IgniteSchema schema = createSchema(test1, test2);
+
+        // there should be no exchanges.
+        assertPlan("INSERT INTO TEST1 (C1, C2) SELECT C1, C2 FROM TEST2", 
schema,
+                nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+                        .and(hasChildThat(isInstanceOf(IgniteTableScan.class)))
+        );
+    }
+
+    /**
+     * Test for UPDATE when table has a single distribution.
+     */
+    @Test
+    public void testUpdateOfSingleDistributedTable() throws Exception {
+        IgniteTable test1 = newTestTable("TEST1", 
IgniteDistributions.single());
+        IgniteSchema schema = createSchema(test1);
+
+        // There should be no exchanges and other operations.
+        assertPlan("UPDATE TEST1 SET C2 = C2 + 1", schema,
+                
isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteTableScan.class))));
+    }
+
+    /**
+     * Test for UPDATE when table has a non-single distribution.
+     */
+    @ParameterizedTest
+    @MethodSource("nonSingleDistributions")
+    public void testUpdate(IgniteDistribution distribution) throws Exception {
+        IgniteTable test1 = newTestTable("TEST1", distribution);
+
+        IgniteSchema schema = createSchema(test1);
+
+        assertPlan("UPDATE TEST1 SET C2 = C2 + 1", schema,
+                nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+                        .and(e -> 
e.distribution().equals(IgniteDistributions.single())))
+                        
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+                                
.and(hasChildThat(isInstanceOf(IgniteTableScan.class))))
+        );
+    }
+
+    private static Stream<IgniteDistribution> distributions() {
+        return Stream.of(
+                IgniteDistributions.single(),
+                IgniteDistributions.hash(List.of(0, 1)),
+                IgniteDistributions.affinity(0, new UUID(1, 1), "0")
+        );
+    }
+
+    // Class name is fully-qualified because AbstractPlannerTest defines a 
class with the same name.
+    private static org.apache.ignite.internal.sql.engine.framework.TestTable 
newTestTable(
+            String tableName, IgniteDistribution distribution) {
+
+        return TestBuilders.table()
+                .name(tableName)
+                .addColumn("C1", NativeTypes.INT32)
+                .addColumn("C2", NativeTypes.INT32)
+                .distribution(distribution)
+                .build();
+    }
+}

Reply via email to