This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-22303 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit c8deccdcce6530bac48de626c3b1acf3705887a9 Author: amashenkov <[email protected]> AuthorDate: Mon Jul 1 16:28:45 2024 +0300 Add test --- .../sql/engine/exec/ExecutionServiceImpl.java | 2 + .../internal/sql/engine/schema/IgniteSchema.java | 2 +- .../internal/sql/engine/exec/QueryRetryTest.java | 318 +++++++++++++++++++++ .../sql/engine/framework/ImplicitTxContext.java | 4 +- .../sql/engine/framework/NoOpTransaction.java | 13 +- 5 files changed, 336 insertions(+), 3 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index c314582b68..82e1f1b34c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -320,6 +320,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve QueryTransactionWrapper txWrapper = txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML); + assert sqlSchemaManager.schema(plan.catalogVersion()) == sqlSchemaManager.schema(txWrapper.unwrap().startTimestamp().longValue()); + AsyncCursor<InternalSqlRow> dataCursor = queryManager.execute(txWrapper.unwrap(), plan); PrefetchCallback prefetchCallback = operationContext.prefetchCallback(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java index 109b1ff431..b1d1092449 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java @@ -66,7 +66,7 @@ public class IgniteSchema extends AbstractSchema { } /** Returns table by given 
id. */ - @Nullable IgniteTable tableByIdOpt(int tableId) { + public @Nullable IgniteTable tableByIdOpt(int tableId) { IgniteDataSource dataSource = tableById.get(tableId); if (!(dataSource instanceof IgniteTable)) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java new file mode 100644 index 0000000000..7d4e4d4cf9 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ImplicitTxContext;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
+import org.apache.ignite.internal.sql.engine.sql.ParserService;
+import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Scenario test: a query plan is prepared with an operation time taken before a schema change
+ * and is then executed with an operation time taken after it.
+ *
+ * <p>The stub schema manager built by {@link #createSchemaManager(long)} exposes {@code TEST_TBL}
+ * only for catalog version 1 and for timestamps that precede the schema change; every other
+ * version or timestamp resolves to an empty schema.
+ */
+public class QueryRetryTest extends BaseIgniteAbstractTest {
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 2_000;
+
+    /** Timeout in ms for SQL planning phase. */
+    public static final long PLANNING_TIMEOUT = 5_000;
+
+    /** Number of planning threads passed to the prepare service. */
+    public static final int PLANNING_THREAD_COUNT = 2;
+
+    /** Timeout in ms for stopping execution service. */
+    private static final long SHUTDOWN_TIMEOUT = 5_000;
+
+    /** Name of the single node used by the test. */
+    private static final String NODE_NAME = "node";
+
+    private final IgniteTable table = TestBuilders.table()
+            .name("TEST_TBL")
+            .addKeyColumn("ID", NativeTypes.INT32)
+            .addColumn("VAL", NativeTypes.INT32)
+            .distribution(IgniteDistributions.affinity(0, 1, 2))
+            .hashIndex().name("TEST_TBL_PK").addColumn("ID").primaryKey(true).end()
+            .size(1_000_000)
+            .build();
+
+    /** Schema of catalog version 1: contains {@link #table}. */
+    private final IgniteSchema schema1 = new IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME, 1, List.of(table));
+
+    /** Schema of catalog version 2: contains no tables. */
+    private final IgniteSchema schema2 = new IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME, 2, List.of());
+
+    private final List<MailboxRegistry> mailboxes = new ArrayList<>();
+
+    private TestCluster testCluster;
+    private ExecutionServiceImpl<?> executionService;
+    private PrepareService prepareService;
+    private ParserService parserService;
+
+    private final List<QueryTaskExecutor> executors = new ArrayList<>();
+
+    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+    private SqlSchemaManager schemaManager;
+
+    @BeforeEach
+    public void init() {
+        testCluster = new TestCluster();
+    }
+
+    /**
+     * Stops everything the test started.
+     *
+     * @throws Exception If the prepare service fails to stop.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        try {
+            // The prepare service is created inside the test body, so it is still null when the test failed early.
+            if (prepareService != null) {
+                prepareService.stop();
+            }
+        } finally {
+            // Executors and the scheduler own threads; release them even if stopping the prepare service failed.
+            mailboxes.clear();
+
+            executors.forEach(executor -> {
+                try {
+                    executor.stop();
+                } catch (Exception e) {
+                    log.error("Unable to stop executor", e);
+                }
+            });
+
+            executors.clear();
+            scheduler.shutdownNow();
+        }
+    }
+
+    @Test
+    void test() throws Exception {
+        HybridClockImpl clock = new HybridClockImpl();
+        // Taken before the schema change timestamp, so planning happens against the "old" point in time.
+        HybridTimestamp queryStartTs = clock.now();
+        long schemaChangeTs = clock.now().longValue();
+
+        schemaManager = createSchemaManager(schemaChangeTs);
+
+        parserService = new ParserServiceImpl();
+        prepareService = new PrepareServiceImpl(
+                "test",
+                0,
+                CaffeineCacheFactory.INSTANCE,
+                new DdlSqlToCommandConverter(),
+                PLANNING_TIMEOUT,
+                PLANNING_THREAD_COUNT,
+                new MetricManagerImpl(),
+                schemaManager
+        );
+        prepareService.start();
+        executionService = createExecutionService(clock);
+
+        SqlOperationContext prepareContext = createContext(queryStartTs);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", prepareContext);
+
+        // Taken after the schema change timestamp.
+        SqlOperationContext execContext = createContext(clock.now());
+
+        AsyncCursor<InternalSqlRow> cursor = executionService.executePlan(plan, execContext);
+        // Bound the wait so that a stuck query fails the test instead of hanging the build.
+        cursor.requestNextAsync(100).get(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
+    }
+
+    /** Parses and prepares the given query, checking that the context supplies every dynamic parameter. */
+    private QueryPlan prepare(String query, SqlOperationContext ctx) {
+        ParsedResult parsedResult = parserService.parse(query);
+
+        assertEquals(ctx.parameters().length, parsedResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
+
+        return await(prepareService.prepareAsync(parsedResult, ctx));
+    }
+
+    /**
+     * Builds an operation context for the given operation time.
+     *
+     * <p>Also pushes the time into the shared {@link ImplicitTxContext#INSTANCE}, so the call order matters.
+     */
+    private static SqlOperationContext createContext(HybridTimestamp operationTime) {
+        ImplicitTxContext.INSTANCE.updateObservableTime(operationTime);
+
+        return SqlOperationContext.builder()
+                .queryId(UUID.randomUUID())
+                .cancel(new QueryCancel())
+                .operationTime(operationTime)
+                .prefetchCallback(new PrefetchCallback())
+                .defaultSchemaName(SqlCommon.DEFAULT_SCHEMA_NAME)
+                .timeZoneId(SqlQueryProcessor.DEFAULT_TIME_ZONE_ID)
+                .txContext(ImplicitTxContext.INSTANCE)
+                .build();
+    }
+
+    /**
+     * Creates and starts an execution service for the node {@link #NODE_NAME}.
+     *
+     * <p>Reads {@link #schemaManager}, so that field must be initialised before this method is called.
+     *
+     * @param clock Clock wrapped into the clock service used by the created components.
+     * @return Started execution service.
+     */
+    private ExecutionServiceImpl<Object[]> createExecutionService(HybridClockImpl clock) {
+        var failureProcessor = new FailureProcessor(new NoOpFailureHandler());
+        var taskExecutor = new QueryTaskExecutorImpl(NODE_NAME, 4, failureProcessor);
+        // Registered first so that tearDown() stops it even if the rest of the setup fails.
+        executors.add(taskExecutor);
+
+        var node = testCluster.addNode(NODE_NAME, taskExecutor);
+
+        node.dataset(List.of(new Object[]{1, 1}, new Object[]{2, 2}, new Object[]{3, 3}));
+
+        var messageService = node.messageService();
+        var mailboxRegistry = new MailboxRegistryImpl();
+        mailboxes.add(mailboxRegistry);
+
+        ClockService clockService = new TestClockService(clock);
+
+        var exchangeService = new ExchangeServiceImpl(mailboxRegistry, messageService, clockService);
+
+        var clusterNode = new ClusterNodeImpl(UUID.randomUUID().toString(), NODE_NAME, NetworkAddress.from("127.0.0.1:1111"));
+
+        var topologyService = mock(TopologyService.class);
+
+        when(topologyService.localMember()).thenReturn(clusterNode);
+
+        NoOpExecutableTableRegistry executableTableRegistry = new NoOpExecutableTableRegistry();
+
+        ExecutionDependencyResolver dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry, null);
+
+        var mappingService = createMappingService(NODE_NAME, clockService, taskExecutor);
+        var tableFunctionRegistry = new TableFunctionRegistryImpl();
+
+        List<LogicalNode> logicalNodes = List.of(new LogicalNode(NODE_NAME, NODE_NAME, NetworkAddress.from("127.0.0.1:10000")));
+
+        mappingService.onTopologyLeap(new LogicalTopologySnapshot(1, logicalNodes));
+
+        // Named "service" so that it does not shadow the executionService field.
+        var service = new ExecutionServiceImpl<>(
+                messageService,
+                topologyService,
+                mappingService,
+                schemaManager,
+                mock(DdlCommandHandler.class),
+                taskExecutor,
+                ArrayRowHandler.INSTANCE,
+                executableTableRegistry,
+                dependencyResolver,
+                (ctx, deps) -> node.implementor(ctx, mailboxRegistry, exchangeService, deps, tableFunctionRegistry),
+                clockService,
+                SHUTDOWN_TIMEOUT
+        );
+
+        taskExecutor.start();
+        exchangeService.start();
+        service.start();
+
+        return service;
+    }
+
+    /**
+     * Creates a mapping service whose target provider maps every table to the single node {@code nodeName}.
+     * System views are not supported and produce a failed future.
+     *
+     * @param nodeName Name of the local node; also the only execution target.
+     * @param clock Clock service handed to the mapping service.
+     * @param taskExecutor Task executor handed to the mapping service.
+     * @return Mapping service.
+     */
+    private static MappingServiceImpl createMappingService(
+            String nodeName,
+            ClockService clock,
+            QueryTaskExecutorImpl taskExecutor
+    ) {
+        var targetProvider = new ExecutionTargetProvider() {
+            @Override
+            public CompletableFuture<ExecutionTarget> forTable(
+                    HybridTimestamp operationTime,
+                    ExecutionTargetFactory factory,
+                    IgniteTable table,
+                    boolean includeBackups
+            ) {
+                // Use the parameter rather than the NODE_NAME constant so the helper honours its own argument.
+                return completedFuture(factory.allOf(List.of(nodeName)));
+            }
+
+            @Override
+            public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                return CompletableFuture.failedFuture(new AssertionError("Not supported"));
+            }
+        };
+
+        // Pruning is a no-op: fragments are returned as they were mapped.
+        PartitionPruner partitionPruner = (mappedFragments, dynamicParameters) -> mappedFragments;
+        return new MappingServiceImpl(nodeName, clock, targetProvider, EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+    }
+
+    /**
+     * Creates a stub schema manager with two schema versions.
+     *
+     * @param schemaChangedTs Timestamp of the schema change: strictly earlier timestamps resolve to version 1,
+     *      all others to version 2.
+     * @return Schema manager.
+     */
+    private SqlSchemaManager createSchemaManager(long schemaChangedTs) {
+        return new SqlSchemaManager() {
+            private final SchemaPlus root1;
+            private final SchemaPlus root2;
+
+            {
+                this.root1 = Frameworks.createRootSchema(false);
+                this.root2 = Frameworks.createRootSchema(false);
+
+                root1.add(schema1.getName(), schema1);
+                root2.add(schema2.getName(), schema2);
+            }
+
+            @Override
+            public SchemaPlus schema(int catalogVersion) {
+                return catalogVersion == 1 ? root1 : root2;
+            }
+
+            @Override
+            public SchemaPlus schema(long timestamp) {
+                boolean beforeChange = schemaChangedTs > timestamp;
+                return beforeChange ? root1 : root2;
+            }
+
+            @Override
+            public IgniteTable table(int catalogVersion, int tableId) {
+                return catalogVersion == 1 ? schema1.tableByIdOpt(tableId) : schema2.tableByIdOpt(tableId);
+            }
+
+            @Override
+            public CompletableFuture<Void> schemaReadyFuture(int catalogVersion) {
+                return completedFuture(null);
+            }
+        };
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
index b22b24fa03..27bd07d10b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
@@ -48,7 +48,9 @@ public class ImplicitTxContext implements QueryTransactionContext {
 
     @Override
     public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
-        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"), true, TX_INFLIGHTS);
+        HybridTimestamp ts = observableTimeTracker.get();
+
+        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy", true, ts), true, TX_INFLIGHTS);
     }
 
     @Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 4ad153c175..aaf3493e0f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -37,7 +37,7 @@ public final class NoOpTransaction implements InternalTransaction {
 
     private final UUID id = UUID.randomUUID();
 
-    private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1);
+    private final HybridTimestamp hybridTimestamp;
 
     private final IgniteBiTuple<ClusterNode, Long> tuple;
 
@@ -75,8 +75,19 @@ public final class NoOpTransaction implements InternalTransaction
{
      * @param readOnly Read-only or not.
      */
     private NoOpTransaction(String name, boolean readOnly) {
+        // Delegates with the timestamp that used to be hard-coded in the field initializer.
+        this(name, readOnly, new HybridTimestamp(1, 1));
+    }
+
+    /**
+     * Constructs a transaction.
+     *
+     * @param name Name of the node.
+     * @param readOnly Read-only or not.
+     * @param timestamp Timestamp stored as this transaction's hybrid timestamp.
+     */
+    NoOpTransaction(String name, boolean readOnly, HybridTimestamp timestamp) {
         var networkAddress = NetworkAddress.from(new InetSocketAddress("localhost", 1234));
         this.tuple = new IgniteBiTuple<>(new ClusterNodeImpl(name, name, networkAddress), 1L);
+        this.hybridTimestamp = timestamp;
         this.readOnly = readOnly;
     }
