This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-22303
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit c8deccdcce6530bac48de626c3b1acf3705887a9
Author: amashenkov <[email protected]>
AuthorDate: Mon Jul 1 16:28:45 2024 +0300

    Add test
---
 .../sql/engine/exec/ExecutionServiceImpl.java      |   2 +
 .../internal/sql/engine/schema/IgniteSchema.java   |   2 +-
 .../internal/sql/engine/exec/QueryRetryTest.java   | 318 +++++++++++++++++++++
 .../sql/engine/framework/ImplicitTxContext.java    |   4 +-
 .../sql/engine/framework/NoOpTransaction.java      |  13 +-
 5 files changed, 336 insertions(+), 3 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index c314582b68..82e1f1b34c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -320,6 +320,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         QueryTransactionWrapper txWrapper = 
txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML);
 
+        assert sqlSchemaManager.schema(plan.catalogVersion()) == 
sqlSchemaManager.schema(txWrapper.unwrap().startTimestamp().longValue());
+
         AsyncCursor<InternalSqlRow> dataCursor = 
queryManager.execute(txWrapper.unwrap(), plan);
 
         PrefetchCallback prefetchCallback = 
operationContext.prefetchCallback();
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 109b1ff431..b1d1092449 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -66,7 +66,7 @@ public class IgniteSchema extends AbstractSchema {
     }
 
     /** Returns table by given id. */
-    @Nullable IgniteTable tableByIdOpt(int tableId) {
+    public @Nullable IgniteTable tableByIdOpt(int tableId) {
         IgniteDataSource dataSource = tableById.get(tableId);
 
         if (!(dataSource instanceof IgniteTable)) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
new file mode 100644
index 0000000000..7d4e4d4cf9
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRetryTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.metrics.MetricManagerImpl;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
+import 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster;
+import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ImplicitTxContext;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
+import org.apache.ignite.internal.sql.engine.sql.ParserService;
+import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class QueryRetryTest extends BaseIgniteAbstractTest {
+
+    /** Timeout in ms for async operations. */
+    private static final long TIMEOUT_IN_MS = 2_000;
+
+    /** Timeout in ms for SQL planning phase. */
+    public static final long PLANNING_TIMEOUT = 5_000;
+
+    public static final int PLANNING_THREAD_COUNT = 2;
+
+    /** Timeout in ms for stopping execution service. */
+    private static final long SHUTDOWN_TIMEOUT = 5_000;
+
+    private static final String NODE_NAME = "node";
+
+    private final IgniteTable table = TestBuilders.table()
+            .name("TEST_TBL")
+            .addKeyColumn("ID", NativeTypes.INT32)
+            .addColumn("VAL", NativeTypes.INT32)
+            .distribution(IgniteDistributions.affinity(0, 1, 2))
+            
.hashIndex().name("TEST_TBL_PK").addColumn("ID").primaryKey(true).end()
+            .size(1_000_000)
+            .build();
+
+    private final IgniteSchema schema1 = new 
IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME, 1, List.of(table));
+    private final IgniteSchema schema2 = new 
IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME, 2, List.of());
+
+    private final List<MailboxRegistry> mailboxes = new ArrayList<>();
+
+    private TestCluster testCluster;
+    private ExecutionServiceImpl<?> executionService;
+    private PrepareService prepareService;
+    private ParserService parserService;
+
+    private final List<QueryTaskExecutor> executers = new ArrayList<>();
+
+    private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+    private SqlSchemaManager schemaManager;
+
+    @BeforeEach
+    public void init() {
+        testCluster = new TestCluster();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        prepareService.stop();
+
+        mailboxes.clear();
+
+        executers.forEach(executer -> {
+            try {
+                executer.stop();
+            } catch (Exception e) {
+                log.error("Unable to stop executor", e);
+            }
+        });
+
+        executers.clear();
+        scheduler.shutdownNow();
+    }
+
+    @Test
+    void test() throws Exception {
+        HybridClockImpl clock = new HybridClockImpl();
+        HybridTimestamp queryStartTs = clock.now();
+        long schemaChangeTs = clock.now().longValue();
+
+        schemaManager = createSchemaManager(schemaChangeTs);
+
+        parserService = new ParserServiceImpl();
+        prepareService = new PrepareServiceImpl(
+                "test",
+                0,
+                CaffeineCacheFactory.INSTANCE,
+                new DdlSqlToCommandConverter(),
+                PLANNING_TIMEOUT,
+                PLANNING_THREAD_COUNT,
+                new MetricManagerImpl(),
+                schemaManager
+        );
+        prepareService.start();
+        executionService = createExecutionService(clock);
+
+        SqlOperationContext prepareContext = createContext(queryStartTs);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", prepareContext);
+
+        SqlOperationContext execContext = createContext(clock.now());
+
+        AsyncCursor<InternalSqlRow> cursor = 
executionService.executePlan(plan, execContext);
+        cursor.requestNextAsync(100).get();
+    }
+
+    private QueryPlan prepare(String query, SqlOperationContext ctx) {
+        ParsedResult parsedResult = parserService.parse(query);
+
+        assertEquals(ctx.parameters().length, 
parsedResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
+
+        return await(prepareService.prepareAsync(parsedResult, ctx));
+    }
+
+    private static SqlOperationContext createContext(HybridTimestamp 
operationTime) {
+        ImplicitTxContext.INSTANCE.updateObservableTime(operationTime);
+
+        return SqlOperationContext.builder()
+                .queryId(UUID.randomUUID())
+                .cancel(new QueryCancel())
+                .operationTime(operationTime)
+                .prefetchCallback(new PrefetchCallback())
+                .defaultSchemaName(SqlCommon.DEFAULT_SCHEMA_NAME)
+                .timeZoneId(SqlQueryProcessor.DEFAULT_TIME_ZONE_ID)
+                .txContext(ImplicitTxContext.INSTANCE)
+                .build();
+    }
+
+    /** Creates an execution service instance for the node with given 
consistent id. */
+    private ExecutionServiceImpl<Object[]> 
createExecutionService(HybridClockImpl clock) {
+        var failureProcessor = new FailureProcessor(new NoOpFailureHandler());
+        var taskExecutor = new QueryTaskExecutorImpl(NODE_NAME, 4, 
failureProcessor);
+        executers.add(taskExecutor);
+
+        var node = testCluster.addNode(NODE_NAME, taskExecutor);
+
+        node.dataset( List.of(new Object[]{1, 1}, new Object[]{2, 2}, new 
Object[]{3, 3}));
+
+        var messageService = node.messageService();
+        var mailboxRegistry = new MailboxRegistryImpl();
+        mailboxes.add(mailboxRegistry);
+
+        ClockService clockService = new TestClockService(clock);
+
+        var exchangeService = new ExchangeServiceImpl(mailboxRegistry, 
messageService, clockService);
+
+        var clusterNode = new ClusterNodeImpl(UUID.randomUUID().toString(), 
NODE_NAME, NetworkAddress.from("127.0.0.1:1111"));
+
+        var topologyService = mock(TopologyService.class);
+
+        when(topologyService.localMember()).thenReturn(clusterNode);
+
+        NoOpExecutableTableRegistry executableTableRegistry = new 
NoOpExecutableTableRegistry();
+
+        ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry, null);
+
+        var mappingService = createMappingService(NODE_NAME, clockService, 
taskExecutor);
+        var tableFunctionRegistry = new TableFunctionRegistryImpl();
+
+        List<LogicalNode> logicalNodes = List.of(new LogicalNode(NODE_NAME, 
NODE_NAME, NetworkAddress.from("127.0.0.1:10000")));
+
+        mappingService.onTopologyLeap(new LogicalTopologySnapshot(1, 
logicalNodes));
+
+        var executionService = new ExecutionServiceImpl<>(
+                messageService,
+                topologyService,
+                mappingService,
+                schemaManager,
+                mock(DdlCommandHandler.class),
+                taskExecutor,
+                ArrayRowHandler.INSTANCE,
+                executableTableRegistry,
+                dependencyResolver,
+                (ctx, deps) -> node.implementor(ctx, mailboxRegistry, 
exchangeService, deps, tableFunctionRegistry),
+                clockService,
+                SHUTDOWN_TIMEOUT
+        );
+
+        taskExecutor.start();
+        exchangeService.start();
+        executionService.start();
+
+        return executionService;
+    }
+
+    private static MappingServiceImpl createMappingService(
+            String nodeName,
+            ClockService clock,
+            QueryTaskExecutorImpl taskExecutor
+    ) {
+        var targetProvider = new ExecutionTargetProvider() {
+            @Override
+            public CompletableFuture<ExecutionTarget> forTable(
+                    HybridTimestamp operationTime,
+                    ExecutionTargetFactory factory,
+                    IgniteTable table,
+                    boolean includeBackups
+            ) {
+                return completedFuture(factory.allOf(List.of(NODE_NAME)));
+            }
+
+            @Override
+            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                return CompletableFuture.failedFuture(new AssertionError("Not 
supported"));
+            }
+        };
+
+        PartitionPruner partitionPruner = (mappedFragments, dynamicParameters) 
-> mappedFragments;
+        return new MappingServiceImpl(nodeName, clock, targetProvider, 
EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor);
+    }
+
+    private SqlSchemaManager createSchemaManager(long schemaChangedTs) {
+        return new SqlSchemaManager() {
+            private final SchemaPlus root1;
+            private final SchemaPlus root2;
+
+            {
+                this.root1 = Frameworks.createRootSchema(false);
+                this.root2 = Frameworks.createRootSchema(false);
+
+                root1.add(schema1.getName(), schema1);
+                root2.add(schema2.getName(), schema2);
+            }
+
+            @Override
+            public SchemaPlus schema(int catalogVersion) {
+                return catalogVersion == 1 ? root1 : root2;
+            }
+
+            @Override
+            public SchemaPlus schema(long timestamp) {
+                boolean beforeChange = schemaChangedTs > timestamp;
+                return beforeChange ? root1 : root2;
+            }
+
+            @Override
+            public IgniteTable table(int catalogVersion, int tableId) {
+                return catalogVersion == 1 ? schema1.tableByIdOpt(tableId) : 
schema2.tableByIdOpt(tableId);
+            }
+
+            @Override
+            public CompletableFuture<Void> schemaReadyFuture(int 
catalogVersion) {
+                return completedFuture(null);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
index b22b24fa03..27bd07d10b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
@@ -48,7 +48,9 @@ public class ImplicitTxContext implements 
QueryTransactionContext {
 
     @Override
     public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
-        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"), 
true, TX_INFLIGHTS);
+        HybridTimestamp ts = observableTimeTracker.get();
+
+        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy", 
true, ts), true, TX_INFLIGHTS);
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 4ad153c175..aaf3493e0f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -37,7 +37,7 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     private final UUID id = UUID.randomUUID();
 
-    private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1);
+    private final HybridTimestamp hybridTimestamp;
 
     private final IgniteBiTuple<ClusterNode, Long> tuple;
 
@@ -75,8 +75,19 @@ public final class NoOpTransaction implements 
InternalTransaction {
      * @param readOnly Read-only or not.
      */
     private NoOpTransaction(String name, boolean readOnly) {
+        this(name, readOnly,  new HybridTimestamp(1, 1));
+    }
+
+    /**
+     * Constructs a transaction.
+     *
+     * @param name Name of the node.
+     * @param readOnly Read-only or not.
+     */
+    NoOpTransaction(String name, boolean readOnly, HybridTimestamp timestamp) {
         var networkAddress = NetworkAddress.from(new 
InetSocketAddress("localhost", 1234));
         this.tuple = new IgniteBiTuple<>(new ClusterNodeImpl(name, name, 
networkAddress), 1L);
+        this.hybridTimestamp = timestamp;
         this.readOnly = readOnly;
     }
 

Reply via email to