This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5e47e6b31f IGNITE-23663 Sql. Make TestNode to use QueryExecutor (#4711)
5e47e6b31f is described below
commit 5e47e6b31f9fc3ad32b6eb20c637822700b01fd4
Author: korlov42 <[email protected]>
AuthorDate: Wed Nov 13 13:09:06 2024 +0200
IGNITE-23663 Sql. Make TestNode to use QueryExecutor (#4711)
---
.../ignite/internal/sql/engine/exec/fsm/Query.java | 12 +-
.../sql/engine/tx/ScriptTransactionContext.java | 6 +-
.../sql/engine/benchmarks/SqlBenchmark.java | 31 +-
.../sql/engine/exec/TransactionEnlistTest.java | 39 +--
.../exec/coercion/BaseTypeCheckExecutionTest.java | 20 +-
.../engine/framework/ClusterServiceFactory.java | 31 +-
.../sql/engine/framework/NoOpTransaction.java | 3 +-
.../engine/framework/NoOpTransactionTracker.java | 4 +-
.../sql/engine/framework/TestBuilders.java | 317 +++++++++++++--------
.../internal/sql/engine/framework/TestCluster.java | 17 ++
.../sql/engine/framework/TestClusterTest.java | 103 +++----
.../internal/sql/engine/framework/TestNode.java | 146 ++++++----
.../internal/sql/engine/util/QueryCheckerTest.java | 42 +--
13 files changed, 468 insertions(+), 303 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
index b7a6414a92..3fc324bb64 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
@@ -173,9 +173,19 @@ class Query implements Runnable {
/** Moves the query to a given state. */
void moveTo(ExecutionPhase newPhase) {
synchronized (mux) {
+ ExecutionPhase currentPhase = this.currentPhase;
+
+ // Transition from TERMINATED to TERMINATED is possible, when
query is completed successfully
+ // just moment before the node starts to shut down. In this case,
all running queries are terminated
+ // with NodeStoppingException, and completed query may still be
presented in the list of running queries.
+ if (currentPhase == newPhase && currentPhase ==
ExecutionPhase.TERMINATED) {
+ // ignore such transition
+ return;
+ }
+
assert currentPhase.transitionAllowed(newPhase) : "currentPhase="
+ currentPhase + ", newPhase=" + newPhase;
- currentPhase = newPhase;
+ this.currentPhase = newPhase;
}
onPhaseStartedCallback.computeIfAbsent(newPhase, k -> new
CompletableFuture<>()).complete(null);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
index 6e158e4ea5..e38f5f7a31 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable;
* Starts an implicit or script-driven transaction if there is no external
transaction.
*/
public class ScriptTransactionContext implements QueryTransactionContext {
- private final QueryTransactionContextImpl txContext;
+ private final QueryTransactionContext txContext;
private final TransactionTracker txTracker;
@@ -47,9 +47,7 @@ public class ScriptTransactionContext implements
QueryTransactionContext {
/** Constructor. */
public ScriptTransactionContext(QueryTransactionContext txContext,
TransactionTracker txTracker) {
- assert txContext instanceof QueryTransactionContextImpl : txContext;
-
- this.txContext = (QueryTransactionContextImpl) txContext;
+ this.txContext = txContext;
this.txTracker = txTracker;
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
index 3e841fae1b..e54eddbe68 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -19,14 +19,15 @@ package org.apache.ignite.internal.sql.engine.benchmarks;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.type.NativeTypes;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -62,27 +63,29 @@ public class SqlBenchmark {
// @formatter:off
private final TestCluster cluster = TestBuilders.cluster()
.nodes("N1", "N2", "N3")
- .addTable()
- .name("T1")
- .addKeyColumn("ID", NativeTypes.INT32)
- .addColumn("VAL", NativeTypes.stringOf(64))
- .end()
- .dataProvider("N1", "T1", TestBuilders.tableScan(dataProvider))
- .dataProvider("N2", "T1", TestBuilders.tableScan(dataProvider))
- .dataProvider("N3", "T1", TestBuilders.tableScan(dataProvider))
.build();
// @formatter:on
private final TestNode gatewayNode = cluster.node("N1");
- private QueryPlan plan;
-
/** Starts the cluster and prepares the plan of the query. */
@Setup
public void setUp() {
cluster.start();
- plan = gatewayNode.prepare("SELECT * FROM t1");
+ //noinspection ConcatenationWithEmptyString
+ cluster.node("N1").initSchema(""
+ + "CREATE ZONE test_zone WITH partitions=3,
storage_profiles='Default';"
+ + "CREATE TABLE t1 (id INT PRIMARY KEY, val VARCHAR(64)) ZONE
test_zone");
+
+ cluster.setAssignmentsProvider("T1", (partitionCount, b) -> {
+ assert partitionCount == 3;
+
+ return Stream.of("N1", "N2", "N3")
+ .map(List::of)
+ .collect(Collectors.toList());
+ });
+ cluster.setDataProvider("T1", TestBuilders.tableScan(dataProvider));
}
/** Stops the cluster. */
@@ -94,7 +97,7 @@ public class SqlBenchmark {
/** Very simple test to measure performance of minimal possible
distributed query. */
@Benchmark
public void selectAllSimple(Blackhole bh) {
- for (var row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+ for (var row : await(gatewayNode.executeQuery("SELECT * FROM
t1").requestNextAsync(10_000)).items()) {
bh.consume(row);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
index ca71111124..316ed18a11 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.times;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -44,7 +44,6 @@ import
org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
@@ -62,18 +61,22 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
private static QueryCheckerFactory queryCheckerFactory;
private static final TestCluster CLUSTER = TestBuilders.cluster()
- .nodes(NODE_NAME1, true)
- .addTable()
- .name("T1")
- .addKeyColumn("ID", NativeTypes.INT32)
- .addColumn("VAL", NativeTypes.INT32)
- .end()
- .dataProvider(NODE_NAME1, "T1",
TestBuilders.tableScan(DataProvider.fromCollection(List.of())))
+ .nodes(NODE_NAME1)
.build(); // add method use table partitions
@BeforeAll
static void startCluster() {
CLUSTER.start();
+
+ //noinspection ConcatenationWithEmptyString
+ CLUSTER.node("N1").initSchema(""
+ + "CREATE ZONE test_zone WITH partitions=3,
storage_profiles='Default';"
+ + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE
test_zone");
+
+ CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) ->
IntStream.range(0, partitionCount)
+ .mapToObj(i -> List.of("N1"))
+ .collect(Collectors.toList()));
+ CLUSTER.setDataProvider("T1",
TestBuilders.tableScan(DataProvider.fromCollection(List.of())));
}
@AfterAll
@@ -92,7 +95,7 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
try {
assertQuery("INSERT INTO t1 VALUES(1, 2), (2, 3)",
spiedTx).check();
- } catch (Exception ex) {
+ } catch (Exception ignored) {
// No op.
}
@@ -147,19 +150,7 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
assert params == null || params.length == 0 : "params are not
supported";
assert !prepareOnly : "Expected that the query will only be
prepared, but not executed";
- QueryPlan plan = node.prepare(qry);
- AsyncDataCursor<InternalSqlRow> dataCursor =
node.executePlan(plan, transaction);
-
- SqlQueryType type = plan.type();
-
- assert type != null;
-
- AsyncSqlCursor<InternalSqlRow> sqlCursor = new
AsyncSqlCursorImpl<>(
- type,
- plan.metadata(),
- dataCursor,
- null
- );
+ AsyncSqlCursor<InternalSqlRow> sqlCursor =
node.executeQuery(transaction, qry);
return CompletableFuture.completedFuture(sqlCursor);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
index 468758d423..0992ed5f1f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/coercion/BaseTypeCheckExecutionTest.java
@@ -22,13 +22,16 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import java.math.BigDecimal;
import java.math.RoundingMode;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
import org.apache.ignite.internal.sql.engine.planner.datatypes.utils.TypePair;
-import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.util.CursorUtils;
import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -38,7 +41,6 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.sql.ResultSetMetadata;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@@ -225,7 +227,11 @@ class BaseTypeCheckExecutionTest extends
BaseIgniteAbstractTest {
.addColumn("C1", typePair.first())
.addColumn("C2", typePair.second())
.end()
- .dataProvider("N1", "T", TestBuilders.tableScan(dataProvider))
+ .defaultAssignmentsProvider(tableName -> (partNum,
includeBackups) -> IntStream.range(0, partNum)
+ .mapToObj(part -> List.of("N1"))
+ .collect(Collectors.toList())
+ )
+ .defaultDataProvider(tableName ->
TestBuilders.tableScan(dataProvider))
.build();
return new ClusterWrapper(cluster);
@@ -242,11 +248,11 @@ class BaseTypeCheckExecutionTest extends
BaseIgniteAbstractTest {
void process(String sql, Matcher<Object> resultMatcher) {
TestNode gatewayNode = cluster.node("N1");
- QueryPlan plan = gatewayNode.prepare(sql);
- ResultSetMetadata resultMeta = plan.metadata();
- ColumnMetadata colMeta = resultMeta.columns().get(0);
- for (InternalSqlRow row :
CursorUtils.getAllFromCursor(gatewayNode.executePlan(plan))) {
+ AsyncSqlCursor<InternalSqlRow> cursor =
gatewayNode.executeQuery(sql);
+ ColumnMetadata colMeta = cursor.metadata().columns().get(0);
+
+ for (InternalSqlRow row : CursorUtils.getAllFromCursor(cursor)) {
assertNotNull(row);
assertNotNull(row.get(0), "Await not null object");
assertThat(new Pair<>(row.get(0), colMeta), resultMatcher);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 1bab8c4985..91ddbe6837 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -39,7 +39,9 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
@@ -120,6 +122,22 @@ public class ClusterServiceFactory {
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return nullCompletedFuture();
}
+
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ ClusterNode node = nodeByName.remove(nodeName);
+
+ if (node != null) {
+ messagingServicesByNode.remove(nodeName);
+ topologyServicesByNode.remove(nodeName);
+
+ topologyServicesByNode.values().stream()
+ .flatMap(topSrvc ->
topSrvc.getEventHandlers().stream())
+ .forEach(eventHandler ->
eventHandler.onDisappeared(node));
+ }
+
+ return nullCompletedFuture();
+ }
};
}
@@ -150,6 +168,11 @@ public class ClusterServiceFactory {
return new ClusterNodeImpl(randomUUID(), name,
NetworkAddress.from("127.0.0.1:" + NODE_COUNTER.incrementAndGet()));
}
+ @Override
+ public Collection<TopologyEventHandler> getEventHandlers() {
+ return super.getEventHandlers();
+ }
+
/** {@inheritDoc} */
@Override
public ClusterNode localMember() {
@@ -205,7 +228,13 @@ public class ClusterServiceFactory {
@Override
public CompletableFuture<Void> send(String recipientConsistentId,
ChannelType channelType, NetworkMessage msg) {
- for (var handler :
messagingServicesByNode.get(recipientConsistentId).messageHandlers(msg.groupType()))
{
+ LocalMessagingService recipient =
messagingServicesByNode.get(recipientConsistentId);
+
+ if (recipient == null) {
+ return CompletableFuture.failedFuture(new
UnresolvableConsistentIdException(recipientConsistentId));
+ }
+
+ for (NetworkMessageHandler handler :
recipient.messageHandlers(msg.groupType())) {
handler.onReceived(msg, localNode, null);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 886a7a4b2f..e221084a4c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -41,7 +41,8 @@ public final class NoOpTransaction implements
InternalTransaction {
private final UUID id = randomUUID();
- private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1);
+ private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
+ .addPhysicalTime(System.currentTimeMillis());
private final IgniteBiTuple<ClusterNode, Long> tuple;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
index de3b7ece40..d87d66d48b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransactionTracker.java
@@ -23,8 +23,8 @@ import
org.apache.ignite.internal.sql.engine.exec.TransactionTracker;
/**
* Dummy no-op tracker.
*/
-final class NoOpTransactionTracker implements TransactionTracker {
- static final TransactionTracker INSTANCE = new NoOpTransactionTracker();
+public final class NoOpTransactionTracker implements TransactionTracker {
+ public static final TransactionTracker INSTANCE = new
NoOpTransactionTracker();
@Override
public boolean register(UUID txId, boolean readOnly) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 471f6a59bd..8af2224586 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -44,8 +44,10 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -176,11 +178,11 @@ public class TestBuilders {
}
/**
- * Factory method to create {@link ScannableTable table} instance from
given data provider with
- * only implemented {@link ScannableTable#scan table scan}.
+ * Factory method to create {@link ScannableTable table} instance from
given data provider with only implemented
+ * {@link ScannableTable#scan table scan}.
*/
public static ScannableTable tableScan(DataProvider<Object[]>
dataProvider) {
- return new ScannableTable() {
+ return new AbstractScannableTable() {
@Override
public <RowT> Publisher<RowT> scan(
ExecutionContext<RowT> ctx,
@@ -199,36 +201,39 @@ public class TestBuilders {
rowFactory::create
);
}
+ };
+ }
+ /**
+ * Factory method to create {@link ScannableTable table} instance from
given data provider with only implemented
+ * {@link ScannableTable#scan table scan}.
+ */
+ public static ScannableTable tableScan(BiFunction<String, Integer,
Iterable<Object[]>> generatorFunction) {
+ return new AbstractScannableTable() {
@Override
- public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken
partWithConsistencyToken,
- RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
- @Nullable BitSet requiredColumns) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
- RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <RowT> CompletableFuture<@Nullable RowT>
primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction explicitTx,
- RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet
requiredColumns) {
- throw new UnsupportedOperationException();
- }
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
- @Override
- public CompletableFuture<Long> estimatedSize() {
- return
CompletableFuture.completedFuture(dataProvider.estimatedSize());
+ return new TransformingPublisher<>(
+ SubscriptionUtils.fromIterable(
+ () -> new TransformingIterator<>(
+
generatorFunction.apply(ctx.localNode().name(),
partWithConsistencyToken.partId()).iterator(),
+ row -> project(row, requiredColumns)
+ )
+ ),
+ rowFactory::create
+ );
}
};
}
/**
- * Factory method to create {@link ScannableTable table} instance from
given data provider with
- * only implemented {@link ScannableTable#indexRangeScan index range scan}.
+ * Factory method to create {@link ScannableTable table} instance from
given data provider with only implemented
+ * {@link ScannableTable#indexRangeScan index range scan}.
*/
public static ScannableTable indexRangeScan(DataProvider<Object[]>
dataProvider) {
return new ScannableTable() {
@@ -277,8 +282,8 @@ public class TestBuilders {
}
/**
- * Factory method to create {@link ScannableTable table} instance from
given data provider with
- * only implemented {@link ScannableTable#indexLookup index lookup}.
+ * Factory method to create {@link ScannableTable table} instance from
given data provider with only implemented
+ * {@link ScannableTable#indexLookup index lookup}.
*/
public static ScannableTable indexLookup(DataProvider<Object[]>
dataProvider) {
return new ScannableTable() {
@@ -342,17 +347,6 @@ public class TestBuilders {
*/
ClusterBuilder nodes(String firstNodeName, String... otherNodeNames);
- /**
- * Sets desired names for the cluster nodes.
- *
- * @param firstNodeName A name of the first node. There is no
difference in what node should be first. This parameter was
- * introduced to force user to provide at least one node name.
- * @param useTablePartitions If {@code true} map table partitions to
whole defined nodes.
- * @param otherNodeNames An array of rest of the names to create
cluster from.
- * @return {@code this} for chaining.
- */
- public ClusterBuilder nodes(String firstNodeName, boolean
useTablePartitions, String... otherNodeNames);
-
/**
* Creates a table builder to add to the cluster.
*
@@ -364,8 +358,8 @@ public class TestBuilders {
* Adds the given system view to the cluster.
*
* @param systemView System view.
- * @return {@code this} for chaining.
* @param <T> System view data type.
+ * @return {@code this} for chaining.
*/
<T> ClusterBuilder addSystemView(SystemView<T> systemView);
@@ -377,14 +371,20 @@ public class TestBuilders {
TestCluster build();
/**
- * Provides implementation of table with given name local per given
node.
+ * Provides implementation of table with given name.
+ *
+ * @param defaultDataProvider Name of the table given instance
represents.
+ * @return {@code this} for chaining.
+ */
+ ClusterBuilder defaultDataProvider(DefaultDataProvider
defaultDataProvider);
+
+ /**
+ * Provides implementation of table with given name.
*
- * @param nodeName Name of the node given instance of table will be
assigned to.
- * @param tableName Name of the table given instance represents.
- * @param table Actual table that will be used for read operations
during execution.
+ * @param defaultAssignmentsProvider Name of the table given instance
represents.
* @return {@code this} for chaining.
*/
- ClusterBuilder dataProvider(String nodeName, String tableName,
ScannableTable table);
+ ClusterBuilder defaultAssignmentsProvider(DefaultAssignmentsProvider
defaultAssignmentsProvider);
/**
* Registers a previously added system view (see {@link
#addSystemView(SystemView)}) on the specified node.
@@ -599,27 +599,16 @@ public class TestBuilders {
private static class ClusterBuilderImpl implements ClusterBuilder {
private final List<ClusterTableBuilderImpl> tableBuilders = new
ArrayList<>();
private List<String> nodeNames;
- private boolean useTablePartitions;
- private final Map<String, Map<String, ScannableTable>>
nodeName2tableName2table = new HashMap<>();
private final List<SystemView<?>> systemViews = new ArrayList<>();
private final Map<String, Set<String>> nodeName2SystemView = new
HashMap<>();
- /** {@inheritDoc} */
- @Override
- public ClusterBuilder nodes(String firstNodeName, String...
otherNodeNames) {
- this.nodeNames = new ArrayList<>();
-
- nodeNames.add(firstNodeName);
- nodeNames.addAll(Arrays.asList(otherNodeNames));
-
- return this;
- }
+ private @Nullable DefaultDataProvider defaultDataProvider = null;
+ private @Nullable DefaultAssignmentsProvider
defaultAssignmentsProvider = null;
/** {@inheritDoc} */
@Override
- public ClusterBuilder nodes(String firstNodeName, boolean
useTablePartitions, String... otherNodeNames) {
+ public ClusterBuilder nodes(String firstNodeName, String...
otherNodeNames) {
this.nodeNames = new ArrayList<>();
- this.useTablePartitions = useTablePartitions;
nodeNames.add(firstNodeName);
nodeNames.addAll(Arrays.asList(otherNodeNames));
@@ -640,15 +629,22 @@ public class TestBuilders {
}
@Override
- public ClusterBuilder dataProvider(String nodeName, String tableName,
ScannableTable table) {
- nodeName2tableName2table.computeIfAbsent(nodeName, key -> new
HashMap<>()).put(tableName, table);
+ public ClusterBuilder registerSystemView(String nodeName, String
systemViewName) {
+ nodeName2SystemView.computeIfAbsent(nodeName, key -> new
HashSet<>()).add(systemViewName);
return this;
}
@Override
- public ClusterBuilder registerSystemView(String nodeName, String
systemViewName) {
- nodeName2SystemView.computeIfAbsent(nodeName, key -> new
HashSet<>()).add(systemViewName);
+ public ClusterBuilder defaultDataProvider(DefaultDataProvider
defaultDataProvider) {
+ this.defaultDataProvider = defaultDataProvider;
+
+ return this;
+ }
+
+ @Override
+ public ClusterBuilder
defaultAssignmentsProvider(DefaultAssignmentsProvider
defaultAssignmentsProvider) {
+ this.defaultAssignmentsProvider = defaultAssignmentsProvider;
return this;
}
@@ -656,8 +652,6 @@ public class TestBuilders {
/** {@inheritDoc} */
@Override
public TestCluster build() {
- validateConfiguredDataProviders();
-
var clusterService = new ClusterServiceFactory(nodeNames);
var clusterName = "test_cluster";
@@ -674,13 +668,6 @@ public class TestBuilders {
new DdlSqlToCommandConverter(), PLANNING_TIMEOUT,
PLANNING_THREAD_COUNT,
new NoOpMetricManager(), schemaManager);
- Map<String, List<List<String>>> owningNodesByTableName = new
HashMap<>();
- for (Entry<String, Map<String, ScannableTable>> entry :
nodeName2tableName2table.entrySet()) {
- for (String tableName : entry.getValue().keySet()) {
- owningNodesByTableName.computeIfAbsent(tableName, key ->
new ArrayList<>()).add(List.of(entry.getKey()));
- }
- }
-
Map<String, List<String>> systemViewsByNode = new HashMap<>();
for (Entry<String, Set<String>> entry :
nodeName2SystemView.entrySet()) {
@@ -717,13 +704,20 @@ public class TestBuilders {
})
.collect(Collectors.toList());
+ ConcurrentMap<String, ScannableTable> dataProvidersByTableName =
new ConcurrentHashMap<>();
+ ConcurrentMap<String, AssignmentsProvider>
assignmentsProviderByTableName = new ConcurrentHashMap<>();
+ DefaultDataProvider defaultDataProvider = this.defaultDataProvider;
Map<String, TestNode> nodes = nodeNames.stream()
.map(name -> {
var systemViewManager = new
SystemViewManagerImpl(name, catalogManager);
var executionProvider = new
TestExecutionDistributionProvider(
systemViewManager::owningNodes,
- owningNodesByTableName,
- useTablePartitions
+ tableName -> resolveProvider(
+ tableName,
+ assignmentsProviderByTableName,
+ defaultAssignmentsProvider != null ?
defaultAssignmentsProvider::get : null
+ ),
+ false
);
var partitionPruner = new PartitionPrunerImpl();
var mappingService = new MappingServiceImpl(
@@ -743,12 +737,20 @@ public class TestBuilders {
return new TestNode(
name,
+ catalogManager,
clusterService.forNode(name),
parserService,
prepareService,
schemaManager,
mappingService,
- new
TestExecutableTableRegistry(nodeName2tableName2table.get(name), schemaManager),
+ new TestExecutableTableRegistry(
+ name0 -> resolveProvider(
+ name0,
+ dataProvidersByTableName,
+ defaultDataProvider != null ?
defaultDataProvider::get : null
+ ),
+ schemaManager
+ ),
ddlHandler,
systemViewManager
);
@@ -756,6 +758,8 @@ public class TestBuilders {
.collect(Collectors.toMap(TestNode::name,
Function.identity()));
return new TestCluster(
+ dataProvidersByTableName,
+ assignmentsProviderByTableName,
nodes,
catalogManager,
prepareService,
@@ -764,29 +768,6 @@ public class TestBuilders {
);
}
- private void validateConfiguredDataProviders() {
- Set<String> dataProvidersOwners = new
HashSet<>(nodeName2tableName2table.keySet());
-
- dataProvidersOwners.removeAll(Set.copyOf(nodeNames));
-
- if (!dataProvidersOwners.isEmpty()) {
- Map<String, List<String>> problematicTables = new HashMap<>();
-
- for (String outsiderNode : dataProvidersOwners) {
- for (String problematicTable :
nodeName2tableName2table.get(outsiderNode).keySet()) {
- problematicTables.computeIfAbsent(problematicTable, k
-> new ArrayList<>()).add(outsiderNode);
- }
- }
-
- String problematicTablesString =
problematicTables.entrySet().stream()
- .map(e -> e.getKey() + ": " + e.getValue())
- .collect(Collectors.joining(", "));
-
- throw new AssertionError(format("The table has a dataProvider
that is outside the cluster "
- + "[{}]", problematicTablesString));
- }
- }
-
private void initAction(CatalogManager catalogManager) {
List<CatalogCommand> initialSchema = tableBuilders.stream()
.flatMap(builder -> builder.build().stream())
@@ -992,7 +973,8 @@ public class TestBuilders {
NativeTypes.INT32,
DefaultValueStrategy.DEFAULT_COMPUTED,
() -> {
- throw new AssertionError("Partition virtual
column is generated by a function"); }
+ throw new AssertionError("Partition virtual
column is generated by a function");
+ }
))), distribution);
Map<String, IgniteIndex> indexes = indexBuilders.stream()
@@ -1442,10 +1424,10 @@ public class TestBuilders {
}
private static class TestExecutableTableRegistry implements
ExecutableTableRegistry {
- private final Map<String, ScannableTable> tablesByName;
+ private final Function<String, ScannableTable> tablesByName;
private final SqlSchemaManager schemaManager;
- TestExecutableTableRegistry(Map<String, ScannableTable> tablesByName,
SqlSchemaManager schemaManager) {
+ TestExecutableTableRegistry(Function<String, ScannableTable>
tablesByName, SqlSchemaManager schemaManager) {
this.tablesByName = tablesByName;
this.schemaManager = schemaManager;
}
@@ -1459,7 +1441,7 @@ public class TestBuilders {
return CompletableFuture.completedFuture(new ExecutableTable() {
@Override
public ScannableTable scannableTable() {
- ScannableTable scannableTable =
tablesByName.get(table.name());
+ ScannableTable scannableTable =
tablesByName.apply(table.name());
assert scannableTable != null;
@@ -1571,8 +1553,8 @@ public class TestBuilders {
}
/**
- * Sets a function that returns system views. Function accepts a view
name and returns a list of nodes
- * a system view is available at.
+ * Sets a function that returns system views. Function accepts a view
name and returns a list of nodes a system view is available
+ * at.
*/
public ExecutionDistributionProviderBuilder
setSystemViews(Function<String, List<String>> systemViews) {
this.owningNodesBySystemViewName = systemViews;
@@ -1587,9 +1569,22 @@ public class TestBuilders {
/** Creates an instance of {@link ExecutionDistributionProvider}. */
public ExecutionDistributionProvider build() {
+ Map<String, List<List<String>>> owningNodesByTableName =
Map.copyOf(this.owningNodesByTableName);
+
+ Function<String, AssignmentsProvider> sourceProviderFunction =
tableName ->
+ (AssignmentsProvider) (partitionsCount, includeBackups) ->
{
+ List<List<String>> assignments =
owningNodesByTableName.get(tableName);
+
+ if (nullOrEmpty(assignments)) {
+ throw new AssertionError("Assignments are not
configured for table " + tableName);
+ }
+
+ return assignments;
+ };
+
return new TestExecutionDistributionProvider(
owningNodesBySystemViewName,
- Map.copyOf(owningNodesByTableName),
+ sourceProviderFunction,
useTablePartitions
);
}
@@ -1598,17 +1593,17 @@ public class TestBuilders {
private static class TestExecutionDistributionProvider implements
ExecutionDistributionProvider {
final Function<String, List<String>> owningNodesBySystemViewName;
- final Map<String, List<List<String>>> owningNodesByTableName;
+ final Function<String, AssignmentsProvider> owningNodesByTableName;
final boolean useTablePartitions;
private TestExecutionDistributionProvider(
Function<String, List<String>> owningNodesBySystemViewName,
- Map<String, List<List<String>>> owningNodesByTableName,
+ Function<String, AssignmentsProvider> owningNodesByTableName,
boolean useTablePartitions
) {
this.owningNodesBySystemViewName = owningNodesBySystemViewName;
- this.owningNodesByTableName = Map.copyOf(owningNodesByTableName);
+ this.owningNodesByTableName = owningNodesByTableName;
this.useTablePartitions = useTablePartitions;
}
@@ -1620,9 +1615,25 @@ public class TestBuilders {
}
@Override
- public CompletableFuture<List<TokenizedAssignments>>
forTable(HybridTimestamp operationTime, IgniteTable table,
- boolean includeBackups) {
- List<List<String>> owningNodes =
owningNodesByTableName.get(table.name());
+ public CompletableFuture<List<TokenizedAssignments>> forTable(
+ HybridTimestamp operationTime,
+ IgniteTable table,
+ boolean includeBackups
+ ) {
+ AssignmentsProvider provider =
owningNodesByTableName.apply(table.name());
+
+ if (provider == null) {
+ return CompletableFuture.failedFuture(
+ new AssertionError("AssignmentsProvider is not
configured for table " + table.name())
+ );
+ }
+ List<List<String>> owningNodes = provider.get(table.partitions(),
includeBackups);
+
+ if (nullOrEmpty(owningNodes) || owningNodes.size() !=
table.partitions()) {
+ throw new AssertionError("Configured AssignmentsProvider
returns less assignment than expected "
+ + "[table=" + table.name() + ",
expectedNumberOfPartitions=" + table.partitions()
+ + ", returnedAssignmentSize=" + (owningNodes == null ?
"<null>" : owningNodes.size()) + "]");
+ }
List<TokenizedAssignments> assignments;
@@ -1648,10 +1659,94 @@ public class TestBuilders {
if (nullOrEmpty(nodes)) {
throw new SqlException(Sql.MAPPING_ERR, format("The view with
name '{}' could not be found on"
- + " any active nodes in the cluster", view));
+ + " any active nodes in the cluster", view));
}
return view.distribution() == IgniteDistributions.single() ?
List.of(nodes.get(0)) : nodes;
}
}
+
+ private abstract static class AbstractScannableTable implements
ScannableTable {
+ @Override
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
+ @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<@Nullable RowT>
primaryKeyLookup(ExecutionContext<RowT> ctx, InternalTransaction explicitTx,
+ RowFactory<RowT> rowFactory, RowT key, @Nullable BitSet
requiredColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Long> estimatedSize() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Data provider that will be used in case no other provider was specified
explicitly via
+ * {@link TestCluster#setDataProvider(String, ScannableTable)}.
+ */
+ @FunctionalInterface
+ public interface DefaultDataProvider {
+ ScannableTable get(String tableName);
+ }
+
+ /**
+ * Assignments provider that will be used in case no other provider was
specified explicitly via
+ * {@link TestCluster#setAssignmentsProvider(String, AssignmentsProvider)}.
+ */
+ @FunctionalInterface
+ public interface DefaultAssignmentsProvider {
+ AssignmentsProvider get(String tableName);
+ }
+
+ /** Provider of assignments for a table. */
+ @FunctionalInterface
+ public interface AssignmentsProvider {
+ /**
+ * Returns the list of assignments.
+ *
+ * <p>Returned list must have the same number of elements as provided
{@code partitionsCount}. If {@code includeBackups} is set to
+ * {@code true}, then every sublist is allowed to have more than one
element.
+ *
+ * @param partitionsCount Number of partitions in a table.
+ * @param includeBackups Whether to include backup assignments or node.
+ * @return List of assignments.
+ */
+ List<List<String>> get(int partitionsCount, boolean includeBackups);
+ }
+
+ private static <T> @Nullable T resolveProvider(
+ String tableName,
+ Map<String, T> providersByTableName,
+ @Nullable Function<String, T> defaultProvider
+ ) {
+ T provider = providersByTableName.get(tableName);
+
+ if (provider == null && defaultProvider != null) {
+ return defaultProvider.apply(tableName);
+ }
+
+ return provider;
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
index 86497b6170..028c0d0ab8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestCluster.java
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.CatalogManager;
@@ -30,6 +31,8 @@ import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.AssignmentsProvider;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -46,14 +49,20 @@ public class TestCluster implements LifecycleAware {
private final List<LifecycleAware> components;
private final Runnable initClosure;
private final CatalogManager catalogManager;
+ private final ConcurrentMap<String, ScannableTable>
dataProvidersByTableName;
+ private final ConcurrentMap<String, AssignmentsProvider>
assignmentsProvidersByTableName;
TestCluster(
+ ConcurrentMap<String, ScannableTable> dataProvidersByTableName,
+ ConcurrentMap<String, AssignmentsProvider>
assignmentsProvidersByTableName,
Map<String, TestNode> nodeByName,
CatalogManager catalogManager,
PrepareService prepareService,
ClockWaiter clockWaiter,
Runnable initClosure
) {
+ this.dataProvidersByTableName = dataProvidersByTableName;
+ this.assignmentsProvidersByTableName = assignmentsProvidersByTableName;
this.nodeByName = nodeByName;
this.components = List.of(
new ComponentToLifecycleAwareAdaptor(catalogManager),
@@ -100,6 +109,14 @@ public class TestCluster implements LifecycleAware {
IgniteUtils.closeAll(closeables);
}
+ public void setAssignmentsProvider(String tableName, AssignmentsProvider
assignmentsProvider) {
+ assignmentsProvidersByTableName.put(tableName, assignmentsProvider);
+ }
+
+ public void setDataProvider(String tableName, ScannableTable dataProvider)
{
+ dataProvidersByTableName.put(tableName, dataProvider);
+ }
+
private static class ComponentToLifecycleAwareAdaptor implements
LifecycleAware {
private final IgniteComponent component;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
index 2e4859a49d..fd508f99c1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.framework;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.convertSqlRows;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -27,6 +26,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,6 +37,8 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
@@ -48,11 +50,11 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.prepare.KeyValueGetPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.sql.engine.prepare.SelectCountPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.systemview.api.SystemViews;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -61,6 +63,7 @@ import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -139,11 +142,11 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
.addColumn("ID", Collation.ASC_NULLS_FIRST)
.end()
.end()
- .dataProvider("N1", "T1", table)
- .dataProvider("N2", "T1", table)
- // table T2 will be created later by DDL
- .dataProvider("N1", "T2", table)
- .dataProvider("N2", "T2", table)
+ .defaultAssignmentsProvider(tableName -> (partitionsCount,
includeBackups) -> IntStream.range(0, partitionsCount)
+ .mapToObj(part -> List.of(part % 2 == 0 ? "N1" : "N2"))
+ .collect(Collectors.toList())
+ )
+ .defaultDataProvider(tableName -> table)
// Register system views
.addSystemView(SystemViews.<Object[]>clusterViewBuilder()
.name("NODES")
@@ -175,37 +178,41 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
public void testSimpleQuery() {
cluster.start();
- var gatewayNode = cluster.node("N1");
- var plan = gatewayNode.prepare("SELECT * FROM t1");
+ TestNode gatewayNode = cluster.node("N1");
+ String query = "SELECT * FROM t1";
- for (var row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
- assertNotNull(row);
- }
+ QueryPlan plan = gatewayNode.prepare(query);
// Ensure the plan contains full table scan.
- assertTrue(plan instanceof MultiStepPlan);
- assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteTableScan);
+ assertInstanceOf(MultiStepPlan.class, plan);
+ assertInstanceOf(IgniteTableScan.class, lastNode(((MultiStepPlan)
plan).root()));
+
+ for (var row :
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
+ assertNotNull(row);
+ }
}
@Test
public void testSimpleFromCreatedTableByDdl() {
cluster.start();
- var gatewayNode = cluster.node("N1");
+ TestNode gatewayNode = cluster.node("N1");
gatewayNode.initSchema(
"CREATE TABLE t2 (id INT PRIMARY KEY, val VARCHAR(64))"
);
- QueryPlan plan = gatewayNode.prepare("SELECT * FROM t2");
+ String query = "SELECT * FROM t2";
- for (var row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+ QueryPlan plan = gatewayNode.prepare(query);
+
+ for (var row :
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
assertNotNull(row);
}
// Ensure the plan contains full table scan.
- assertTrue(plan instanceof MultiStepPlan);
- assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteTableScan);
+ assertInstanceOf(MultiStepPlan.class, plan);
+ assertInstanceOf(IgniteTableScan.class, lastNode(((MultiStepPlan)
plan).root()));
}
@Test
@@ -213,14 +220,16 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
cluster.start();
TestNode gatewayNode = cluster.node("N1");
- QueryPlan plan = gatewayNode.prepare("SELECT val, 100 FROM t1 WHERE ID
= 1");
+ String query = "SELECT val, 100 FROM t1 WHERE ID = 1";
+
+ QueryPlan plan = gatewayNode.prepare(query);
- for (InternalSqlRow row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+ for (InternalSqlRow row :
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
assertNotNull(row);
}
// Ensure the plan uses index.
- assertTrue(plan instanceof KeyValueGetPlan);
+ assertInstanceOf(KeyValueGetPlan.class, plan);
}
@Test
@@ -228,15 +237,17 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
cluster.start();
TestNode gatewayNode = cluster.node("N1");
- QueryPlan plan = gatewayNode.prepare("SELECT * FROM t1 WHERE ID > 1");
+ String query = "SELECT * FROM t1 WHERE ID > 1";
- for (InternalSqlRow row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
+ QueryPlan plan = gatewayNode.prepare(query);
+
+ for (InternalSqlRow row :
await(gatewayNode.executeQuery(query).requestNextAsync(10_000)).items()) {
assertNotNull(row);
}
// Ensure the plan uses index.
- assertTrue(plan instanceof MultiStepPlan);
- assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteIndexScan);
+ assertInstanceOf(MultiStepPlan.class, plan);
+ assertInstanceOf(IgniteIndexScan.class, lastNode(((MultiStepPlan)
plan).root()));
assertEquals("SORTED_IDX", ((IgniteIndexScan)
lastNode(((MultiStepPlan) plan).root())).indexName());
}
@@ -249,9 +260,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
TestNode stoppedNode = cluster.node("N2");
- QueryPlan plan = gatewayNode.prepare("SELECT * FROM t1 WHERE ID > 1");
-
- AsyncCursor<InternalSqlRow> cur = gatewayNode.executePlan(plan);
+ AsyncCursor<InternalSqlRow> cur = gatewayNode.executeQuery("SELECT *
FROM t1 WHERE ID > 1");
await(cur.requestNextAsync(1));
@@ -284,9 +293,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
assertTrue(initiator.clockService().after(initiatorClock.now(),
otherNodeClock.now()));
- QueryPlan plan = initiator.prepare("SELECT * FROM t1");
-
- AsyncCursor<InternalSqlRow> cur = initiator.executePlan(plan);
+ AsyncCursor<InternalSqlRow> cur = initiator.executeQuery("SELECT *
FROM t1");
await(cur.requestNextAsync(1));
@@ -310,9 +317,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
assertTrue(otherNode.clockService().after(otherNodeClock.now(),
initiatorClock.now()));
- QueryPlan plan = initiator.prepare("SELECT * FROM t1");
-
- AsyncCursor<InternalSqlRow> cur = initiator.executePlan(plan);
+ AsyncCursor<InternalSqlRow> cur = initiator.executeQuery("SELECT *
FROM t1");
await(cur.requestNextAsync(10_000));
@@ -334,9 +339,11 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
cluster.start();
TestNode gatewayNode = cluster.node("N1");
- QueryPlan plan = gatewayNode.prepare("SELECT * FROM SYSTEM.NODES,
SYSTEM.NODE_N2");
- BatchedResult<InternalSqlRow> results =
await(gatewayNode.executePlan(plan).requestNextAsync(10_000));
+ BatchedResult<InternalSqlRow> results = await(
+ gatewayNode.executeQuery("SELECT * FROM SYSTEM.NODES,
SYSTEM.NODE_N2")
+ .requestNextAsync(10_000)
+ );
List<List<Object>> rows = convertSqlRows(results.items());
assertEquals(List.of(List.of(42L, "mango", "N2", 42)), rows);
@@ -360,8 +367,10 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
TestNode gatewayNode = cluster.node("N1");
- SelectCountPlan plan = (SelectCountPlan) gatewayNode.prepare("SELECT
'hello', COUNT(*) FROM t1");
- BatchedResult<InternalSqlRow> results =
await(gatewayNode.executePlan(plan).requestNextAsync(10_000));
+ BatchedResult<InternalSqlRow> results = await(
+ gatewayNode.executeQuery("SELECT 'hello', COUNT(*) FROM t1")
+ .requestNextAsync(10_000)
+ );
List<List<Object>> rows = convertSqlRows(results.items());
assertEquals(List.of(List.of("hello", 42L)), rows);
@@ -374,8 +383,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
TestNode node = cluster.node("N1");
Object[] params = {1, null};
- QueryPlan plan = node.prepare("SELECT ?, ?", params);
- AsyncDataCursor<InternalSqlRow> cursor = node.executePlan(plan,
params);
+ AsyncDataCursor<InternalSqlRow> cursor = node.executeQuery("SELECT ?,
?", params);
BatchedResult<InternalSqlRow> res = await(cursor.requestNextAsync(1));
assertThat(res.hasMore(), is(false));
@@ -391,15 +399,10 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
TestNode node = cluster.node("N1");
- QueryPlan plan = node.prepare("SELECT ?/?", 1, 0);
-
- AsyncDataCursor<InternalSqlRow> cursor = node.executePlan(plan, 1);
-
- //noinspection ThrowableNotThrown
- assertThrowsWithCause(
- () -> await(cursor.requestNextAsync(1)),
- IllegalStateException.class,
- "Missing dynamic parameter: ?1"
+ SqlTestUtils.assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "Unexpected number of query parameters. Provided 1 but there
is only 2 dynamic parameter(s)",
+ () -> node.executeQuery("SELECT ?/?", 1)
);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index fd945aa3b6..db4465535a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -25,7 +25,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.handlers.AbstractFailureHandler;
@@ -39,13 +45,12 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
@@ -61,21 +66,24 @@ import
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
+import org.apache.ignite.internal.sql.engine.exec.fsm.QueryExecutor;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.StringUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -85,10 +93,11 @@ import org.jetbrains.annotations.Nullable;
*/
public class TestNode implements LifecycleAware {
private final String nodeName;
+ private final QueryExecutor queryExecutor;
private final PrepareService prepareService;
- private final ExecutionService executionService;
private final ParserService parserService;
private final MessageService messageService;
+ private final ClusterService clusterService;
private final List<LifecycleAware> services = new ArrayList<>();
volatile boolean exceptionRaised;
@@ -105,6 +114,7 @@ public class TestNode implements LifecycleAware {
*/
TestNode(
String nodeName,
+ CatalogService catalogService,
ClusterService clusterService,
ParserService parserService,
PrepareService prepareService,
@@ -117,6 +127,7 @@ public class TestNode implements LifecycleAware {
this.nodeName = nodeName;
this.parserService = parserService;
this.prepareService = prepareService;
+ this.clusterService = clusterService;
TopologyService topologyService = clusterService.topologyService();
MessagingService messagingService = clusterService.messagingService();
@@ -150,7 +161,7 @@ public class TestNode implements LifecycleAware {
TableFunctionRegistryImpl tableFunctionRegistry = new
TableFunctionRegistryImpl();
- executionService = registerService(ExecutionServiceImpl.create(
+ ExecutionService executionService =
registerService(ExecutionServiceImpl.create(
topologyService,
messageService,
schemaManager,
@@ -168,6 +179,33 @@ public class TestNode implements LifecycleAware {
));
registerService(new
IgniteComponentLifecycleAwareAdapter(systemViewManager));
+
+ ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
+
+ registerService(new LifecycleAware() {
+ @Override
+ public void start() { }
+
+ @Override
+ public void stop() {
+ scheduler.shutdownNow();
+ }
+ });
+
+ queryExecutor = registerService(new QueryExecutor(
+ EmptyCacheFactory.INSTANCE,
+ 0,
+ parserService,
+ taskExecutor,
+ scheduler,
+ clockService,
+ new AlwaysSyncedSchemaSyncService(),
+ prepareService,
+ catalogService,
+ executionService,
+ SqlQueryProcessor.DEFAULT_PROPERTIES,
+ NoOpTransactionTracker.INSTANCE
+ ));
}
/** {@inheritDoc} */
@@ -181,6 +219,8 @@ public class TestNode implements LifecycleAware {
public void stop() throws Exception {
holdLock.block();
+ clusterService.stopAsync(new ComponentContext()).join();
+
List<AutoCloseable> closeables = services.stream()
.map(service -> ((AutoCloseable) service::stop))
.collect(Collectors.toList());
@@ -210,33 +250,6 @@ public class TestNode implements LifecycleAware {
return clockService;
}
- /**
- * Executes given plan on a cluster this node belongs to
- * and returns an async cursor representing the result.
- *
- * @param plan A plan to execute.
- * @param transaction External transaction.
- * @param params Query parameters.
- * @return A cursor representing the result.
- */
- public AsyncDataCursor<InternalSqlRow> executePlan(QueryPlan plan,
@Nullable InternalTransaction transaction, Object... params) {
- SqlOperationContext ctx = createContext(transaction, params).build();
-
- return executionService.executePlan(plan, ctx);
- }
-
- /**
- * Executes given plan on a cluster this node belongs to
- * and returns an async cursor representing the result.
- *
- * @param plan A plan to execute.
- * @param params Query parameters.
- * @return A cursor representing the result.
- */
- public AsyncDataCursor<InternalSqlRow> executePlan(QueryPlan plan,
Object... params) {
- return executePlan(plan, null, params);
- }
-
/**
* Prepares (aka parses, validates, and optimizes) the given query string
* and returns the plan to execute.
@@ -280,45 +293,56 @@ public class TestNode implements LifecycleAware {
return await(prepareService.prepareAsync(parsedResult, ctx));
}
- /**
- * Executes the given script.
- *
- * <p>This method splits given string by semicolon and execute every
statement
- * one by one. Technically it may execute SELECT statements as well, but
since
- * it returns nothing, it doesn't make any sense.
- *
- * @param script Script to execute.
- */
+ /** Executes the given script. */
public void initSchema(String script) {
- for (String statement : script.split(";")) {
- if (StringUtils.nullOrBlank(statement) ||
statement.trim().startsWith("--")) {
- continue;
- }
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture =
queryExecutor.executeQuery(
+ SqlPropertiesHelper.emptyProperties(),
+ ImplicitTxContext.INSTANCE,
+ script,
+ null,
+ ArrayUtils.OBJECT_EMPTY_ARRAY
+ );
- ParsedResult parsedResult = parserService.parse(statement);
- SqlOperationContext ctx = createContext().build();
+ var consumer = new Function<AsyncSqlCursor<?>,
CompletionStage<AsyncSqlCursor<?>>>() {
+ @Override
+ public CompletionStage<AsyncSqlCursor<?>> apply(AsyncSqlCursor<?>
cursor) {
+ CompletableFuture<Void> closeFuture = cursor.closeAsync();
- QueryPlan plan = await(prepareService.prepareAsync(parsedResult,
ctx));
+ if (cursor.hasNextResult()) {
+ return cursor.nextResult().thenCompose(this);
+ }
- if (plan.type() != SqlQueryType.DDL && plan.type() !=
SqlQueryType.DML) {
- continue;
+ return closeFuture.thenApply(none -> cursor);
}
+ };
- AsyncCursor<?> cursor = executionService.executePlan(plan, ctx);
+ await(cursorFuture.thenCompose(consumer));
+ }
- await(cursor.requestNextAsync(1));
- }
+ /** Executes the given query. */
+ public AsyncSqlCursor<InternalSqlRow> executeQuery(@Nullable
InternalTransaction tx, String query, Object... params) {
+ QueryTransactionContext txContext = tx == null ?
ImplicitTxContext.INSTANCE : ExplicitTxContext.fromTx(tx);
+
+ return executeQuery(txContext, query, params);
}
- private SqlOperationContext.Builder createContext() {
- return createContext(ImplicitTxContext.INSTANCE);
+ /** Executes the given query. */
+ public AsyncSqlCursor<InternalSqlRow> executeQuery(QueryTransactionContext
txContext, String query, Object... params) {
+ return await(queryExecutor.executeQuery(
+ SqlPropertiesHelper.emptyProperties(),
+ txContext,
+ query,
+ null,
+ params
+ ));
}
- private SqlOperationContext.Builder createContext(@Nullable
InternalTransaction tx, Object... params) {
- return createContext(tx == null ? ImplicitTxContext.INSTANCE :
ExplicitTxContext.fromTx(tx), params);
+ /** Executes the given query. */
+ public AsyncSqlCursor<InternalSqlRow> executeQuery(String query, Object...
params) {
+ return executeQuery((InternalTransaction) null, query, params);
}
- private SqlOperationContext.Builder createContext(QueryTransactionContext
txContext, Object... params) {
+ private SqlOperationContext.Builder createContext() {
UUID queryId = UUID.randomUUID();
return SqlOperationContext.builder()
@@ -327,8 +351,8 @@ public class TestNode implements LifecycleAware {
.operationTime(clock.now())
.defaultSchemaName(SqlCommon.DEFAULT_SCHEMA_NAME)
.timeZoneId(SqlQueryProcessor.DEFAULT_TIME_ZONE_ID)
- .txContext(txContext)
- .parameters(params)
+ .txContext(ImplicitTxContext.INSTANCE)
+ .parameters()
.prefetchCallback(new PrefetchCallback());
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index 75d4a8ddbd..625dbaad41 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -24,13 +24,12 @@ import static org.hamcrest.Matchers.containsString;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
@@ -42,7 +41,6 @@ import
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
@@ -64,23 +62,25 @@ public class QueryCheckerTest extends
BaseIgniteAbstractTest {
@InjectQueryCheckerFactory
private static QueryCheckerFactory queryCheckerFactory;
- // @formatter:off
private static final TestCluster CLUSTER = TestBuilders.cluster()
.nodes(NODE_NAME)
- .addTable()
- .name("T1")
- .addKeyColumn("ID", NativeTypes.INT32)
- .addColumn("VAL", NativeTypes.INT32)
- .end()
- .dataProvider(NODE_NAME, "T1",
TestBuilders.tableScan(DataProvider.fromCollection(List.of(
- new Object[] {1, 1, 1}, new Object[] {2, 2, 1}
- ))))
.build();
- // @formatter:on
@BeforeAll
static void startCluster() {
CLUSTER.start();
+
+ //noinspection ConcatenationWithEmptyString
+ CLUSTER.node("N1").initSchema(""
+ + "CREATE ZONE test_zone WITH partitions=1,
storage_profiles='Default';"
+ + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE
test_zone");
+
+ CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) ->
IntStream.range(0, partitionCount)
+ .mapToObj(i -> List.of("N1"))
+ .collect(Collectors.toList()));
+ CLUSTER.setDataProvider("T1",
TestBuilders.tableScan(DataProvider.fromCollection(
+ List.of(new Object[]{1, 1, 1}, new Object[]{2, 2, 1})
+ )));
}
@AfterAll
@@ -331,19 +331,7 @@ public class QueryCheckerTest extends
BaseIgniteAbstractTest {
assert params == null || params.length == 0 : "params are not
supported";
assert !prepareOnly : "Expected that the query will only be
prepared, but not executed";
- QueryPlan plan = node.prepare(qry);
- AsyncDataCursor<InternalSqlRow> dataCursor =
node.executePlan(plan);
-
- SqlQueryType type = plan.type();
-
- assert type != null;
-
- AsyncSqlCursor<InternalSqlRow> sqlCursor = new
AsyncSqlCursorImpl<>(
- type,
- plan.metadata(),
- dataCursor,
- null
- );
+ AsyncSqlCursor<InternalSqlRow> sqlCursor = node.executeQuery(qry);
return CompletableFuture.completedFuture(sqlCursor);
}