This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22303
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 37e65813ba5f66bb6304fee70a2a5dd02c8af3e4
Author: amashenkov <[email protected]>
AuthorDate: Mon Jul 1 16:44:18 2024 +0300

    Make ExecutionServiceImpl throw an exception
---
 .../engine/exec/ConcurrentSchemaModification.java  | 28 ++++++++++++++++++++++
 .../sql/engine/exec/ExecutionServiceImpl.java      |  4 +++-
 .../sql/engine/schema/SqlSchemaManager.java        |  2 ++
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |  5 ++++
 .../internal/sql/engine/exec/QueryRetryTest.java   |  5 ++++
 .../engine/framework/PredefinedSchemaManager.java  |  5 ++++
 6 files changed, 48 insertions(+), 1 deletion(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ConcurrentSchemaModification.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ConcurrentSchemaModification.java
new file mode 100644
index 0000000000..42d491c3bc
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ConcurrentSchemaModification.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.lang.ErrorGroups.Common;
+
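+/** Thrown by ExecutionServiceImpl when a plan's catalog version is not the active version at the transaction start timestamp. */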
+public class ConcurrentSchemaModification extends IgniteInternalException {
+    ConcurrentSchemaModification() {
+        super(Common.INTERNAL_ERR);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 82e1f1b34c..446965dd39 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -320,7 +320,9 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         QueryTransactionWrapper txWrapper = 
txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML);
 
-        assert sqlSchemaManager.schema(plan.catalogVersion()) == 
sqlSchemaManager.schema(txWrapper.unwrap().startTimestamp().longValue());
+        if (!sqlSchemaManager.isActualSchemaVersion(plan.catalogVersion(), 
txWrapper.unwrap().startTimestamp().longValue())) {
+            throw new ConcurrentSchemaModification();
+        }
 
         AsyncCursor<InternalSqlRow> dataCursor = 
queryManager.execute(txWrapper.unwrap(), plan);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
index a6d82dff82..b637e8198a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
@@ -49,4 +49,6 @@ public interface SqlSchemaManager {
      * @param catalogVersion version of the catalog to wait.
      */
     CompletableFuture<Void> schemaReadyFuture(int catalogVersion);
+
+    boolean isActualSchemaVersion(int catalogVersion, long timestamp);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index f2357c911f..0ef9b27b52 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -405,4 +405,9 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 parititions
         );
     }
+
+    @Override
+    public boolean isActualSchemaVersion(int catalogVersion, long timestamp) {
+        return catalogManager.activeCatalogVersion(timestamp) == 
catalogVersion;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
index 7d4e4d4cf9..98c7aacd04 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
@@ -313,6 +313,11 @@ public class QueryRetryTest extends BaseIgniteAbstractTest 
{
             public CompletableFuture<Void> schemaReadyFuture(int 
catalogVersion) {
                 return completedFuture(null);
             }
+
+            @Override
+            public boolean isActualSchemaVersion(int catalogVersion, long 
timestamp) {
+                return schemaChangedTs > timestamp ? catalogVersion == 1 : 
catalogVersion == 2;
+            }
         };
     }
 }
\ No newline at end of file
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index bccb19189c..dde60cfd5a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -95,4 +95,9 @@ public class PredefinedSchemaManager implements 
SqlSchemaManager {
 
         return table;
     }
+
+    @Override
+    public boolean isActualSchemaVersion(int catalogVersion, long timestamp) {
+        return true;
+    }
 }

Reply via email to