This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22303
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 303156175a4a8720263966365a1042c71f3841e7
Author: amashenkov <[email protected]>
AuthorDate: Mon Jul 1 16:28:45 2024 +0300

    wip
---
 .../internal/sql/engine/SqlQueryProcessor.java     |  26 ++++-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   5 +
 .../sql/engine/schema/SqlSchemaManager.java        |   8 ++
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |   5 +
 .../internal/sql/engine/exec/QueryRetryTest.java   | 128 +++++++++++++++++++++
 .../exec/rel/TableScanNodeExecutionTest.java       |   4 +-
 .../sql/engine/framework/ImplicitTxContext.java    |   4 +-
 .../sql/engine/framework/NoOpTransaction.java      |  13 ++-
 .../engine/framework/PredefinedSchemaManager.java  |   5 +
 .../ignite/internal/table/ItColocationTest.java    |   4 +-
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../distributed/storage/InternalTableImpl.java     |  18 ++-
 .../distributed/storage/InternalTableImplTest.java |   7 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |   3 +-
 .../table/impl/DummyInternalTableImpl.java         |  10 +-
 15 files changed, 229 insertions(+), 14 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index a7b3cae271..362e54e55d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -111,6 +111,7 @@ import 
org.apache.ignite.internal.sql.metrics.SqlClientMetricSource;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -609,7 +610,30 @@ public class SqlQueryProcessor implements QueryProcessor {
                                 
txContext.updateObservableTime(deriveMinimalRequiredTime(plan));
                             }
 
-                            return executePlan(operationContext, plan, 
nextStatement);
+                            try {
+                                return executePlan(operationContext, plan, nextStatement);
+                            } catch (InternalSchemaVersionMismatchException ex) {
+                                // An explicit transaction is owned by the caller and is not restarted here.
+                                if (txContext.explicitTx() != null) {
+                                    throw ex;
+                                }
+
+                                // Retry implicit transaction on concurrent schema change.
+                                // The new context copies the original one except for the operation time, which is taken anew.
+                                SqlOperationContext newOpCtx = SqlOperationContext.builder()
+                                        .cancel(operationContext.cancel())
+                                        .defaultSchemaName(operationContext.defaultSchemaName())
+                                        .operationTime(clockService.now())
+                                        .parameters(operationContext.parameters())
+                                        .prefetchCallback(operationContext.prefetchCallback())
+                                        .queryId(operationContext.queryId())
+                                        .timeZoneId(operationContext.timeZoneId())
+                                        .txContext(txContext)
+                                        .build();
+
+                                // Re-run the parsed statement with the new context on the task executor.
+                                return CompletableFuture.completedFuture(null)
+                                        .thenComposeAsync(ignore -> executeParsedStatement(newOpCtx, parsedResult, nextStatement),
+                                                taskExecutor);
+                            }
                         }));
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c314582b68..211bcdf95c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -114,6 +114,7 @@ import 
org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.IteratorToDataCursorAdapter;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -320,6 +321,10 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         QueryTransactionWrapper txWrapper = 
txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML);
 
+        if (!sqlSchemaManager.isActualSchemaVersion(plan.catalogVersion(), 
txWrapper.unwrap().startTimestamp().longValue())) {
+            throw new InternalSchemaVersionMismatchException();
+        }
+
         AsyncCursor<InternalSqlRow> dataCursor = 
queryManager.execute(txWrapper.unwrap(), plan);
 
         PrefetchCallback prefetchCallback = 
operationContext.prefetchCallback();
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
index a6d82dff82..f87e1eee57 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
@@ -49,4 +49,12 @@ public interface SqlSchemaManager {
      * @param catalogVersion version of the catalog to wait.
      */
     CompletableFuture<Void> schemaReadyFuture(int catalogVersion);
+
+    /**
+     * Check if catalog version is an actual version at the given timestamp.
+     *
+     * @param catalogVersion Catalog version.
+     * @param timestamp Timestamp.
+     * @return {@code true} if the given catalog version is the actual one at the given timestamp, {@code false} otherwise.
+     */
+    boolean isActualSchemaVersion(int catalogVersion, long timestamp);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index f2357c911f..01b788bead 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -405,4 +405,9 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 parititions
         );
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isActualSchemaVersion(int catalogVersion, long timestamp) {
+        // The given version is actual only if it is exactly the version that is active at the timestamp.
+        return catalogManager.activeCatalogVersion(timestamp) == catalogVersion;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
new file mode 100644
index 0000000000..47412b188c
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.sql.engine.framework.ImplicitTxContext;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests execution of query plans that were prepared before a table schema change.
+ */
+public class QueryRetryTest extends BaseIgniteAbstractTest {
+    private TestCluster cluster;
+
+    /**
+     * Starts a single-node cluster whose {@code TEST_TBL} data provider fails every table scan and primary key lookup
+     * with {@link InternalSchemaVersionMismatchException}.
+     */
+    @BeforeEach
+    public void init() {
+        cluster = TestBuilders.cluster()
+                .nodes("N1")
+                .dataProvider("N1", "TEST_TBL", new ScannableTable() {
+                    @Override
+                    public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken,
+                            RowFactory<RowT> rowFactory, @Nullable BitSet requiredColumns) {
+                        return subscriber -> subscriber.onError(new InternalSchemaVersionMismatchException());
+                    }
+
+                    @Override
+                    public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx,
+                            PartitionWithConsistencyToken partWithConsistencyToken, RowFactory<RowT> rowFactory, int indexId,
+                            List<String> columns, @Nullable RangeCondition<RowT> cond, @Nullable BitSet requiredColumns) {
+                        // Index scans are not stubbed.
+                        return null;
+                    }
+
+                    @Override
+                    public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
+                            PartitionWithConsistencyToken partWithConsistencyToken, RowFactory<RowT> rowFactory, int indexId,
+                            List<String> columns, RowT key, @Nullable BitSet requiredColumns) {
+                        // Index lookups are not stubbed.
+                        return null;
+                    }
+
+                    @Override
+                    public <RowT> CompletableFuture<@Nullable RowT> primaryKeyLookup(ExecutionContext<RowT> ctx,
+                            @Nullable InternalTransaction explicitTx, RowFactory<RowT> rowFactory, RowT key,
+                            @Nullable BitSet requiredColumns) {
+                        return CompletableFuture.failedFuture(new InternalSchemaVersionMismatchException());
+                    }
+                })
+                .build();
+
+        cluster.start();
+    }
+
+    /** Stops the cluster started in {@link #init()}. */
+    @AfterEach
+    public void tearDown() throws Exception {
+        cluster.stop();
+    }
+
+    /** Prepares a full scan, drops a column, and expects plan execution to fail with a schema version mismatch. */
+    @Test
+    void testQuery() {
+        TestNode node = cluster.node("N1");
+
+        node.initSchema("CREATE TABLE test_tbl (ID INT PRIMARY KEY, VAL INT, VAL2 INT)");
+
+        QueryPlan plan = node.prepare("SELECT * FROM test_tbl");
+
+        node.initSchema("ALTER TABLE test_tbl DROP COLUMN VAL2");
+        ImplicitTxContext.INSTANCE.updateObservableTime(new HybridClockImpl().now());
+
+        assertThrows(InternalSchemaVersionMismatchException.class, () -> node.executePlan(plan), null);
+    }
+
+    /** Prepares a lookup by primary key, drops a column, and expects fetching the result to fail with a schema version mismatch. */
+    @Test
+    void testPkLookup() {
+        TestNode node = cluster.node("N1");
+
+        node.initSchema("CREATE TABLE test_tbl (ID INT PRIMARY KEY, VAL INT, VAL2 INT)");
+
+        QueryPlan plan = node.prepare("SELECT * FROM test_tbl WHERE id = 1");
+
+        node.initSchema("ALTER TABLE test_tbl DROP COLUMN VAL2");
+        ImplicitTxContext.INSTANCE.updateObservableTime(new HybridClockImpl().now());
+
+        assertThrows(InternalSchemaVersionMismatchException.class, () -> node.executePlan(plan).requestNextAsync(10).get(), null);
+    }
+
+    /** Prepares an INSERT, drops a column, and expects fetching the result to fail with a schema version mismatch. */
+    @Test
+    void testDmlQuery() {
+        TestNode node = cluster.node("N1");
+
+        node.initSchema("CREATE TABLE test_tbl (ID INT PRIMARY KEY, VAL INT, VAL2 INT)");
+
+        QueryPlan plan = node.prepare("INSERT INTO test_tbl VALUES (1, 2, 3)");
+
+        node.initSchema("ALTER TABLE test_tbl DROP COLUMN VAL2");
+        ImplicitTxContext.INSTANCE.updateObservableTime(new HybridClockImpl().now());
+
+        assertThrows(InternalSchemaVersionMismatchException.class, () -> node.executePlan(plan).requestNextAsync(10).get(), null);
+    }
+}
\ No newline at end of file
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index e3ebead924..956d276fcf 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -261,7 +262,8 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     mock(TransactionInflights.class),
                     3_000,
                     0,
-                    null
+                    null,
+                    mock(CatalogService.class)
             );
             this.dataAmount = dataAmount;
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
index b22b24fa03..27bd07d10b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
@@ -48,7 +48,9 @@ public class ImplicitTxContext implements 
QueryTransactionContext {
 
     @Override
     public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
-        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"), 
true, TX_INFLIGHTS);
+        HybridTimestamp ts = observableTimeTracker.get();
+
+        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy", 
true, ts), true, TX_INFLIGHTS);
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 4ad153c175..aaf3493e0f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -37,7 +37,7 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     private final UUID id = UUID.randomUUID();
 
-    private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1);
+    private final HybridTimestamp hybridTimestamp;
 
     private final IgniteBiTuple<ClusterNode, Long> tuple;
 
@@ -75,8 +75,19 @@ public final class NoOpTransaction implements 
InternalTransaction {
      * @param readOnly Read-only or not.
      */
     private NoOpTransaction(String name, boolean readOnly) {
+        this(name, readOnly,  new HybridTimestamp(1, 1));
+    }
+
+    /**
+     * Constructs a transaction.
+     *
+     * @param name Name of the node.
+     * @param readOnly Read-only or not.
+     * @param timestamp Value stored as this transaction's hybrid timestamp.
+     */
+    NoOpTransaction(String name, boolean readOnly, HybridTimestamp timestamp) {
         var networkAddress = NetworkAddress.from(new 
InetSocketAddress("localhost", 1234));
         this.tuple = new IgniteBiTuple<>(new ClusterNodeImpl(name, name, 
networkAddress), 1L);
+        this.hybridTimestamp = timestamp;
         this.readOnly = readOnly;
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index bccb19189c..8f6ade082d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -95,4 +95,9 @@ public class PredefinedSchemaManager implements 
SqlSchemaManager {
 
         return table;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isActualSchemaVersion(int catalogVersion, long timestamp) {
+        // Keeps the former no-op semantics: this manager never reports a version mismatch.
+        return true;
+    }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index deb3df545d..b6d810e518 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -301,7 +302,8 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 transactionInflights,
                 3_000,
                 0,
-                null
+                null,
+                mock(CatalogService.class)
         );
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c2e6b230e4..c24ea23548 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1302,7 +1302,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 transactionInflights,
                 implicitTransactionTimeout,
                 attemptsObtainLock,
-                this::streamerFlushExecutor
+                this::streamerFlushExecutor,
+                catalogService
         );
 
         var table = new TableImpl(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index d905d38b67..b2922cf5ab 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -63,6 +63,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -80,6 +81,7 @@ import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -152,6 +154,8 @@ public class InternalTableImpl implements InternalTable {
 
     private final Supplier<ScheduledExecutorService> streamerFlushExecutor;
 
+    private final CatalogService catalogService;
+
     /** Table name. */
     private volatile String tableName;
 
@@ -219,6 +223,7 @@ public class InternalTableImpl implements InternalTable {
      * @param transactionInflights Transaction inflights.
      * @param implicitTransactionTimeout Implicit transaction timeout.
      * @param attemptsObtainLock Attempts to take lock.
+     * @param catalogService Catalog service.
      */
     public InternalTableImpl(
             String tableName,
@@ -236,7 +241,8 @@ public class InternalTableImpl implements InternalTable {
             TransactionInflights transactionInflights,
             long implicitTransactionTimeout,
             int attemptsObtainLock,
-            Supplier<ScheduledExecutorService> streamerFlushExecutor
+            Supplier<ScheduledExecutorService> streamerFlushExecutor,
+            CatalogService catalogService
     ) {
         this.tableName = tableName;
         this.tableId = tableId;
@@ -254,6 +260,7 @@ public class InternalTableImpl implements InternalTable {
         this.implicitTransactionTimeout = implicitTransactionTimeout;
         this.attemptsObtainLock = attemptsObtainLock;
         this.streamerFlushExecutor = streamerFlushExecutor;
+        this.catalogService = catalogService;
     }
 
     /** {@inheritDoc} */
@@ -342,6 +349,15 @@ public class InternalTableImpl implements InternalTable {
         boolean implicit = tx == null;
         InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
 
+        if (implicit) {
+            // In the implicit case 'tx' is null, so the start timestamp is read from the transaction started above.
+            long txTimestamp = actualTx.startTimestamp().longValue();
+            int actualCatalogVersion = catalogService.activeCatalogVersion(txTimestamp);
+
+            // Reject a row whose schema version is newer than the latest table schema version known at the transaction start.
+            if (catalogService.table(tableId, actualCatalogVersion).schemaVersions().latestVersion() < row.schemaVersion()) {
+                throw new ConcurrentModificationException();
+            }
+        }
+
         int partId = partitionId(row);
 
         TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 937daae3a3..b7396c3561 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -36,6 +36,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
@@ -75,7 +76,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(TransactionInflights.class),
                 3_000,
                 0,
-                null
+                null,
+                mock(CatalogService.class)
         );
 
         // Let's check the empty table.
@@ -125,7 +127,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(TransactionInflights.class),
                 3_000,
                 0,
-                null
+                null,
+                mock(CatalogService.class)
         );
 
         List<BinaryRowEx> originalRows = List.of(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 7ec6ac9c5b..a0cbc3dee9 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -776,7 +776,8 @@ public class ItTxTestCluster {
                         clientTransactionInflights,
                         500,
                         0,
-                        null
+                        null,
+                        catalogService
                 ),
                 new DummySchemaManagerImpl(schemaDescriptor),
                 clientTxManager.lockManager(),
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3ce2007d0b..c9c9c21856 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -206,7 +206,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 storageUpdateConfiguration,
                 txConfiguration,
                 new RemotelyTriggeredResourceRegistry(),
-                new TransactionInflights(new TestPlacementDriver(LOCAL_NODE), 
CLOCK_SERVICE)
+                new TransactionInflights(new TestPlacementDriver(LOCAL_NODE), 
CLOCK_SERVICE),
+                mock(CatalogService.class)
         );
     }
 
@@ -234,7 +235,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
             StorageUpdateConfiguration storageUpdateConfiguration,
             TransactionConfiguration txConfiguration,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            TransactionInflights transactionInflights
+            TransactionInflights transactionInflights,
+            CatalogService catalogService
     ) {
         super(
                 "test",
@@ -257,7 +259,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 transactionInflights,
                 3_000,
                 0,
-                null
+                null,
+                catalogService
         );
 
         RaftGroupService svc = 
tableRaftService().partitionRaftGroupService(PART_ID);
@@ -372,7 +375,6 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schema);
 
-        CatalogService catalogService = mock(CatalogService.class);
         CatalogTableDescriptor tableDescriptor = 
mock(CatalogTableDescriptor.class);
 
         lenient().when(catalogService.table(anyInt(), 
anyLong())).thenReturn(tableDescriptor);

Reply via email to