This is an automated email from the ASF dual-hosted git repository.
jooger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eb7b4e58b1 IGNITE-22713: Sql. Move timeout scheduling code to
ExecutionContext (#4073)
eb7b4e58b1 is described below
commit eb7b4e58b11aa80d728fc6c10ba27a46b8fb1502
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Jul 12 10:03:24 2024 +0300
IGNITE-22713: Sql. Move timeout scheduling code to ExecutionContext (#4073)
---
.../internal/sql/engine/exec/ExecutionContext.java | 19 ++-
.../sql/engine/prepare/KeyValueGetPlan.java | 11 +-
.../sql/engine/prepare/KeyValueModifyPlan.java | 11 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 177 ++++++---------------
4 files changed, 68 insertions(+), 150 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 02cf1e7f42..af1044683c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.calcite.DataContext;
@@ -39,6 +40,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
@@ -387,8 +389,21 @@ public class ExecutionContext<RowT> implements DataContext
{
return cancelFlag.get();
}
- public @Nullable CompletableFuture<Void> timeoutFuture() {
- return timeoutFut;
+ /**
+ * Schedules a timeout task that is going to complete the given future
exceptionally with a {@link QueryCancelledException},
+ * if a timeout is set on this context.
+ */
+ public void scheduleTimeout(CompletableFuture<?> fut) {
+ if (timeoutFut == null) {
+ return;
+ }
+
+ Executor executor = task -> execute(task::run, (err) -> {});
+
+ timeoutFut.thenAcceptAsync(
+ (r) -> fut.completeExceptionally(new
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)),
+ executor
+ );
}
/** Creates {@link PartitionProvider} for the given source table. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index 8beffb1e22..42eff2814f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
-import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
@@ -206,15 +205,7 @@ public class KeyValueGetPlan implements ExplainablePlan,
ExecutablePlan {
result.whenCompleteAsync((res, err) ->
firstPageReadyCallback.onPrefetchComplete(err), executor);
}
- CompletableFuture<Void> timeoutFut = ctx.timeoutFuture();
- if (timeoutFut != null) {
- Executor executor = task -> ctx.execute(task::run, (err) -> {});
-
- timeoutFut.thenAcceptAsync(
- (r) -> result.completeExceptionally(new
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)),
- executor
- );
- }
+ ctx.scheduleTimeout(result);
return new AsyncWrapper<>(result, Runnable::run);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
index 45c2cbddc0..0c647a0549 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.InternalSqlRowSingleLong;
-import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
@@ -142,15 +141,7 @@ public class KeyValueModifyPlan implements
ExplainablePlan, ExecutablePlan {
result.whenCompleteAsync((res, err) ->
firstPageReadyCallback.onPrefetchComplete(err), executor);
}
- CompletableFuture<Void> timeoutFut = ctx.timeoutFuture();
- if (timeoutFut != null) {
- Executor executor = task -> ctx.execute(task::run, (err) -> {});
-
- timeoutFut.thenAcceptAsync(
- (r) -> result.completeExceptionally(new
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)),
- executor
- );
- }
+ ctx.scheduleTimeout(result);
return new AsyncWrapper<>(result, Runnable::run);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 2a7dc43654..322b947725 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -206,13 +206,15 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
@BeforeEach
public void init() {
+ DdlSqlToCommandConverter converter = new DdlSqlToCommandConverter();
+
testCluster = new TestCluster();
executionServices =
nodeNames.stream().map(this::create).collect(Collectors.toList());
prepareService = new PrepareServiceImpl(
"test",
0,
CaffeineCacheFactory.INSTANCE,
- new DdlSqlToCommandConverter(),
+ converter,
PLANNING_TIMEOUT,
PLANNING_THREAD_COUNT,
new MetricManagerImpl(),
@@ -898,39 +900,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
SqlOperationContext planCtx = operationContext(null).build();
QueryPlan plan = prepare("SELECT * FROM test_tbl", planCtx);
- int attempts = 10;
-
- for (int k = 0; k < attempts; k++) {
- QueryCancel cancel = new QueryCancel();
- CompletableFuture<Void> timeoutFut = cancel.setTimeout(scheduler,
deadlineMillis);
-
- SqlOperationContext execCtx = operationContext(null)
- .cancel(cancel)
- .build();
-
- AsyncCursor<InternalSqlRow> cursor;
- try {
- cursor = execService.executePlan(plan, execCtx);
- } catch (QueryCancelledException e) {
- // This might happen when initialization took longer than a
time out,
- // Retry to get a proper error.
- continue;
- }
-
- CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
- timeoutFut.join();
-
- IgniteTestUtils.assertThrowsWithCause(
- batchFut::join,
- SqlException.class,
- "Query timeout"
- );
-
- return;
- }
-
- fail("Failed to get query timeout error");
+ awaitExecutionTimeout(execService, plan, deadlineMillis,
SqlException.class);
}
@Test
@@ -974,37 +944,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
tableRegistry.setGetTableDelay(delay);
- int attempts = 10;
-
- for (int k = 0; k < attempts; k++) {
- QueryCancel queryCancel = new QueryCancel();
- CompletableFuture<Void> timeoutFut =
queryCancel.setTimeout(scheduler, deadlineMillis);
-
- SqlOperationContext execCtx = operationContext(null)
- .cancel(queryCancel)
- .build();
-
- AsyncCursor<InternalSqlRow> cursor;
- try {
- cursor = execService.executePlan(plan, execCtx);
- } catch (QueryCancelledException e) {
- continue;
- }
-
- CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
- timeoutFut.join();
-
- IgniteTestUtils.assertThrowsWithCause(
- batchFut::join,
- SqlException.class,
- "Query timeout"
- );
-
- return;
- }
-
- fail("Failed to get query timeout error");
+ awaitExecutionTimeout(execService, plan, deadlineMillis,
SqlException.class);
}
@Test
@@ -1023,37 +963,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
tableRegistry.setGetTableDelay(delay);
- int attempts = 10;
-
- for (int k = 0; k < attempts; k++) {
- QueryCancel queryCancel = new QueryCancel();
- CompletableFuture<Void> timeoutFut =
queryCancel.setTimeout(scheduler, deadlineMillis);
-
- SqlOperationContext execCtx = operationContext(null)
- .cancel(queryCancel)
- .build();
-
- AsyncCursor<InternalSqlRow> cursor;
- try {
- cursor = execService.executePlan(plan, execCtx);
- } catch (QueryCancelledException e) {
- continue;
- }
-
- CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
- timeoutFut.join();
-
- IgniteTestUtils.assertThrowsWithCause(
- batchFut::join,
- SqlException.class,
- "Query timeout"
- );
-
- return;
- }
-
- fail("Failed to get query timeout error");
+ awaitExecutionTimeout(execService, plan, 500, SqlException.class);
}
@Test
@@ -1076,38 +986,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
DdlCommandHandler ddlCommandHandler = execService.ddlCommandHandler();
when(ddlCommandHandler.handle(any(CatalogCommand.class))).thenReturn(new
CompletableFuture<>());
- int attempts = 10;
-
- for (int k = 0; k < attempts; k++) {
- QueryCancel queryCancel = new QueryCancel();
- CompletableFuture<Void> timeoutFut =
queryCancel.setTimeout(scheduler, deadlineMillis);
-
- SqlOperationContext execCtx = operationContext(null)
- .cancel(queryCancel)
- .build();
-
- AsyncCursor<InternalSqlRow> cursor;
- try {
- cursor = execService.executePlan(plan, execCtx);
- } catch (QueryCancelledException e) {
- continue;
- }
-
- CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
- timeoutFut.join();
-
- // DDL handler does convert exceptions to SqlException, so we get
QueryCancelledException here.
- IgniteTestUtils.assertThrowsWithCause(
- batchFut::join,
- QueryCancelledException.class,
- "Query timeout"
- );
-
- return;
- }
-
- fail("Failed to get query timeout error");
+ // DDL handler does not convert exceptions to SqlException, so we get
QueryCancelledException here.
+ awaitExecutionTimeout(execService, plan, 500,
QueryCancelledException.class);
}
/** Creates an execution service instance for the node with given
consistent id. */
@@ -1262,6 +1142,47 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
}
+ private void awaitExecutionTimeout(
+ ExecutionService execService,
+ QueryPlan plan,
+ long deadlineMillis,
+ Class<? extends RuntimeException> errorClass
+ ) {
+ int attempts = 10;
+
+ for (int k = 0; k < attempts; k++) {
+ QueryCancel queryCancel = new QueryCancel();
+ CompletableFuture<Void> timeoutFut =
queryCancel.setTimeout(scheduler, deadlineMillis);
+
+ SqlOperationContext execCtx = operationContext(null)
+ .cancel(queryCancel)
+ .build();
+
+ AsyncCursor<InternalSqlRow> cursor;
+ try {
+ cursor = execService.executePlan(plan, execCtx);
+ } catch (QueryCancelledException e) {
+ // This might happen when initialization took longer than the
timeout,
+ // Retry to get a proper error.
+ continue;
+ }
+
+ CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
+
+ timeoutFut.join();
+
+ IgniteTestUtils.assertThrowsWithCause(
+ batchFut::join,
+ errorClass,
+ "Query timeout"
+ );
+
+ return;
+ }
+
+ fail("Failed to get query timeout error");
+ }
+
static class TestCluster {
private final Map<String, TestNode> nodes = new ConcurrentHashMap<>();