This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0723a0fd3e IGNITE-23083 Sql. Avoid planner cache pollution with simple insert queries (#4295)
0723a0fd3e is described below

commit 0723a0fd3e4ae34858fd05d450778fa601333a98
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Sep 4 08:21:53 2024 +0300

    IGNITE-23083 Sql. Avoid planner cache pollution with simple insert queries (#4295)
---
 .../exec/mapping/largecluster/AbstractTarget.java  |  2 +-
 .../exec/mapping/smallcluster/AbstractTarget.java  |  2 +-
 .../sql/engine/prepare/PrepareServiceImpl.java     | 93 +++++++++++++++++++---
 .../mapping/ExecutionTargetFactorySelfTest.java    |  4 +-
 .../sql/engine/prepare/PrepareServiceImplTest.java | 43 +++++++++-
 5 files changed, 128 insertions(+), 16 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
index dbbb862e37..1a4ce330b4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
@@ -197,7 +197,7 @@ abstract class AbstractTarget implements ExecutionTarget {
 
     static ExecutionTarget colocate(PartitionedTarget partitioned, 
PartitionedTarget otherPartitioned) throws ColocationMappingException {
         if (partitioned.partitionsNodes.length != 
otherPartitioned.partitionsNodes.length) {
-            throw new ColocationMappingException("Partitioned targets with mot 
matching numbers of partitioned are not colocated");
+            throw new ColocationMappingException("Partitioned targets with not 
matching numbers of partitions are not colocated");
         }
 
         boolean changed = false;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
index 1783adaa92..b70785fc9b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
@@ -188,7 +188,7 @@ abstract class AbstractTarget implements ExecutionTarget {
 
     static ExecutionTarget colocate(PartitionedTarget partitioned, 
PartitionedTarget otherPartitioned) throws ColocationMappingException {
         if (partitioned.partitionsNodes.length != 
otherPartitioned.partitionsNodes.length) {
-            throw new ColocationMappingException("Partitioned targets with mot 
matching numbers of partitioned are not colocated");
+            throw new ColocationMappingException("Partitioned targets with not 
matching numbers of partitions are not colocated");
         }
 
         boolean finalised = true;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index b16c2634f0..690c23d767 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -36,12 +36,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlDdl;
 import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
@@ -396,7 +398,7 @@ public class PrepareServiceImpl implements PrepareService {
 
                 SqlNode validatedNode = validated.sqlNode();
 
-                IgniteRel optimizedRel = doOptimize(ctx, validatedNode, 
planner, key);
+                IgniteRel optimizedRel = doOptimize(ctx, validatedNode, 
planner, () -> cache.invalidate(key));
                 QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
 
                 ResultSetMetadata resultSetMetadata = 
resultSetMetadata(validated.dataType(), validated.origins(), 
validated.aliases());
@@ -420,7 +422,7 @@ public class PrepareServiceImpl implements PrepareService {
                 return plan;
             }, planningPool));
 
-            return planFut.thenApply(Function.identity());
+            return planFut;
         });
     }
 
@@ -428,6 +430,67 @@ public class PrepareServiceImpl implements PrepareService {
         return new PlanId(prepareServiceId, planIdGen.getAndIncrement());
     }
 
+    private static boolean simpleInsert(SqlNode node) {
+        if (!(node instanceof SqlInsert)) {
+            return false;
+        }
+
+        SqlInsert insert = (SqlInsert) node;
+
+        SqlNode sourceNode = insert.getSource();
+
+        if (!(sourceNode instanceof SqlBasicCall) || insert.isUpsert() || 
sourceNode.getKind() != SqlKind.VALUES) {
+            return false;
+        } else {
+            for (SqlNode op : ((SqlBasicCall) sourceNode).getOperandList()) {
+                if (!(op instanceof SqlBasicCall)) {
+                    return false;
+                }
+
+                SqlBasicCall opCall = (SqlBasicCall) op;
+                for (SqlNode op0 : opCall.getOperandList()) {
+                    if (op0.getKind() != SqlKind.LITERAL) {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /** Prepare plan in current thread, applicable for simple insert queries, 
cache plan not involved. */
+    CompletableFuture<QueryPlan> prepareDmlOpt(SqlNode sqlNode, 
PlanningContext ctx, String originalQuery) {
+        assert single(sqlNode);
+
+        // Validate
+        IgnitePlanner planner = ctx.planner();
+        SqlNode validatedNode = planner.validate(sqlNode);
+
+        IgniteRel optimizedRel = doOptimize(ctx, validatedNode, planner, null);
+
+        // Get parameter metadata.
+        RelDataType parameterRowType = planner.getParameterRowType();
+        ParameterMetadata parameterMetadata = 
createParameterMetadata(parameterRowType);
+
+        ExplainablePlan plan;
+        if (optimizedRel instanceof IgniteKeyValueModify) {
+            plan = new KeyValueModifyPlan(
+                    nextPlanId(), ctx.catalogVersion(), (IgniteKeyValueModify) 
optimizedRel, DML_METADATA, parameterMetadata
+            );
+        } else {
+            plan = new MultiStepPlan(
+                    nextPlanId(), SqlQueryType.DML, optimizedRel, 
DML_METADATA, parameterMetadata, ctx.catalogVersion(), null
+            );
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Plan prepared: \n{}\n\n{}", originalQuery, 
plan.explain());
+        }
+
+        return CompletableFuture.completedFuture(plan);
+    }
+
     private CompletableFuture<QueryPlan> prepareDml(ParsedResult parsedResult, 
PlanningContext ctx) {
         // If a caller passes all the parameters, then get parameter types and 
check to see whether a plan future already exists.
         CompletableFuture<QueryPlan> f = 
getPlanIfParameterHaveValues(parsedResult, ctx);
@@ -435,12 +498,18 @@ public class PrepareServiceImpl implements PrepareService {
             return f;
         }
 
-        CompletableFuture<ValidStatement<SqlNode>> validFut = 
CompletableFuture.supplyAsync(() -> {
-            IgnitePlanner planner = ctx.planner();
+        SqlNode sqlNode = parsedResult.parsedTree();
 
-            SqlNode sqlNode = parsedResult.parsedTree();
+        assert single(sqlNode);
 
-            assert single(sqlNode);
+        boolean dmlSimplePlan = simpleInsert(sqlNode);
+
+        if (dmlSimplePlan) {
+            return prepareDmlOpt(sqlNode, ctx, parsedResult.originalQuery());
+        }
+
+        CompletableFuture<ValidStatement<SqlNode>> validFut = 
CompletableFuture.supplyAsync(() -> {
+            IgnitePlanner planner = ctx.planner();
 
             // Validate
             SqlNode validatedNode = planner.validate(sqlNode);
@@ -464,7 +533,7 @@ public class PrepareServiceImpl implements PrepareService {
                 SqlNode validatedNode = stmt.value;
                 ParameterMetadata parameterMetadata = stmt.parameterMetadata;
 
-                IgniteRel optimizedRel = doOptimize(ctx, validatedNode, 
planner, key);
+                IgniteRel optimizedRel = doOptimize(ctx, validatedNode, 
planner, () -> cache.invalidate(key));
 
                 int catalogVersion = ctx.catalogVersion();
 
@@ -486,7 +555,7 @@ public class PrepareServiceImpl implements PrepareService {
                 return plan;
             }, planningPool));
 
-            return planFut.thenApply(Function.identity());
+            return planFut;
         });
     }
 
@@ -668,7 +737,7 @@ public class PrepareServiceImpl implements PrepareService {
         );
     }
 
-    private IgniteRel doOptimize(PlanningContext ctx, SqlNode validatedNode, 
IgnitePlanner planner, CacheKey key) {
+    private IgniteRel doOptimize(PlanningContext ctx, SqlNode validatedNode, 
IgnitePlanner planner, @Nullable Runnable onTimeoutAction) {
         // Convert to Relational operators graph
         IgniteRel igniteRel;
         try {
@@ -678,7 +747,9 @@ public class PrepareServiceImpl implements PrepareService {
             // Otherwise the cache will keep a plan that can not be used 
anymore,
             // and we should allow another planning attempt with increased 
timeout.
             if (ctx.timeouted()) {
-                cache.invalidate(key);
+                if (onTimeoutAction != null) {
+                    onTimeoutAction.run();
+                }
 
                 //noinspection ThrowInsideCatchBlockWhichIgnoresCaughtException
                 throw new SqlException(
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index 2846d0f420..78da8ec257 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -195,9 +195,9 @@ public class ExecutionTargetFactorySelfTest {
                 equalTo(NODE_SET));
         assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.partitioned(shuffle(assignmentFromPrimaries(NODE_SET))));
         
assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SUBSET)), 
f.partitioned(assignmentFromPrimaries(NODE_SET)),
-                "Partitioned targets with mot matching numbers of partitioned 
are not colocated");
+                "Partitioned targets with not matching numbers of partitions 
are not colocated");
         assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.partitioned(assignmentFromPrimaries(NODE_SUBSET)),
-                "Partitioned targets with mot matching numbers of partitioned 
are not colocated");
+                "Partitioned targets with not matching numbers of partitions 
are not colocated");
 
         assertNotColocated(f.partitioned(singleWithToken("node1", 1)), 
f.partitioned(singleWithToken("node1", 2)),
                 "Partitioned targets have different terms");
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index 5374e89f7e..0c671b1529 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -30,6 +30,9 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -95,6 +98,44 @@ public class PrepareServiceImplTest extends BaseIgniteAbstractTest {
         scheduler.shutdownNow();
     }
 
+    @ParameterizedTest
+    @MethodSource("insertInvariants")
+    public void testOptimizedExecutionPath(String insertStatement, boolean 
applicable) {
+        PrepareService service = createPlannerService();
+
+        PrepareServiceImpl prepare = (PrepareServiceImpl) spy(service);
+
+        await(prepare.prepareAsync(
+                parse(insertStatement),
+                createContext()
+        ));
+
+        if (applicable) {
+            verify(prepare).prepareDmlOpt(any(), any(), any());
+        } else {
+            verify(prepare, never()).prepareDmlOpt(any(), any(), any());
+        }
+    }
+
+    private static Stream<Arguments> insertInvariants() {
+        return Stream.of(
+                Arguments.of("INSERT INTO t VALUES (1, 2)", true),
+                Arguments.of("INSERT INTO t VALUES (1, 2), (3, 4)", true),
+                Arguments.of("INSERT INTO t(A, C) VALUES (1, 2)", true),
+                Arguments.of("INSERT INTO t(C, A) VALUES (2, 1)", true),
+                Arguments.of("INSERT INTO t(C, A) VALUES ('2'::smallint, 1)", 
false),
+                Arguments.of("INSERT INTO t(C, A) VALUES (2, 1), (3, ?)", 
false),
+                Arguments.of("INSERT INTO t(C, A) SELECT t.C, t.A from t", 
false),
+                Arguments.of("INSERT INTO t VALUES (1, OCTET_LENGTH('TEST'))", 
false),
+                Arguments.of("INSERT INTO t VALUES (1, ?)", false),
+                Arguments.of("INSERT INTO t VALUES (?, 2)", false),
+                Arguments.of("INSERT INTO t VALUES (?, ?)", false),
+                Arguments.of("INSERT INTO t VALUES ((SELECT 1), 2)", false),
+                Arguments.of("INSERT INTO t SELECT t1.c1, t1.c2 FROM (SELECT 
1, 2) as t1(c1, c2)", false),
+                Arguments.of("INSERT INTO t SELECT t1.c1, t1.c2 FROM (SELECT 
?::int, ?::int) as t1(c1, c2)", false)
+        );
+    }
+
     @Test
     public void prepareServiceReturnsExistingPlanForExplain() {
         PrepareService service = createPlannerService();
@@ -251,7 +292,7 @@ public class PrepareServiceImplTest extends BaseIgniteAbstractTest {
                 .build();
 
         // Create a proxy.
-        IgniteTable spyTable = Mockito.spy(igniteTable);
+        IgniteTable spyTable = spy(igniteTable);
 
         // Override and slowdown a method, which is called by Planner, to 
emulate long planning.
         Mockito.doAnswer(inv -> {

Reply via email to