This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0723a0fd3e IGNITE-23083 Sql. Avoid planner cache pollution with simple
insert queries (#4295)
0723a0fd3e is described below
commit 0723a0fd3e4ae34858fd05d450778fa601333a98
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Sep 4 08:21:53 2024 +0300
IGNITE-23083 Sql. Avoid planner cache pollution with simple insert queries
(#4295)
---
.../exec/mapping/largecluster/AbstractTarget.java | 2 +-
.../exec/mapping/smallcluster/AbstractTarget.java | 2 +-
.../sql/engine/prepare/PrepareServiceImpl.java | 93 +++++++++++++++++++---
.../mapping/ExecutionTargetFactorySelfTest.java | 4 +-
.../sql/engine/prepare/PrepareServiceImplTest.java | 43 +++++++++-
5 files changed, 128 insertions(+), 16 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
index dbbb862e37..1a4ce330b4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
@@ -197,7 +197,7 @@ abstract class AbstractTarget implements ExecutionTarget {
static ExecutionTarget colocate(PartitionedTarget partitioned,
PartitionedTarget otherPartitioned) throws ColocationMappingException {
if (partitioned.partitionsNodes.length !=
otherPartitioned.partitionsNodes.length) {
- throw new ColocationMappingException("Partitioned targets with mot
matching numbers of partitioned are not colocated");
+ throw new ColocationMappingException("Partitioned targets with not
matching numbers of partitions are not colocated");
}
boolean changed = false;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
index 1783adaa92..b70785fc9b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
@@ -188,7 +188,7 @@ abstract class AbstractTarget implements ExecutionTarget {
static ExecutionTarget colocate(PartitionedTarget partitioned,
PartitionedTarget otherPartitioned) throws ColocationMappingException {
if (partitioned.partitionsNodes.length !=
otherPartitioned.partitionsNodes.length) {
- throw new ColocationMappingException("Partitioned targets with mot
matching numbers of partitioned are not colocated");
+ throw new ColocationMappingException("Partitioned targets with not
matching numbers of partitions are not colocated");
}
boolean finalised = true;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index b16c2634f0..690c23d767 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -36,12 +36,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDdl;
import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
@@ -396,7 +398,7 @@ public class PrepareServiceImpl implements PrepareService {
SqlNode validatedNode = validated.sqlNode();
- IgniteRel optimizedRel = doOptimize(ctx, validatedNode,
planner, key);
+ IgniteRel optimizedRel = doOptimize(ctx, validatedNode,
planner, () -> cache.invalidate(key));
QueryPlan fastPlan = tryOptimizeFast(stmt, ctx);
ResultSetMetadata resultSetMetadata =
resultSetMetadata(validated.dataType(), validated.origins(),
validated.aliases());
@@ -420,7 +422,7 @@ public class PrepareServiceImpl implements PrepareService {
return plan;
}, planningPool));
- return planFut.thenApply(Function.identity());
+ return planFut;
});
}
@@ -428,6 +430,67 @@ public class PrepareServiceImpl implements PrepareService {
return new PlanId(prepareServiceId, planIdGen.getAndIncrement());
}
+ private static boolean simpleInsert(SqlNode node) {
+ if (!(node instanceof SqlInsert)) {
+ return false;
+ }
+
+ SqlInsert insert = (SqlInsert) node;
+
+ SqlNode sourceNode = insert.getSource();
+
+ if (!(sourceNode instanceof SqlBasicCall) || insert.isUpsert() ||
sourceNode.getKind() != SqlKind.VALUES) {
+ return false;
+ } else {
+ for (SqlNode op : ((SqlBasicCall) sourceNode).getOperandList()) {
+ if (!(op instanceof SqlBasicCall)) {
+ return false;
+ }
+
+ SqlBasicCall opCall = (SqlBasicCall) op;
+ for (SqlNode op0 : opCall.getOperandList()) {
+ if (op0.getKind() != SqlKind.LITERAL) {
+ return false;
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /** Prepare plan in current thread, applicable for simple insert queries,
cache plan not involved. */
+ CompletableFuture<QueryPlan> prepareDmlOpt(SqlNode sqlNode,
PlanningContext ctx, String originalQuery) {
+ assert single(sqlNode);
+
+ // Validate
+ IgnitePlanner planner = ctx.planner();
+ SqlNode validatedNode = planner.validate(sqlNode);
+
+ IgniteRel optimizedRel = doOptimize(ctx, validatedNode, planner, null);
+
+ // Get parameter metadata.
+ RelDataType parameterRowType = planner.getParameterRowType();
+ ParameterMetadata parameterMetadata =
createParameterMetadata(parameterRowType);
+
+ ExplainablePlan plan;
+ if (optimizedRel instanceof IgniteKeyValueModify) {
+ plan = new KeyValueModifyPlan(
+ nextPlanId(), ctx.catalogVersion(), (IgniteKeyValueModify)
optimizedRel, DML_METADATA, parameterMetadata
+ );
+ } else {
+ plan = new MultiStepPlan(
+ nextPlanId(), SqlQueryType.DML, optimizedRel,
DML_METADATA, parameterMetadata, ctx.catalogVersion(), null
+ );
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Plan prepared: \n{}\n\n{}", originalQuery,
plan.explain());
+ }
+
+ return CompletableFuture.completedFuture(plan);
+ }
+
private CompletableFuture<QueryPlan> prepareDml(ParsedResult parsedResult,
PlanningContext ctx) {
// If a caller passes all the parameters, then get parameter types and
check to see whether a plan future already exists.
CompletableFuture<QueryPlan> f =
getPlanIfParameterHaveValues(parsedResult, ctx);
@@ -435,12 +498,18 @@ public class PrepareServiceImpl implements PrepareService
{
return f;
}
- CompletableFuture<ValidStatement<SqlNode>> validFut =
CompletableFuture.supplyAsync(() -> {
- IgnitePlanner planner = ctx.planner();
+ SqlNode sqlNode = parsedResult.parsedTree();
- SqlNode sqlNode = parsedResult.parsedTree();
+ assert single(sqlNode);
- assert single(sqlNode);
+ boolean dmlSimplePlan = simpleInsert(sqlNode);
+
+ if (dmlSimplePlan) {
+ return prepareDmlOpt(sqlNode, ctx, parsedResult.originalQuery());
+ }
+
+ CompletableFuture<ValidStatement<SqlNode>> validFut =
CompletableFuture.supplyAsync(() -> {
+ IgnitePlanner planner = ctx.planner();
// Validate
SqlNode validatedNode = planner.validate(sqlNode);
@@ -464,7 +533,7 @@ public class PrepareServiceImpl implements PrepareService {
SqlNode validatedNode = stmt.value;
ParameterMetadata parameterMetadata = stmt.parameterMetadata;
- IgniteRel optimizedRel = doOptimize(ctx, validatedNode,
planner, key);
+ IgniteRel optimizedRel = doOptimize(ctx, validatedNode,
planner, () -> cache.invalidate(key));
int catalogVersion = ctx.catalogVersion();
@@ -486,7 +555,7 @@ public class PrepareServiceImpl implements PrepareService {
return plan;
}, planningPool));
- return planFut.thenApply(Function.identity());
+ return planFut;
});
}
@@ -668,7 +737,7 @@ public class PrepareServiceImpl implements PrepareService {
);
}
- private IgniteRel doOptimize(PlanningContext ctx, SqlNode validatedNode,
IgnitePlanner planner, CacheKey key) {
+ private IgniteRel doOptimize(PlanningContext ctx, SqlNode validatedNode,
IgnitePlanner planner, @Nullable Runnable onTimeoutAction) {
// Convert to Relational operators graph
IgniteRel igniteRel;
try {
@@ -678,7 +747,9 @@ public class PrepareServiceImpl implements PrepareService {
// Otherwise the cache will keep a plan that can not be used
anymore,
// and we should allow another planning attempt with increased
timeout.
if (ctx.timeouted()) {
- cache.invalidate(key);
+ if (onTimeoutAction != null) {
+ onTimeoutAction.run();
+ }
//noinspection ThrowInsideCatchBlockWhichIgnoresCaughtException
throw new SqlException(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index 2846d0f420..78da8ec257 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -195,9 +195,9 @@ public class ExecutionTargetFactorySelfTest {
equalTo(NODE_SET));
assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)),
f.partitioned(shuffle(assignmentFromPrimaries(NODE_SET))));
assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SUBSET)),
f.partitioned(assignmentFromPrimaries(NODE_SET)),
- "Partitioned targets with mot matching numbers of partitioned
are not colocated");
+ "Partitioned targets with not matching numbers of partitions
are not colocated");
assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)),
f.partitioned(assignmentFromPrimaries(NODE_SUBSET)),
- "Partitioned targets with mot matching numbers of partitioned
are not colocated");
+ "Partitioned targets with not matching numbers of partitions
are not colocated");
assertNotColocated(f.partitioned(singleWithToken("node1", 1)),
f.partitioned(singleWithToken("node1", 2)),
"Partitioned targets have different terms");
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index 5374e89f7e..0c671b1529 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -30,6 +30,9 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -95,6 +98,44 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
scheduler.shutdownNow();
}
+ @ParameterizedTest
+ @MethodSource("insertInvariants")
+ public void testOptimizedExecutionPath(String insertStatement, boolean
applicable) {
+ PrepareService service = createPlannerService();
+
+ PrepareServiceImpl prepare = (PrepareServiceImpl) spy(service);
+
+ await(prepare.prepareAsync(
+ parse(insertStatement),
+ createContext()
+ ));
+
+ if (applicable) {
+ verify(prepare).prepareDmlOpt(any(), any(), any());
+ } else {
+ verify(prepare, never()).prepareDmlOpt(any(), any(), any());
+ }
+ }
+
+ private static Stream<Arguments> insertInvariants() {
+ return Stream.of(
+ Arguments.of("INSERT INTO t VALUES (1, 2)", true),
+ Arguments.of("INSERT INTO t VALUES (1, 2), (3, 4)", true),
+ Arguments.of("INSERT INTO t(A, C) VALUES (1, 2)", true),
+ Arguments.of("INSERT INTO t(C, A) VALUES (2, 1)", true),
+ Arguments.of("INSERT INTO t(C, A) VALUES ('2'::smallint, 1)",
false),
+ Arguments.of("INSERT INTO t(C, A) VALUES (2, 1), (3, ?)",
false),
+ Arguments.of("INSERT INTO t(C, A) SELECT t.C, t.A from t",
false),
+ Arguments.of("INSERT INTO t VALUES (1, OCTET_LENGTH('TEST'))",
false),
+ Arguments.of("INSERT INTO t VALUES (1, ?)", false),
+ Arguments.of("INSERT INTO t VALUES (?, 2)", false),
+ Arguments.of("INSERT INTO t VALUES (?, ?)", false),
+ Arguments.of("INSERT INTO t VALUES ((SELECT 1), 2)", false),
+ Arguments.of("INSERT INTO t SELECT t1.c1, t1.c2 FROM (SELECT
1, 2) as t1(c1, c2)", false),
+ Arguments.of("INSERT INTO t SELECT t1.c1, t1.c2 FROM (SELECT
?::int, ?::int) as t1(c1, c2)", false)
+ );
+ }
+
@Test
public void prepareServiceReturnsExistingPlanForExplain() {
PrepareService service = createPlannerService();
@@ -251,7 +292,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
.build();
// Create a proxy.
- IgniteTable spyTable = Mockito.spy(igniteTable);
+ IgniteTable spyTable = spy(igniteTable);
// Override and slowdown a method, which is called by Planner, to
emulate long planning.
Mockito.doAnswer(inv -> {