This is an automated email from the ASF dual-hosted git repository.

jooger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eb7b4e58b1 IGNITE-22713: Sql. Move timeout scheduling code to 
ExecutionContext (#4073)
eb7b4e58b1 is described below

commit eb7b4e58b11aa80d728fc6c10ba27a46b8fb1502
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Jul 12 10:03:24 2024 +0300

    IGNITE-22713: Sql. Move timeout scheduling code to ExecutionContext (#4073)
---
 .../internal/sql/engine/exec/ExecutionContext.java |  19 ++-
 .../sql/engine/prepare/KeyValueGetPlan.java        |  11 +-
 .../sql/engine/prepare/KeyValueModifyPlan.java     |  11 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  | 177 ++++++---------------
 4 files changed, 68 insertions(+), 150 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 02cf1e7f42..af1044683c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -30,6 +30,7 @@ import java.util.Objects;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import org.apache.calcite.DataContext;
@@ -39,6 +40,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
@@ -387,8 +389,21 @@ public class ExecutionContext<RowT> implements DataContext 
{
         return cancelFlag.get();
     }
 
-    public @Nullable CompletableFuture<Void> timeoutFuture() {
-        return timeoutFut;
+    /**
+     * Schedules a timeout task that is going to complete the given future 
exceptionally with a {@link QueryCancelledException},
+     * if a timeout is set on this context.
+     */
+    public void scheduleTimeout(CompletableFuture<?> fut) {
+        if (timeoutFut == null) {
+            return;
+        }
+
+        Executor executor = task -> execute(task::run, (err) -> {});
+
+        timeoutFut.thenAcceptAsync(
+                (r) -> fut.completeExceptionally(new 
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)),
+                executor
+        );
     }
 
     /** Creates {@link PartitionProvider} for the given source table. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index 8beffb1e22..42eff2814f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
-import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
@@ -206,15 +205,7 @@ public class KeyValueGetPlan implements ExplainablePlan, 
ExecutablePlan {
             result.whenCompleteAsync((res, err) -> 
firstPageReadyCallback.onPrefetchComplete(err), executor);
         }
 
-        CompletableFuture<Void> timeoutFut = ctx.timeoutFuture();
-        if (timeoutFut != null) {
-            Executor executor = task -> ctx.execute(task::run, (err) -> {});
-
-            timeoutFut.thenAcceptAsync(
-                    (r) -> result.completeExceptionally(new 
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)),
-                    executor
-            );
-        }
+        ctx.scheduleTimeout(result);
 
         return new AsyncWrapper<>(result, Runnable::run);
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
index 45c2cbddc0..0c647a0549 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.InternalSqlRowSingleLong;
-import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
@@ -142,15 +141,7 @@ public class KeyValueModifyPlan implements 
ExplainablePlan, ExecutablePlan {
             result.whenCompleteAsync((res, err) -> 
firstPageReadyCallback.onPrefetchComplete(err), executor);
         }
 
-        CompletableFuture<Void> timeoutFut = ctx.timeoutFuture();
-        if (timeoutFut != null) {
-            Executor executor = task -> ctx.execute(task::run, (err) -> {});
-
-            timeoutFut.thenAcceptAsync(
-                    (r) -> result.completeExceptionally(new 
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)),
-                    executor
-            );
-        }
+        ctx.scheduleTimeout(result);
 
         return new AsyncWrapper<>(result, Runnable::run);
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 2a7dc43654..322b947725 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -206,13 +206,15 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
     @BeforeEach
     public void init() {
+        DdlSqlToCommandConverter converter = new DdlSqlToCommandConverter();
+
         testCluster = new TestCluster();
         executionServices = 
nodeNames.stream().map(this::create).collect(Collectors.toList());
         prepareService = new PrepareServiceImpl(
                 "test",
                 0,
                 CaffeineCacheFactory.INSTANCE,
-                new DdlSqlToCommandConverter(),
+                converter,
                 PLANNING_TIMEOUT,
                 PLANNING_THREAD_COUNT,
                 new MetricManagerImpl(),
@@ -898,39 +900,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         SqlOperationContext planCtx = operationContext(null).build();
         QueryPlan plan = prepare("SELECT * FROM test_tbl", planCtx);
 
-        int attempts = 10;
-
-        for (int k = 0; k < attempts; k++) {
-            QueryCancel cancel = new QueryCancel();
-            CompletableFuture<Void> timeoutFut = cancel.setTimeout(scheduler, 
deadlineMillis);
-
-            SqlOperationContext execCtx = operationContext(null)
-                    .cancel(cancel)
-                    .build();
-
-            AsyncCursor<InternalSqlRow> cursor;
-            try {
-                cursor = execService.executePlan(plan, execCtx);
-            } catch (QueryCancelledException e) {
-                // This might happen when initialization took longer than a 
time out,
-                // Retry to get a proper error.
-                continue;
-            }
-
-            CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
-            timeoutFut.join();
-
-            IgniteTestUtils.assertThrowsWithCause(
-                    batchFut::join,
-                    SqlException.class,
-                    "Query timeout"
-            );
-
-            return;
-        }
-
-        fail("Failed to get query timeout error");
+        awaitExecutionTimeout(execService, plan, deadlineMillis, 
SqlException.class);
     }
 
     @Test
@@ -974,37 +944,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
         tableRegistry.setGetTableDelay(delay);
 
-        int attempts = 10;
-
-        for (int k = 0; k < attempts; k++) {
-            QueryCancel queryCancel = new QueryCancel();
-            CompletableFuture<Void> timeoutFut = 
queryCancel.setTimeout(scheduler, deadlineMillis);
-
-            SqlOperationContext execCtx = operationContext(null)
-                    .cancel(queryCancel)
-                    .build();
-
-            AsyncCursor<InternalSqlRow> cursor;
-            try {
-                cursor = execService.executePlan(plan, execCtx);
-            } catch (QueryCancelledException e) {
-                continue;
-            }
-
-            CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
-            timeoutFut.join();
-
-            IgniteTestUtils.assertThrowsWithCause(
-                    batchFut::join,
-                    SqlException.class,
-                    "Query timeout"
-            );
-
-            return;
-        }
-
-        fail("Failed to get query timeout error");
+        awaitExecutionTimeout(execService, plan, deadlineMillis, 
SqlException.class);
     }
 
     @Test
@@ -1023,37 +963,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         Duration delay = Duration.of(deadlineMillis * 2, ChronoUnit.MILLIS);
         tableRegistry.setGetTableDelay(delay);
 
-        int attempts = 10;
-
-        for (int k = 0; k < attempts; k++) {
-            QueryCancel queryCancel = new QueryCancel();
-            CompletableFuture<Void> timeoutFut = 
queryCancel.setTimeout(scheduler, deadlineMillis);
-
-            SqlOperationContext execCtx = operationContext(null)
-                    .cancel(queryCancel)
-                    .build();
-
-            AsyncCursor<InternalSqlRow> cursor;
-            try {
-                cursor = execService.executePlan(plan, execCtx);
-            } catch (QueryCancelledException e) {
-                continue;
-            }
-
-            CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
-            timeoutFut.join();
-
-            IgniteTestUtils.assertThrowsWithCause(
-                    batchFut::join,
-                    SqlException.class,
-                    "Query timeout"
-            );
-
-            return;
-        }
-
-        fail("Failed to get query timeout error");
+        awaitExecutionTimeout(execService, plan, 500, SqlException.class);
     }
 
     @Test
@@ -1076,38 +986,8 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         DdlCommandHandler ddlCommandHandler = execService.ddlCommandHandler();
         
when(ddlCommandHandler.handle(any(CatalogCommand.class))).thenReturn(new 
CompletableFuture<>());
 
-        int attempts = 10;
-
-        for (int k = 0; k < attempts; k++) {
-            QueryCancel queryCancel = new QueryCancel();
-            CompletableFuture<Void> timeoutFut = 
queryCancel.setTimeout(scheduler, deadlineMillis);
-
-            SqlOperationContext execCtx = operationContext(null)
-                    .cancel(queryCancel)
-                    .build();
-
-            AsyncCursor<InternalSqlRow> cursor;
-            try {
-                cursor = execService.executePlan(plan, execCtx);
-            } catch (QueryCancelledException e) {
-                continue;
-            }
-
-            CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
-
-            timeoutFut.join();
-
-            // DDL handler does convert exceptions to SqlException, so we get 
QueryCancelledException here.
-            IgniteTestUtils.assertThrowsWithCause(
-                    batchFut::join,
-                    QueryCancelledException.class,
-                    "Query timeout"
-            );
-
-            return;
-        }
-
-        fail("Failed to get query timeout error");
+        // DDL handler does not convert exceptions to SqlException, so we get 
QueryCancelledException here.
+        awaitExecutionTimeout(execService, plan, 500, 
QueryCancelledException.class);
     }
 
     /** Creates an execution service instance for the node with given 
consistent id. */
@@ -1262,6 +1142,47 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    private void awaitExecutionTimeout(
+            ExecutionService execService,
+            QueryPlan plan,
+            long deadlineMillis,
+            Class<? extends RuntimeException> errorClass
+    ) {
+        int attempts = 10;
+
+        for (int k = 0; k < attempts; k++) {
+            QueryCancel queryCancel = new QueryCancel();
+            CompletableFuture<Void> timeoutFut = 
queryCancel.setTimeout(scheduler, deadlineMillis);
+
+            SqlOperationContext execCtx = operationContext(null)
+                    .cancel(queryCancel)
+                    .build();
+
+            AsyncCursor<InternalSqlRow> cursor;
+            try {
+                cursor = execService.executePlan(plan, execCtx);
+            } catch (QueryCancelledException e) {
+                // This might happen when initialization took longer than 
the timeout.
+                // Retry to get a proper error.
+                continue;
+            }
+
+            CompletableFuture<?> batchFut = cursor.requestNextAsync(1);
+
+            timeoutFut.join();
+
+            IgniteTestUtils.assertThrowsWithCause(
+                    batchFut::join,
+                    errorClass,
+                    "Query timeout"
+            );
+
+            return;
+        }
+
+        fail("Failed to get query timeout error");
+    }
+
     static class TestCluster {
         private final Map<String, TestNode> nodes = new ConcurrentHashMap<>();
 

Reply via email to