This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22303
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 1c378340cd88269768ed1bda33d269b1cd9deb6b
Author: amashenkov <[email protected]>
AuthorDate: Mon Jul 1 16:58:54 2024 +0300

    Retry implicit tx on concurrent schema modification.
---
 .../internal/sql/engine/SqlQueryProcessor.java     | 25 +++++++++++++++++++++-
 .../internal/sql/engine/exec/QueryRetryTest.java   |  7 ++----
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index a7b3cae271..a0693de0c0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.sql.SqlCommon;
 import 
org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration;
 import 
org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration;
 import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
+import org.apache.ignite.internal.sql.engine.exec.ConcurrentSchemaModification;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistryImpl;
 import 
org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl;
@@ -609,7 +610,29 @@ public class SqlQueryProcessor implements QueryProcessor {
                                 
txContext.updateObservableTime(deriveMinimalRequiredTime(plan));
                             }
 
-                            return executePlan(operationContext, plan, 
nextStatement);
+                            try {
+                                return executePlan(operationContext, plan, 
nextStatement);
+                            } catch (ConcurrentSchemaModification ex) {
+                                if (txContext.explicitTx() != null) {
+                                    throw ex;
+                                }
+
+                                // Retry implicit transaction on concurrent 
schema change.
+                                SqlOperationContext newOpCtx = 
SqlOperationContext.builder()
+                                        .queryId(operationContext.queryId())
+                                        
.prefetchCallback(operationContext.prefetchCallback())
+                                        .cancel(operationContext.cancel())
+                                        
.defaultSchemaName(operationContext.defaultSchemaName())
+                                        
.timeZoneId(operationContext.timeZoneId())
+                                        .operationTime(clockService.now())
+                                        .txContext(txContext)
+                                        .build();
+
+                                
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> start = new 
CompletableFuture<>()
+                                        .thenCompose(ignore -> 
executeParsedStatement(newOpCtx, parsedResult, nextStatement));
+
+                                return start.completeAsync(null, taskExecutor);
+                            }
                         }));
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
index 98c7aacd04..f0ea29d0da 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -27,7 +28,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.calcite.schema.SchemaPlus;
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.sql.SqlCommon;
-import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
@@ -76,7 +75,6 @@ import 
org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.NativeTypes;
-import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -171,8 +169,7 @@ public class QueryRetryTest extends BaseIgniteAbstractTest {
 
         SqlOperationContext execContext = createContext(clock.now());
 
-        AsyncCursor<InternalSqlRow> cursor = 
executionService.executePlan(plan, execContext);
-        cursor.requestNextAsync(100).get();
+        assertThrows(ConcurrentSchemaModification.class, () -> 
executionService.executePlan(plan, execContext), null);
     }
 
     private QueryPlan prepare(String query, SqlOperationContext ctx) {

Reply via email to