This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-22303 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 1c378340cd88269768ed1bda33d269b1cd9deb6b Author: amashenkov <[email protected]> AuthorDate: Mon Jul 1 16:58:54 2024 +0300 Retry implicit tx on concurrent schema modification. --- .../internal/sql/engine/SqlQueryProcessor.java | 25 +++++++++++++++++++++- .../internal/sql/engine/exec/QueryRetryTest.java | 7 ++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index a7b3cae271..a0693de0c0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration; import org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration; import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor; +import org.apache.ignite.internal.sql.engine.exec.ConcurrentSchemaModification; import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl; import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolverImpl; @@ -609,7 +610,29 @@ public class SqlQueryProcessor implements QueryProcessor { txContext.updateObservableTime(deriveMinimalRequiredTime(plan)); } - return executePlan(operationContext, plan, nextStatement); + try { + return executePlan(operationContext, plan, nextStatement); + } catch (ConcurrentSchemaModification ex) { + if (txContext.explicitTx() != null) { + throw ex; + } + + // Retry implicit transaction on concurrent schema change. + SqlOperationContext newOpCtx = SqlOperationContext.builder() + .queryId(operationContext.queryId()) + .prefetchCallback(operationContext.prefetchCallback()) + .cancel(operationContext.cancel()) + .defaultSchemaName(operationContext.defaultSchemaName()) + .timeZoneId(operationContext.timeZoneId()) + .operationTime(clockService.now()) + .txContext(txContext) + .build(); + + CompletableFuture<AsyncSqlCursor<InternalSqlRow>> start = new CompletableFuture<>() + .thenCompose(ignore -> executeParsedStatement(newOpCtx, parsedResult, nextStatement)); + + return start.completeAsync(null, taskExecutor); + } })); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java index 98c7aacd04..f0ea29d0da 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.sql.engine.exec; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; @@ -27,7 +28,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.apache.calcite.schema.SchemaPlus; @@ -44,7 +44,6 @@ import org.apache.ignite.internal.metrics.MetricManagerImpl; import org.apache.ignite.internal.network.ClusterNodeImpl; import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.sql.SqlCommon; -import org.apache.ignite.internal.sql.engine.InternalSqlRow; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.SqlOperationContext; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; @@ -76,7 +75,6 @@ import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory; import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.type.NativeTypes; -import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -171,8 +169,7 @@ public class QueryRetryTest extends BaseIgniteAbstractTest { SqlOperationContext execContext = createContext(clock.now()); - AsyncCursor<InternalSqlRow> cursor = executionService.executePlan(plan, execContext); - cursor.requestNextAsync(100).get(); + assertThrows(ConcurrentSchemaModification.class, () -> executionService.executePlan(plan, execContext), null); } private QueryPlan prepare(String query, SqlOperationContext ctx) {
