This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a041b6c644 [IGNITE-18225] Sql. Pushdown MODIFY to data node. (#1798)
a041b6c644 is described below
commit a041b6c644380c130e8f9b19226cfff865dd7df1
Author: Max Zhuravkov <[email protected]>
AuthorDate: Tue Mar 28 15:35:01 2023 +0400
[IGNITE-18225] Sql. Pushdown MODIFY to data node. (#1798)
---
.../apache/ignite/jdbc/ItJdbcBatchSelfTest.java | 6 +-
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 15 ++
.../runner/app/ItIgniteNodeRestartTest.java | 10 +-
.../sql/engine/ClusterPerClassIntegrationTest.java | 28 +++-
.../internal/sql/engine/ItCorrelatesTest.java | 20 ++-
.../internal/sql/engine/ItCreateTableDdlTest.java | 11 +-
.../ignite/internal/sql/engine/ItDmlTest.java | 39 +++++
.../internal/sql/engine/AsyncSqlCursorImpl.java | 53 ++++++-
.../sql/engine/exec/ExecutionServiceImpl.java | 7 +-
.../sql/engine/exec/exp/IgniteSqlFunctions.java | 8 +
.../internal/sql/engine/exec/exp/RexImpTable.java | 24 ++-
.../internal/sql/engine/exec/rel/ModifyNode.java | 26 ++--
.../sql/engine/message/MessageServiceImpl.java | 4 +-
.../engine/metadata/IgniteMdFragmentMapping.java | 14 +-
.../prepare/ddl/DdlSqlToCommandConverter.java | 4 +
.../rule/TableFunctionScanConverterRule.java | 4 +-
.../sql/engine/rule/TableModifyConverterRule.java | 74 ++++++++-
.../sql/engine/rule/ValuesConverterRule.java | 4 +-
.../sql/engine/schema/IgniteTableImpl.java | 1 +
.../sql/engine/schema/TableDescriptorImpl.java | 20 ++-
.../sql/engine/sql/fun/IgniteSqlOperatorTable.java | 25 ++++
.../sql/engine/util/HashFunctionFactoryImpl.java | 17 ++-
.../internal/sql/engine/util/IgniteMethod.java | 6 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../sql/engine/planner/DmlPlannerTest.java | 166 +++++++++++++++++++++
25 files changed, 522 insertions(+), 66 deletions(-)
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
index 2dc6a7c588..cc581ac178 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
@@ -329,11 +329,7 @@ public class ItJdbcBatchSelfTest extends
AbstractJdbcSelfTest {
assertEquals(i + 1, updCnts[i], "Invalid update count: " + i);
}
- if (!e.getMessage().contains("PK unique constraint is violated")) {
- log.error("Invalid exception: ", e);
-
- fail();
- }
+ assertThat(e.toString(), e.getMessage(), containsString("PK unique
constraint is violated"));
assertEquals(SqlStateCode.INTERNAL_ERROR, e.getSQLState(),
"Invalid SQL state.");
assertEquals(IgniteQueryErrorCode.UNKNOWN, e.getErrorCode(),
"Invalid error code.");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 64f7b35fc1..69de6b5912 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raftsnapshot;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest.waitForIndex;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
@@ -237,6 +238,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
* to knock-out the follower to make it require a snapshot installation).
*/
@Test
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
void leaderFeedsFollowerWithSnapshotWithKnockoutPartitionNetwork() throws
Exception {
testLeaderFeedsFollowerWithSnapshot(Cluster.NodeKnockout.PARTITION_NETWORK,
DEFAULT_STORAGE_ENGINE);
}
@@ -327,6 +330,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
+ (DEFAULT_STORAGE_ENGINE.equals(storageEngine) ? "" : "
engine " + storageEngine)
+ " with partitions=1, replicas=3";
+ waitForIndex("test_PK");
+
cluster.doInSession(0, session -> {
executeUpdate(sql, session);
});
@@ -563,6 +568,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
PersistentPageMemoryStorageEngine.ENGINE_NAME,
VolatilePageMemoryStorageEngine.ENGINE_NAME
})
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
void leaderFeedsFollowerWithSnapshot(String storageEngine) throws
Exception {
testLeaderFeedsFollowerWithSnapshot(DEFAULT_KNOCKOUT, storageEngine);
}
@@ -663,6 +670,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
* Tests that, if a snapshot installation fails for some reason, a
subsequent retry due to a timeout happens successfully.
*/
@Test
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
void snapshotInstallationRepeatsOnTimeout() throws Exception {
prepareClusterForInstallingSnapshotToNode2(DEFAULT_KNOCKOUT,
DEFAULT_STORAGE_ENGINE, theCluster -> {
theCluster.node(0).dropMessages(dropFirstSnapshotMetaResponse());
@@ -716,6 +725,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
* stuck because one 'download' task will remain unfinished forever.
*/
@Test
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
void
snapshotInstallTimeoutDoesNotBreakSubsequentInstallsWhenSecondAttemptIsIdenticalToFirst()
throws Exception {
AtomicBoolean snapshotInstallFailedDueToIdenticalRetry = new
AtomicBoolean(false);
@@ -754,6 +765,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
}
@Test
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception {
CompletableFuture<Void> sentSnapshotMetaResponseFormNode1Future = new
CompletableFuture<>();
@@ -828,6 +841,8 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
* </ol>
*/
@Test
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19121")
void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog()
throws Exception {
CompletableFuture<Void> sentSnapshotMetaResponseFormNode0Future = new
CompletableFuture<>();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 32345e60a7..8c20e3e15b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -690,6 +690,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* </ol>
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19091")
public void testQueryCorrectnessAfterNodeRestart() throws
InterruptedException {
IgniteImpl ignite1 = startNode(0);
@@ -878,6 +879,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* checks that the table created before node stop, is not available when
majority if lost.
*/
@Test
+ // No sql engine
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
public void testOneNodeRestartWithGap() throws InterruptedException {
IgniteImpl ignite = startNode(0);
@@ -909,6 +912,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* Checks that the table created in cluster of 2 nodes, is recovered on a
node after restart of this node.
*/
@Test
+ // No SQL engine
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
public void testRecoveryOnOneNode() throws InterruptedException {
IgniteImpl ignite = startNode(0);
@@ -931,7 +936,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* Checks that a cluster is able to restart when some changes were made in
configuration.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-19079")
+ // No sql engine
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
public void testRestartDiffConfig() throws InterruptedException {
List<IgniteImpl> ignites = startNodes(2);
@@ -959,6 +965,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* The test for node restart when there is a gap between the node local
configuration and distributed configuration.
*/
@Test
+ // No SQL engine
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19092")
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
value = "0")
public void testCfgGapWithoutData() throws InterruptedException {
List<IgniteImpl> nodes = startNodes(3);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index ba44e1d023..3f187ed126 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -228,6 +228,20 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
tearDownBase(testInfo);
}
+ /**
+ * Returns table index configuration of the given index at the given node,
or {@code null} if no such index exists.
+ *
+ * @param node A node.
+ * @param indexName An index.
+ * @return An index configuration.
+ */
+ public static @Nullable TableIndexConfiguration
getIndexConfiguration(Ignite node, String indexName) {
+ return ((IgniteImpl) node).clusterConfiguration()
+ .getConfiguration(TablesConfiguration.KEY)
+ .indexes()
+ .get(indexName.toUpperCase());
+ }
+
/**
* Executes the query and validates any asserts passed to the builder.
*
@@ -438,7 +452,12 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
);
}
- protected static void waitForIndex(String indexName) throws
InterruptedException {
+ /**
+ * Waits for all nodes in the cluster to have the given index in the
configuration.
+ *
+ * @param indexName An index.
+ */
+ public static void waitForIndex(String indexName) throws
InterruptedException {
// FIXME: Wait for the index to be created on all nodes,
// this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to
the index.
assertTrue(waitForCondition(
@@ -446,11 +465,4 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
10_000)
);
}
-
- private static @Nullable TableIndexConfiguration
getIndexConfiguration(Ignite node, String indexName) {
- return ((IgniteImpl) node).clusterConfiguration()
- .getConfiguration(TablesConfiguration.KEY)
- .indexes()
- .get(indexName.toUpperCase());
- }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index aabf891d91..c9b9ceca1f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -58,10 +59,10 @@ public class ItCorrelatesTest extends
ClusterPerClassIntegrationTest {
}
/**
- * Tests resolving of collisions in correlates.
+ * Tests resolving of collisions in correlates with correlate variables in
the left hand.
*/
@Test
- public void testCorrelatesCollision() throws InterruptedException {
+ public void testCorrelatesCollisionLeft() throws InterruptedException {
sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
@@ -76,6 +77,21 @@ public class ItCorrelatesTest extends
ClusterPerClassIntegrationTest {
+ "AND NOT EXISTS(SELECT * FROM test2 WHERE test1.a=test2.a
AND test1.b<test2.c)")
.returns(12, 2)
.check();
+ }
+
+ /**
+ * Tests resolving of collisions in correlates with correlate variables in
both, left and right hands.
+ */
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19018")
+ public void testCorrelatesCollisionRight() throws InterruptedException {
+ sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
+ sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
+ sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
+ sql("INSERT INTO test2 VALUES (11, 1), (12, 1), (13, 4)");
+
+ waitForIndex("TEST1_PK");
+ waitForIndex("TEST2_PK");
// Collision by correlate variables in both, left and right hands.
assertQuery("SELECT * FROM test1 WHERE "
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index 8529b37e01..e7a54f0552 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -26,12 +26,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.SqlException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
/**
- * Integration test for set op (EXCEPT, INTERSECT).
+ * Integration test for CREATE TABLE DDL command.
*/
public class ItCreateTableDdlTest extends ClusterPerClassIntegrationTest {
/**
@@ -146,4 +147,12 @@ public class ItCreateTableDdlTest extends
ClusterPerClassIntegrationTest {
assertEquals(1, colocationColumns.length);
assertEquals("Id0", colocationColumns[0].name());
}
+
+ @Test
+ public void doNotAllowFunctionsInNonPkColumns() {
+ SqlException t = assertThrows(SqlException.class,
+ () -> sql("create table t (id varchar primary key, val varchar
default gen_random_uuid)"));
+
+ assertThat(t.getMessage(), containsString("Functional defaults are not
supported for non-primary key columns"));
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 53eb5cf1a6..0b776d5dc6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -555,4 +555,43 @@ public class ItDmlTest extends
ClusterPerClassIntegrationTest {
this.expectedVal = expectedVal;
}
}
+
+ @Test
+ public void testInsertMultipleDefaults() {
+ sql("CREATE TABLE integers(i INTEGER PRIMARY KEY, j INTEGER DEFAULT
2)");
+
+ sql("INSERT INTO integers VALUES (1, DEFAULT)");
+
+ assertQuery("SELECT i, j FROM integers").returns(1, 2).check();
+
+ sql("INSERT INTO integers VALUES (2, 3), (3, DEFAULT), (4, 4), (5,
DEFAULT)");
+
+ assertQuery("SELECT i, j FROM integers ORDER BY i")
+ .returns(1, 2)
+ .returns(2, 3)
+ .returns(3, 2)
+ .returns(4, 4)
+ .returns(5, 2)
+ .check();
+ }
+
+ @Test
+ @WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
+ public void testInsertMultipleDefaultsWithImplicitPk() {
+ sql("CREATE TABLE integers(i INTEGER, j INTEGER DEFAULT 2)");
+
+ sql("INSERT INTO integers VALUES (1, DEFAULT)");
+
+ assertQuery("SELECT i, j FROM integers").returns(1, 2).check();
+
+ sql("INSERT INTO integers VALUES (2, 3), (3, DEFAULT), (4, 4), (5,
DEFAULT)");
+
+ assertQuery("SELECT i, j FROM integers ORDER BY i")
+ .returns(1, 2)
+ .returns(2, 3)
+ .returns(3, 2)
+ .returns(4, 4)
+ .returns(5, 2)
+ .check();
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 4ce6b57681..65c6995b1c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -17,10 +17,15 @@
package org.apache.ignite.internal.sql.engine;
+import java.lang.reflect.Constructor;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -74,7 +79,7 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
implicitTx.rollback();
}
- throw IgniteException.wrap(t);
+ throw wrapIfNecessary(t);
}
if (implicitTx != null && !batch.hasMore()) {
@@ -91,4 +96,50 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
public CompletableFuture<Void> closeAsync() {
return dataCursor.closeAsync();
}
+
+ private static RuntimeException wrapIfNecessary(@NotNull Throwable t) {
+ Throwable cause = unwrapRemoteCause(t);
+
+ // If the cause is IgniteException then create
+ // an exception of the same type with the same properties
+ // and set its cause to the original exception.
+ if (cause instanceof IgniteException) {
+ return preserveExceptionType((IgniteException) cause, t);
+ } else {
+ // If the cause is not a subclass of IgniteException, wrap it in
IgniteException.
+ return IgniteException.wrap(t);
+ }
+ }
+
+ private static Throwable unwrapRemoteCause(@NotNull Throwable t) {
+ Throwable err = t;
+
+ while (err != null) {
+ err = ExceptionUtils.unwrapCause(err);
+ // Unwrap RemoteExceptions because they are just wrappers.
+ if (err instanceof RemoteException) {
+ err = err.getCause();
+ continue;
+ }
+
+ return err;
+ }
+
+ return t;
+ }
+
+ private static IgniteException preserveExceptionType(IgniteException e,
Throwable t) {
+ // Return IgniteException as is
+ if (e.getClass() == IgniteException.class) {
+ return e;
+ }
+
+ try {
+ Constructor<?> ctor =
e.getClass().getDeclaredConstructor(UUID.class, int.class, String.class,
Throwable.class);
+
+ return (IgniteException) ctor.newInstance(e.traceId(), e.code(),
e.getMessage(), t);
+ } catch (Exception ex) {
+ throw new RuntimeException("IgniteException-derived class does not
have required constructor: " + e.getClass().getName(), ex);
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 555da1183a..edf8afb225 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -375,11 +376,13 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- CompletableFuture.allOf(queryManagerMap.values().stream()
+ CompletableFuture<Void> f =
CompletableFuture.allOf(queryManagerMap.values().stream()
.filter(mgr -> mgr.rootFragmentId != null)
.map(mgr -> mgr.close(true))
.toArray(CompletableFuture[]::new)
- ).join();
+ );
+ // TODO Workaround for
https://issues.apache.org/jira/browse/IGNITE-19088
+ f.get(1, TimeUnit.MINUTES);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
index a6abfaf79d..965b67942c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.time.LocalTime;
+import java.util.UUID;
import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.config.CalciteConnectionConfig;
@@ -28,6 +29,7 @@ import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.NonDeterministic;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
@@ -174,6 +176,12 @@ public class IgniteSqlFunctions {
return leastOrGreatest(false, arg0, arg1);
}
+ /** Generates a random UUID and converts it to string. **/
+ @NonDeterministic
+ public static String genRandomUuid() {
+ return UUID.randomUUID().toString();
+ }
+
private static @Nullable Object leastOrGreatest(boolean least, Object
arg0, Object arg1) {
if (arg0 == null || arg1 == null) {
return null;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
index f845837246..1016ea64e5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java
@@ -229,6 +229,7 @@ import
org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
/**
@@ -250,16 +251,11 @@ public class RexImpTable {
private final Map<SqlOperator, RexCallImplementor> map = new HashMap<>();
/** Placeholder for DEFAULT operator value. */
- public static final Object DEFAULT_VALUE_PLACEHOLDER = new
DefaultValuePlaceholder();
+ // TODO Remove this constant when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+ public static final Object DEFAULT_VALUE_PLACEHOLDER =
Placeholder.DEFAULT_VALUE;
/** Placeholder for values, which expressions are not specified. */
- public static final Object UNSPECIFIED_VALUE_PLACEHOLDER = new Object() {
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return "<unspecified_value>";
- }
- };
+ public static final Object UNSPECIFIED_VALUE_PLACEHOLDER =
Placeholder.UNSPECIFIED_VALUE;
/**
* Constructor.
@@ -513,6 +509,7 @@ public class RexImpTable {
defineMethod(IS_NOT_DISTINCT_FROM,
IgniteMethod.IS_NOT_DISTINCT_FROM.method(), NullPolicy.NONE);
defineMethod(RAND_UUID, IgniteMethod.RAND_UUID.method(),
NullPolicy.NONE);
+ defineMethod(IgniteSqlOperatorTable.GEN_RANDOM_UUID,
IgniteMethod.GEN_RANDOM_UUID.method(), NullPolicy.NONE);
}
private void defineMethod(SqlOperator operator, String functionName,
NullPolicy nullPolicy) {
@@ -2491,6 +2488,7 @@ public class RexImpTable {
}
/** Implementor for the {@code DEFAULT} function. */
+ // TODO Remove this class when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
private static class DefaultImplementor extends AbstractRexCallImplementor
{
DefaultImplementor() {
super(NullPolicy.NONE, false);
@@ -2579,10 +2577,10 @@ public class RexImpTable {
};
}
- private static class DefaultValuePlaceholder {
- @Override
- public String toString() {
- return "DEFAULT";
- }
+ // We use enums for placeholders because enum
serialization/deserialization guarantees to preserve object's identity.
+ private enum Placeholder {
+ // TODO Remove this enum element when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+ DEFAULT_VALUE,
+ UNSPECIFIED_VALUE
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index eb0ba711a7..0a7d9e54e2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -412,6 +412,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
return mapping;
}
+ // TODO Remove this method when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
private static <RowT> void injectDefaults(
TableDescriptor tableDescriptor,
RowHandler<RowT> handler,
@@ -422,16 +423,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
ColumnDescriptor columnDescriptor =
tableDescriptor.columnDescriptor(i);
Object oldValue = handler.get(columnDescriptor.logicalIndex(),
row);
-
- Object newValue;
- if (columnDescriptor.key()
- && Commons.implicitPkEnabled()
- &&
Commons.IMPLICIT_PK_COL_NAME.equals(columnDescriptor.name())
- ) {
- newValue = columnDescriptor.defaultValue();
- } else {
- newValue = replaceDefaultValuePlaceholder(oldValue,
columnDescriptor);
- }
+ Object newValue = replaceDefaultValuePlaceholder(oldValue,
columnDescriptor);
if (oldValue != newValue) {
handler.set(columnDescriptor.logicalIndex(), row,
newValue);
@@ -440,8 +432,20 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
}
}
+ // TODO Remove this method when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
private static @Nullable Object replaceDefaultValuePlaceholder(@Nullable
Object val, ColumnDescriptor desc) {
- return val == RexImpTable.DEFAULT_VALUE_PLACEHOLDER ?
desc.defaultValue() : val;
+ Object newValue;
+
+ if (desc.key() && Commons.implicitPkEnabled() &&
Commons.IMPLICIT_PK_COL_NAME.equals(desc.name())) {
+ assert val != RexImpTable.DEFAULT_VALUE_PLACEHOLDER : "Implicit
primary key value should have already been generated";
+ newValue = val;
+ } else {
+ newValue = val == RexImpTable.DEFAULT_VALUE_PLACEHOLDER ?
desc.defaultValue() : val;
+ }
+
+ assert newValue != RexImpTable.DEFAULT_VALUE_PLACEHOLDER :
"Placeholder should have been replaced. Column: " + desc.name();
+
+ return newValue;
}
private enum State {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index a0010e81b9..1fd3d591ff 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Sql.NODE_LEFT_ERR;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -95,7 +96,8 @@ public class MessageServiceImpl implements MessageService {
}
try {
- messagingSrvc.send(node, msg).join();
+ // TODO Workaround for
https://issues.apache.org/jira/browse/IGNITE-19088
+ messagingSrvc.send(node, msg).get(1, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof IgniteInternalCheckedException) {
throw (IgniteInternalCheckedException) ex;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
index be167dc151..bb27607b4e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
@@ -225,17 +225,25 @@ public class IgniteMdFragmentMapping implements
MetadataHandler<FragmentMappingM
* See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode,
RelMetadataQuery, MappingQueryContext)}.
*/
public FragmentMapping fragmentMapping(IgniteTableModify rel,
RelMetadataQuery mq, MappingQueryContext ctx) {
- FragmentMapping mapping =
fragmentMappingForMetadataQuery(rel.getInput(), mq, ctx);
+ RelNode input = rel.getInput();
+ FragmentMapping mapping = fragmentMappingForMetadataQuery(input, mq,
ctx);
// In case of the statement like UPDATE t SET a = a + 1
// this will be the second call to the collation group, hence the
result may differ.
// But such query should be rejected during execution, since we will
try to do RW read
// from replica that is not primary anymore.
- List<NodeWithTerm> assignments =
rel.getTable().unwrap(IgniteTable.class)
- .colocationGroup(ctx).assignments().stream()
+ ColocationGroup tableColocationGroup =
rel.getTable().unwrap(IgniteTable.class).colocationGroup(ctx);
+ List<NodeWithTerm> assignments =
tableColocationGroup.assignments().stream()
.map(CollectionUtils::first)
.collect(Collectors.toList());
+ FragmentMapping tableMapping = FragmentMapping.create(-1,
tableColocationGroup);
+ try {
+ mapping = tableMapping.colocate(mapping);
+ } catch (ColocationMappingException e) {
+ throw new NodeMappingException("Failed to calculate physical
distribution", input, e);
+ }
+
mapping = mapping.updatingTableAssignments(assignments);
return mapping;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
index 8f72c55136..b68df714b0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
@@ -345,6 +345,10 @@ public class DdlSqlToCommandConverter {
dedupSetPk.remove(name);
DefaultValueDefinition dflt = convertDefault(col.expression,
relType);
+ if (dflt.type() == DefaultValueDefinition.Type.FUNCTION_CALL &&
!pkCols.contains(name)) {
+ throw new SqlException(QUERY_INVALID_ERR,
+ "Functional defaults are not supported for non-primary
key columns [col=" + name + "]");
+ }
cols.add(new ColumnDefinition(name, relType, dflt));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
index afa7c85cd1..0b0493b61d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableFunctionScanConverterRule.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.rule;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import org.apache.calcite.plan.RelOptPlanner;
@@ -27,7 +28,6 @@ import
org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableFunctionScan;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
/**
* Rule to convert a {@link LogicalTableFunctionScan} to an {@link
IgniteTableFunctionScan}.
@@ -47,7 +47,7 @@ public class TableFunctionScanConverterRule extends
AbstractIgniteConverterRule<
RelTraitSet traitSet = rel.getTraitSet()
.replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.broadcast());
+ .replace(single());
return new IgniteTableFunctionScan(rel.getCluster(), traitSet,
rel.getCall(), rel.getRowType());
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
index 443a63454e..05b32c8013 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java
@@ -17,18 +17,37 @@
package org.apache.ignite.internal.sql.engine.rule;
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import
org.apache.ignite.internal.sql.engine.rel.agg.IgniteColocatedHashAggregate;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
+import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* TableModifyConverterRule.
@@ -48,12 +67,63 @@ public class TableModifyConverterRule extends
AbstractIgniteConverterRule<Logica
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalTableModify rel) {
RelOptCluster cluster = rel.getCluster();
+ RelOptTable relTable = rel.getTable();
+ IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
+ assert igniteTable != null;
+
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
+ .replace(igniteTable.distribution())
.replace(RelCollations.EMPTY);
+
RelNode input = convert(rel.getInput(), traits);
- return new IgniteTableModify(cluster, traits, rel.getTable(), input,
+ IgniteTableModify tableModify = new IgniteTableModify(cluster, traits,
relTable, input,
rel.getOperation(), rel.getUpdateColumnList(),
rel.getSourceExpressionList(), rel.isFlattened());
+
+ // Return IgniteTableModify w/o aggregation for tables with single
distribution.
+ if (igniteTable.distribution().equals(IgniteDistributions.single())) {
+ return tableModify;
+ } else {
+ return createAggregate(tableModify, cluster);
+ }
+ }
+
+ private static PhysicalNode createAggregate(IgniteTableModify tableModify,
RelOptCluster cluster) {
+ // We must aggregate the total number of modified rows returned by all
instances of DML operations.
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+ // 1) add a SUM0 aggregate on top of TableModify to compute the total
number of modified rows.
+ RelDataType rowType = tableModify.getRowType();
+ RelDataTypeField modifiedRowsField = rowType.getFieldList().get(0);
+ RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+ RelDataType rowCountType = modifiedRowsField.getType();
+ RelDataType sumType =
IgniteTypeSystem.INSTANCE.deriveSumType(typeFactory, rowCountType);
+
+ AggregateCall sum = AggregateCall.create(SqlStdOperatorTable.SUM0,
false, false,
+ false, ImmutableList.of(0), -1, null, RelCollations.EMPTY, 0,
tableModify,
+ sumType, null);
+
+ IgniteColocatedHashAggregate sumAgg = new IgniteColocatedHashAggregate(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(tableModify,
inTrait.replace(IgniteDistributions.single())),
+ ImmutableBitSet.of(),
+ List.of(ImmutableBitSet.of()),
+ List.of(sum)
+ );
+
+ // 2) add a projection on top of the SUM0 aggregate that converts the
result of SUM aggregate back to BIGINT
+ // (because the modified number of rows returned by DML operations is
BIGINT)
+ RelDataType typeOfSum = modifiedRowsField.getType();
+ RelDataType convertedRowType =
typeFactory.createStructType(List.of(Map.entry(modifiedRowsField.getName(),
typeOfSum)));
+
+ RexBuilder rexBuilder = Commons.rexBuilder();
+ RexInputRef sumRef = rexBuilder.makeInputRef(sumAgg, 0);
+ RexNode rexNode = rexBuilder.makeCast(typeOfSum, sumRef);
+ List<RexNode> projections = Collections.singletonList(rexNode);
+
+ return new IgniteProject(cluster,
outTrait.replace(IgniteDistributions.single()), sumAgg, projections,
convertedRowType);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
index 6959ec13b9..bcade330b9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/ValuesConverterRule.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.sql.engine.rule;
-import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.broadcast;
+import static
org.apache.ignite.internal.sql.engine.trait.IgniteDistributions.single;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
@@ -48,7 +48,7 @@ public class ValuesConverterRule extends
AbstractIgniteConverterRule<LogicalValu
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalValues rel) {
RelOptCluster cluster = rel.getCluster();
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(broadcast());
+ .replace(single());
return new IgniteValues(cluster, rel.getRowType(), rel.getTuples(),
traits);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index f77e9d4e70..6ce54aface 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -492,6 +492,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
Object value = hnd.get(colDesc.logicalIndex(), row);
+ // TODO Remove this check when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
assert value != RexImpTable.DEFAULT_VALUE_PLACEHOLDER;
if (value == null) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
index e2148d1b96..37da6ea6c8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/TableDescriptorImpl.java
@@ -22,17 +22,16 @@ import static
org.apache.ignite.internal.sql.engine.util.TypeUtils.native2relati
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ColumnStrategy;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -111,7 +110,9 @@ public class TableDescriptorImpl extends
NullInitializerExpressionFactory implem
desc.physicalIndex(),
desc.physicalType(),
DefaultValueStrategy.DEFAULT_COMPUTED,
- () -> UUID.randomUUID().toString()
+ () -> {
+ throw new AssertionError("Implicit primary key is
generated by a function");
+ }
);
}
@@ -162,14 +163,17 @@ public class TableDescriptorImpl extends
NullInitializerExpressionFactory implem
return rexBuilder.makeNullLiteral(fieldType);
}
case DEFAULT_CONSTANT: {
- var typeFactory = (IgniteTypeFactory)
rexBuilder.getTypeFactory();
+ Class<?> storageType =
Commons.nativeTypeToClass(descriptor.physicalType());
+ Object defaultVal = descriptor.defaultValue();
+ Object internalValue = TypeUtils.toInternal(defaultVal,
storageType);
+ RelDataType relDataType =
deriveLogicalType(rexBuilder.getTypeFactory(), descriptor);
- Object defaultValue =
TypeUtils.toInternal(descriptor.defaultValue());
-
- return rexBuilder.makeLiteral(defaultValue,
deriveLogicalType(typeFactory, descriptor), false);
+ return rexBuilder.makeLiteral(internalValue, relDataType,
false);
}
case DEFAULT_COMPUTED: {
- return rexBuilder.makeCall(SqlStdOperatorTable.DEFAULT);
+ assert descriptor.key() : "DEFAULT_COMPUTED is only supported
for primary key columns. Column: " + descriptor.name();
+
+ return
rexBuilder.makeCall(IgniteSqlOperatorTable.GEN_RANDOM_UUID);
}
default:
throw new IllegalStateException("Unknown default strategy: " +
descriptor.defaultStrategy());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
index 4b895b15d9..be5979a396 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/fun/IgniteSqlOperatorTable.java
@@ -131,6 +131,30 @@ public class IgniteSqlOperatorTable extends
ReflectiveSqlOperatorTable {
}
};
+ /**
+ * This function is used to generate a value for implicit primary key.
+ */
+ // TODO This function should be removed when
https://issues.apache.org/jira/browse/IGNITE-19103 is complete.
+ public static final SqlFunction GEN_RANDOM_UUID =
+ new SqlFunction(
+ "GEN_RANDOM_UUID",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(SqlTypeName.VARCHAR),
+ null,
+ OperandTypes.NILADIC,
+ SqlFunctionCategory.SYSTEM
+ ) {
+ @Override
+ public boolean isDynamicFunction() {
+ return true;
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return false;
+ }
+ };
+
/** Singleton instance. */
public static final IgniteSqlOperatorTable INSTANCE = new
IgniteSqlOperatorTable();
@@ -399,5 +423,6 @@ public class IgniteSqlOperatorTable extends
ReflectiveSqlOperatorTable {
register(GREATEST2);
register(NULL_BOUND);
register(RAND_UUID);
+ register(GEN_RANDOM_UUID);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
index 9cac261113..8a23c7ae9a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.sql.engine.util;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
import java.util.Objects;
import java.util.UUID;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
@@ -110,8 +113,18 @@ public class HashFunctionFactoryImpl<T> implements
HashFunctionFactory<T> {
for (int i = 0; i < fields.length; i++) {
Object value = rowHandler.get(fields[i], row);
-
- value = TypeUtils.fromInternal(value,
NativeTypeSpec.toClass(fieldTypes[i].spec(), true));
+ NativeTypeSpec nativeTypeSpec = fieldTypes[i].spec();
+ Class<?> storageType = NativeTypeSpec.toClass(nativeTypeSpec,
true);
+
+ // TODO Remove this check when
https://issues.apache.org/jira/browse/IGNITE-19096 is complete
+ if (value == RexImpTable.DEFAULT_VALUE_PLACEHOLDER) {
+ var error = format("Placeholder should have been replaced.
field: {} nativeTypeSpec: {} row: {} ",
+ fields[i], nativeTypeSpec,
rowHandler.toString(row));
+
+ throw new IllegalArgumentException(error);
+ } else {
+ value = TypeUtils.fromInternal(value, storageType);
+ }
ColocationUtils.append(hashCalc, value, fieldTypes[i]);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
index 34d4c8a4f7..a98f8b0b39 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
@@ -98,7 +98,11 @@ public enum IgniteMethod {
IS_NOT_DISTINCT_FROM(Objects.class, "equals", Object.class, Object.class),
/** See {@link UUID#randomUUID()}. */
- RAND_UUID(UUID.class, "randomUUID");
+ RAND_UUID(UUID.class, "randomUUID"),
+
+ /** See {@link IgniteSqlFunctions#genRandomUuid()}. */
+ // TODO This function should be removed when
https://issues.apache.org/jira/browse/IGNITE-19103 is complete.
+ GEN_RANDOM_UUID(IgniteSqlFunctions.class, "genRandomUuid");
private final Method method;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index e0a6bd511f..43716886b5 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -372,7 +372,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
+ "with
partitions=1,replicas=1,primary_zone='%s'", tableName, zoneName)))
);
- assertInstanceOf(DistributionZoneNotFoundException.class,
exception.getCause().getCause());
+ assertInstanceOf(DistributionZoneNotFoundException.class,
exception.getCause());
}
/**
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
new file mode 100644
index 0000000000..8613120468
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify DML plans.
+ */
+public class DmlPlannerTest extends AbstractPlannerTest {
+
+ /**
+ * Test for INSERT .. VALUES when table has a single distribution.
+ */
+ @Test
+ public void testInsertIntoSingleDistributedTable() throws Exception {
+ IgniteTable test1 = newTestTable("TEST1",
IgniteDistributions.single());
+ IgniteSchema schema = createSchema(test1);
+
+ // There should be no exchanges and other operations.
+ assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
+
isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))));
+ }
+
+ /**
+ * Test for INSERT .. VALUES when table has a non-single distribution.
+ */
+ @ParameterizedTest
+ @MethodSource("nonSingleDistributions")
+ public void testInsert(IgniteDistribution distribution) throws Exception {
+ IgniteTable test1 = newTestTable("TEST1", distribution);
+
+ IgniteSchema schema = createSchema(test1);
+
+ assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
+ nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(e ->
e.distribution().equals(IgniteDistributions.single())))
+
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+
.and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e ->
distribution.equals(e.distribution())))))
+ );
+ }
+
+ private static Stream<IgniteDistribution> nonSingleDistributions() {
+ return distributions().filter(d ->
!IgniteDistributions.single().equals(d));
+ }
+
+ /**
+ * Test for INSERT .. FROM SELECT when tables have different distributions.
+ */
+ @ParameterizedTest
+ @MethodSource("distributions")
+ public void testInsertSelectFrom(IgniteDistribution distribution) throws
Exception {
+ IgniteDistribution anotherDistribution =
IgniteDistributions.affinity(1, new UUID(1, 0), "0");
+
+ IgniteTable test1 = newTestTable("TEST1", distribution);
+ IgniteTable test2 = newTestTable("TEST2", anotherDistribution);
+
+ IgniteSchema schema = createSchema(test1, test2);
+
+ assertPlan("INSERT INTO TEST1 (C1, C2) SELECT C1, C2 FROM TEST2",
schema,
+ nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(e ->
e.distribution().equals(IgniteDistributions.single())))
+
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+
.and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e ->
distribution.equals(e.distribution())))))
+ );
+ }
+
+ /**
+ * Test for INSERT .. FROM SELECT when tables have the same distribution.
+ */
+ @ParameterizedTest
+ @MethodSource("distributions")
+ public void testInsertSelectFromSameDistribution(IgniteDistribution
distribution) throws Exception {
+ IgniteTable test1 = newTestTable("TEST1", distribution);
+ IgniteTable test2 = newTestTable("TEST2", distribution);
+
+ IgniteSchema schema = createSchema(test1, test2);
+
+ // there should be no exchanges.
+ assertPlan("INSERT INTO TEST1 (C1, C2) SELECT C1, C2 FROM TEST2",
schema,
+ nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+ .and(hasChildThat(isInstanceOf(IgniteTableScan.class)))
+ );
+ }
+
+ /**
+ * Test for UPDATE when table has a single distribution.
+ */
+ @Test
+ public void testUpdateOfSingleDistributedTable() throws Exception {
+ IgniteTable test1 = newTestTable("TEST1",
IgniteDistributions.single());
+ IgniteSchema schema = createSchema(test1);
+
+ // There should be no exchanges and other operations.
+ assertPlan("UPDATE TEST1 SET C2 = C2 + 1", schema,
+
isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteTableScan.class))));
+ }
+
+ /**
+ * Test for UPDATE when table has a non-single distribution.
+ */
+ @ParameterizedTest
+ @MethodSource("nonSingleDistributions")
+ public void testUpdate(IgniteDistribution distribution) throws Exception {
+ IgniteTable test1 = newTestTable("TEST1", distribution);
+
+ IgniteSchema schema = createSchema(test1);
+
+ assertPlan("UPDATE TEST1 SET C2 = C2 + 1", schema,
+ nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(e ->
e.distribution().equals(IgniteDistributions.single())))
+
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+
.and(hasChildThat(isInstanceOf(IgniteTableScan.class))))
+ );
+ }
+
+ private static Stream<IgniteDistribution> distributions() {
+ return Stream.of(
+ IgniteDistributions.single(),
+ IgniteDistributions.hash(List.of(0, 1)),
+ IgniteDistributions.affinity(0, new UUID(1, 1), "0")
+ );
+ }
+
+ // Class name is fully-qualified because AbstractPlannerTest defines a
class with the same name.
+ private static org.apache.ignite.internal.sql.engine.framework.TestTable
newTestTable(
+ String tableName, IgniteDistribution distribution) {
+
+ return TestBuilders.table()
+ .name(tableName)
+ .addColumn("C1", NativeTypes.INT32)
+ .addColumn("C2", NativeTypes.INT32)
+ .distribution(distribution)
+ .build();
+ }
+}