This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-22303 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 303156175a4a8720263966365a1042c71f3841e7 Author: amashenkov <[email protected]> AuthorDate: Mon Jul 1 16:28:45 2024 +0300 wip --- .../internal/sql/engine/SqlQueryProcessor.java | 26 ++++- .../sql/engine/exec/ExecutionServiceImpl.java | 5 + .../sql/engine/schema/SqlSchemaManager.java | 8 ++ .../sql/engine/schema/SqlSchemaManagerImpl.java | 5 + .../internal/sql/engine/exec/QueryRetryTest.java | 128 +++++++++++++++++++++ .../exec/rel/TableScanNodeExecutionTest.java | 4 +- .../sql/engine/framework/ImplicitTxContext.java | 4 +- .../sql/engine/framework/NoOpTransaction.java | 13 ++- .../engine/framework/PredefinedSchemaManager.java | 5 + .../ignite/internal/table/ItColocationTest.java | 4 +- .../internal/table/distributed/TableManager.java | 3 +- .../distributed/storage/InternalTableImpl.java | 18 ++- .../distributed/storage/InternalTableImplTest.java | 7 +- .../apache/ignite/distributed/ItTxTestCluster.java | 3 +- .../table/impl/DummyInternalTableImpl.java | 10 +- 15 files changed, 229 insertions(+), 14 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index a7b3cae271..362e54e55d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -111,6 +111,7 @@ import org.apache.ignite.internal.sql.metrics.SqlClientMetricSource; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.systemview.api.SystemViewManager; import org.apache.ignite.internal.table.distributed.TableManager; +import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; @@ -609,7 +610,30 @@ public class SqlQueryProcessor implements QueryProcessor { txContext.updateObservableTime(deriveMinimalRequiredTime(plan)); } - return executePlan(operationContext, plan, nextStatement); + try { + return executePlan(operationContext, plan, nextStatement); + } catch (InternalSchemaVersionMismatchException ex) { + if (txContext.explicitTx() != null) { + throw ex; + } + + // Retry implicit transaction on concurrent schema change. + SqlOperationContext newOpCtx = SqlOperationContext.builder() + .cancel(operationContext.cancel()) + .defaultSchemaName(operationContext.defaultSchemaName()) + .operationTime(clockService.now()) + .parameters(operationContext.parameters()) + .prefetchCallback(operationContext.prefetchCallback()) + .queryId(operationContext.queryId()) + .timeZoneId(operationContext.timeZoneId()) + .txContext(txContext) + .build(); + + CompletableFuture<AsyncSqlCursor<InternalSqlRow>> start = new CompletableFuture<>() + .thenCompose(ignore -> executeParsedStatement(newOpCtx, parsedResult, nextStatement)); + + return start.completeAsync(null, taskExecutor); + } })); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index c314582b68..211bcdf95c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -114,6 +114,7 @@ import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.IteratorToDataCursorAdapter; import org.apache.ignite.internal.sql.engine.util.TypeUtils; +import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.ExceptionUtils; @@ -320,6 +321,10 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve QueryTransactionWrapper txWrapper = txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML); + if (!sqlSchemaManager.isActualSchemaVersion(plan.catalogVersion(), txWrapper.unwrap().startTimestamp().longValue())) { + throw new InternalSchemaVersionMismatchException(); + } + AsyncCursor<InternalSqlRow> dataCursor = queryManager.execute(txWrapper.unwrap(), plan); PrefetchCallback prefetchCallback = operationContext.prefetchCallback(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java index a6d82dff82..f87e1eee57 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java @@ -49,4 +49,12 @@ public interface SqlSchemaManager { * @param catalogVersion version of the catalog to wait. */ CompletableFuture<Void> schemaReadyFuture(int catalogVersion); + + /** + * Check if catalog version is an actual version at the given timestamp. + * + * @param catalogVersion Catalog version. + * @param timestamp Timestamp. + */ + boolean isActualSchemaVersion(int catalogVersion, long timestamp); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java index f2357c911f..01b788bead 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java @@ -405,4 +405,9 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { parititions ); } + + @Override + public boolean isActualSchemaVersion(int catalogVersion, long timestamp) { + return catalogManager.activeCatalogVersion(timestamp) > catalogVersion; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java new file mode 100644 index 0000000000..47412b188c --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows; + +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; +import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition; +import org.apache.ignite.internal.sql.engine.framework.ImplicitTxContext; +import org.apache.ignite.internal.sql.engine.framework.TestBuilders; +import org.apache.ignite.internal.sql.engine.framework.TestCluster; +import org.apache.ignite.internal.sql.engine.framework.TestNode; +import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; +import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class QueryRetryTest extends BaseIgniteAbstractTest { + private TestCluster cluster; + + @BeforeEach + public void init() { + cluster = TestBuilders.cluster() + .nodes("N1") + .dataProvider("N1", "TEST_TBL", new ScannableTable() { + @Override + public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken, + RowFactory<RowT> rowFactory, @Nullable BitSet requiredColumns) { + return subscriber -> subscriber.onError(new InternalSchemaVersionMismatchException()); + } + + @Override + public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx, + PartitionWithConsistencyToken partWithConsistencyToken, RowFactory<RowT> rowFactory, int indexId, + List<String> columns, @Nullable RangeCondition<RowT> cond, @Nullable BitSet requiredColumns) { + return null; + } + + @Override + public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx, + PartitionWithConsistencyToken partWithConsistencyToken, RowFactory<RowT> rowFactory, int indexId, + List<String> columns, RowT key, @Nullable BitSet requiredColumns) { + return null; + } + + @Override + public <RowT> CompletableFuture<@Nullable RowT> primaryKeyLookup(ExecutionContext<RowT> ctx, + @Nullable InternalTransaction explicitTx, RowFactory<RowT> rowFactory, RowT key, + @Nullable BitSet requiredColumns) { + return CompletableFuture.failedFuture(new InternalSchemaVersionMismatchException()); + } + }) + .build(); + + cluster.start(); + } + + @AfterEach + public void tearDown() throws Exception { + cluster.stop(); + } + + @Test + void testQuery() { + TestNode node = cluster.node("N1"); + + node.initSchema("CREATE TABLE test_tbl (ID INT PRIMARY KEY, VAL INT, VAL2 INT)"); + + QueryPlan plan = node.prepare("SELECT * FROM test_tbl"); + + node.initSchema("ALTER TABLE test_tbl DROP COLUMN VAL2"); + ImplicitTxContext.INSTANCE.updateObservableTime(new HybridClockImpl().now()); + + assertThrows(InternalSchemaVersionMismatchException.class, () -> node.executePlan(plan), null); + } + + @Test + void tesPkLookup() { + TestNode node = cluster.node("N1"); + + node.initSchema("CREATE TABLE test_tbl (ID INT PRIMARY KEY, VAL INT, VAL2 INT)"); + + QueryPlan plan = node.prepare("SELECT * FROM test_tbl WHERE id = 1"); + + node.initSchema("ALTER TABLE test_tbl DROP COLUMN VAL2"); + ImplicitTxContext.INSTANCE.updateObservableTime(new HybridClockImpl().now()); + + assertThrows(InternalSchemaVersionMismatchException.class, () -> node.executePlan(plan).requestNextAsync(10).get(), null); + } + + @Test + void testDmlQuery() { + TestNode node = cluster.node("N1"); + + node.initSchema("CREATE TABLE test_tbl (ID INT PRIMARY KEY, VAL INT, VAL2 INT)"); + + QueryPlan plan = node.prepare("INSERT INTO test_tbl VALUES (1, 2, 3)"); + + node.initSchema("ALTER TABLE test_tbl DROP COLUMN VAL2"); + ImplicitTxContext.INSTANCE.updateObservableTime(new HybridClockImpl().now()); + + assertThrows(InternalSchemaVersionMismatchException.class, () -> node.executePlan(plan).requestNextAsync(10).get(), null); + } +} \ No newline at end of file diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index e3ebead924..956d276fcf 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; @@ -261,7 +262,8 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> mock(TransactionInflights.class), 3_000, 0, - null + null, + mock(CatalogService.class) ); this.dataAmount = dataAmount; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java index b22b24fa03..27bd07d10b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java @@ -48,7 +48,9 @@ public class ImplicitTxContext implements QueryTransactionContext { @Override public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) { - return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"), true, TX_INFLIGHTS); + HybridTimestamp ts = observableTimeTracker.get(); + + return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy", true, ts), true, TX_INFLIGHTS); } @Override diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 4ad153c175..aaf3493e0f 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -37,7 +37,7 @@ public final class NoOpTransaction implements InternalTransaction { private final UUID id = UUID.randomUUID(); - private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1); + private final HybridTimestamp hybridTimestamp; private final IgniteBiTuple<ClusterNode, Long> tuple; @@ -75,8 +75,19 @@ public final class NoOpTransaction implements InternalTransaction { * @param readOnly Read-only or not. */ private NoOpTransaction(String name, boolean readOnly) { + this(name, readOnly, new HybridTimestamp(1, 1)); + } + + /** + * Constructs a transaction. + * + * @param name Name of the node. + * @param readOnly Read-only or not. + */ + NoOpTransaction(String name, boolean readOnly, HybridTimestamp timestamp) { var networkAddress = NetworkAddress.from(new InetSocketAddress("localhost", 1234)); this.tuple = new IgniteBiTuple<>(new ClusterNodeImpl(name, name, networkAddress), 1L); + this.hybridTimestamp = timestamp; this.readOnly = readOnly; } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java index bccb19189c..8f6ade082d 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java @@ -95,4 +95,9 @@ public class PredefinedSchemaManager implements SqlSchemaManager { return table; } + + @Override + public void isActualSchemaVersion(int catalogVersion, long timestamp) throws ConcurrentSchemaModificationException { + // No-op. + } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index deb3df545d..b6d810e518 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -59,6 +59,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; @@ -301,7 +302,8 @@ public class ItColocationTest extends BaseIgniteAbstractTest { transactionInflights, 3_000, 0, - null + null, + mock(CatalogService.class) ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index c2e6b230e4..c24ea23548 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -1302,7 +1302,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { transactionInflights, implicitTransactionTimeout, attemptsObtainLock, - this::streamerFlushExecutor + this::streamerFlushExecutor, + catalogService ); var table = new TableImpl( diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index d905d38b67..b2922cf5ab 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -63,6 +63,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -80,6 +81,7 @@ import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Supplier; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; @@ -152,6 +154,8 @@ public class InternalTableImpl implements InternalTable { private final Supplier<ScheduledExecutorService> streamerFlushExecutor; + private final CatalogService catalogService; + /** Table name. */ private volatile String tableName; @@ -219,6 +223,7 @@ public class InternalTableImpl implements InternalTable { * @param transactionInflights Transaction inflights. * @param implicitTransactionTimeout Implicit transaction timeout. * @param attemptsObtainLock Attempts to take lock. + * @param catalogService Catalog service. */ public InternalTableImpl( String tableName, @@ -236,7 +241,8 @@ public class InternalTableImpl implements InternalTable { TransactionInflights transactionInflights, long implicitTransactionTimeout, int attemptsObtainLock, - Supplier<ScheduledExecutorService> streamerFlushExecutor + Supplier<ScheduledExecutorService> streamerFlushExecutor, + CatalogService catalogService ) { this.tableName = tableName; this.tableId = tableId; @@ -254,6 +260,7 @@ public class InternalTableImpl implements InternalTable { this.implicitTransactionTimeout = implicitTransactionTimeout; this.attemptsObtainLock = attemptsObtainLock; this.streamerFlushExecutor = streamerFlushExecutor; + this.catalogService = catalogService; } /** {@inheritDoc} */ @@ -342,6 +349,15 @@ public class InternalTableImpl implements InternalTable { boolean implicit = tx == null; InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx); + if (implicit) { + long txTimestamp = tx.startTimestamp().longValue(); + int actualCatalogVersion = catalogService.activeCatalogVersion(txTimestamp); + + if (catalogService.table(tableId, actualCatalogVersion).schemaVersions().latestVersion() < row.schemaVersion()) { + throw new ConcurrentModificationException(); + } + } + int partId = partitionId(row); TablePartitionId partGroupId = new TablePartitionId(tableId, partId); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java index 937daae3a3..b7396c3561 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java @@ -36,6 +36,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import java.util.ArrayList; import java.util.List; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.SingleClusterNodeResolver; @@ -75,7 +76,8 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest { mock(TransactionInflights.class), 3_000, 0, - null + null, + mock(CatalogService.class) ); // Let's check the empty table. @@ -125,7 +127,8 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest { mock(TransactionInflights.class), 3_000, 0, - null + null, + mock(CatalogService.class) ); List<BinaryRowEx> originalRows = List.of( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 7ec6ac9c5b..a0cbc3dee9 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -776,7 +776,8 @@ public class ItTxTestCluster { clientTransactionInflights, 500, 0, - null + null, + catalogService ), new DummySchemaManagerImpl(schemaDescriptor), clientTxManager.lockManager(), diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 3ce2007d0b..c9c9c21856 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -206,7 +206,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { storageUpdateConfiguration, txConfiguration, new RemotelyTriggeredResourceRegistry(), - new TransactionInflights(new TestPlacementDriver(LOCAL_NODE), CLOCK_SERVICE) + new TransactionInflights(new TestPlacementDriver(LOCAL_NODE), CLOCK_SERVICE), + mock(CatalogService.class) ); } @@ -234,7 +235,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { StorageUpdateConfiguration storageUpdateConfiguration, TransactionConfiguration txConfiguration, RemotelyTriggeredResourceRegistry resourcesRegistry, - TransactionInflights transactionInflights + TransactionInflights transactionInflights, + CatalogService catalogService ) { super( "test", @@ -257,7 +259,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { transactionInflights, 3_000, 0, - null + null, + catalogService ); RaftGroupService svc = tableRaftService().partitionRaftGroupService(PART_ID); @@ -372,7 +375,6 @@ public class DummyInternalTableImpl extends InternalTableImpl { DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema); - CatalogService catalogService = mock(CatalogService.class); CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class); lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
