This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22303
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b44b5813f69b44ce1113e7f8db982769fcefe9a1
Author: amashenkov <[email protected]>
AuthorDate: Mon Jul 1 16:58:54 2024 +0300

    Retry implicit tx on concurrent schema modification.
---
 .../internal/sql/engine/SqlQueryProcessor.java     | 25 +++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index b74dd87fde..c3b7b5cc10 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -610,7 +610,30 @@ public class SqlQueryProcessor implements QueryProcessor {
                                 
txContext.updateObservableTime(deriveMinimalRequiredTime(plan));
                             }
 
-                            return executePlan(operationContext, plan, 
nextStatement);
+                            try {
+                                return executePlan(operationContext, plan, 
nextStatement);
+                            } catch (ConcurrentSchemaModificationException ex) 
{
+                                if (txContext.explicitTx() != null) {
+                                    throw ex;
+                                }
+
+                                // Retry implicit transaction on concurrent 
schema change.
+                                SqlOperationContext newOpCtx = 
SqlOperationContext.builder()
+                                        .cancel(operationContext.cancel())
+                                        
.defaultSchemaName(operationContext.defaultSchemaName())
+                                        .operationTime(clockService.now())
+                                        
.parameters(operationContext.parameters())
+                                        
.prefetchCallback(operationContext.prefetchCallback())
+                                        .queryId(operationContext.queryId())
+                                        
.timeZoneId(operationContext.timeZoneId())
+                                        .txContext(txContext)
+                                        .build();
+
+                                
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> start = new 
CompletableFuture<>()
+                                        .thenCompose(ignore -> 
executeParsedStatement(newOpCtx, parsedResult, nextStatement));
+
+                                return start.completeAsync(null, taskExecutor);
+                            }
                         }));
     }
 

Reply via email to