This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 9551879 IGNITE-16315 Execution workflow optimization - Fixes #9751.
9551879 is described below
commit 9551879df991a05a7b864d2b3ff304302bc7d14e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Feb 3 10:14:29 2022 +0300
IGNITE-16315 Execution workflow optimization - Fixes #9751.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
modules/benchmarks/pom.xml | 12 +
.../benchmarks/jmh/sql/JmhSqlBenchmark.java | 285 +++++++++++++++++++++
modules/calcite/pom.xml | 2 +-
.../query/calcite/CalciteQueryProcessor.java | 2 +-
.../internal/processors/query/calcite/Query.java | 66 ++++-
.../processors/query/calcite/RootQuery.java | 59 ++++-
.../query/calcite/exec/ExchangeService.java | 10 +
.../query/calcite/exec/ExchangeServiceImpl.java | 39 ++-
.../query/calcite/exec/ExecutionContext.java | 3 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 48 +++-
.../calcite/exec/exp/ExpressionFactoryImpl.java | 9 +-
.../processors/query/calcite/exec/rel/Inbox.java | 13 +-
.../processors/query/calcite/exec/rel/Outbox.java | 9 +
.../query/calcite/exec/rel/ScanNode.java | 20 +-
.../query/calcite/message/QueryStartRequest.java | 47 +++-
.../query/calcite/metadata/ColocationGroup.java | 260 ++++++++++++++++---
.../calcite/prepare/AbstractMultiStepPlan.java | 2 +-
.../query/calcite/prepare/BaseQueryContext.java | 14 +-
.../calcite/schema/CacheTableDescriptorImpl.java | 18 +-
.../schema/SystemViewTableDescriptorImpl.java | 10 +-
.../calcite/exec/rel/AbstractExecutionTest.java | 2 +
.../integration/AbstractBasicIntegrationTest.java | 11 +-
.../integration/RunningQueriesIntegrationTest.java | 138 ++++++----
.../query/calcite/planner/PlannerTest.java | 5 +
.../processors/query/GridQueryProcessor.java | 7 +-
25 files changed, 934 insertions(+), 157 deletions(-)
diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml
index 4d55310..93b305d 100644
--- a/modules/benchmarks/pom.xml
+++ b/modules/benchmarks/pom.xml
@@ -52,6 +52,18 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>ignite-calcite</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.javassist</groupId>
+ <artifactId>javassist</artifactId>
+ <version>${javassist.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
diff --git
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
new file mode 100644
index 0000000..0a97a8c
--- /dev/null
+++
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.sql;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark simple SQL queries.
+ * <p>
+ * A trial starts {@link #SRV_NODES_CNT} server nodes and one client node, creates the {@code CACHE} cache,
+ * loads {@link #KEYS_CNT} {@link Item} entries into it and then runs every benchmark through the client node.
+ * The whole set is executed once per value of the {@link #engine} parameter.
+ * <p>
+ * Result sizes are validated with {@code assert} statements, so they are checked only when the JVM runs
+ * with assertions enabled.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 3, time = 5)
+@Measurement(iterations = 3, time = 5)
+public class JmhSqlBenchmark {
+    /** Count of server nodes. */
+    private static final int SRV_NODES_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 100000;
+
+    /** Size of batch (number of items sharing the same {@code fldBatch}/{@code fldIdxBatch} value). */
+    private static final int BATCH_SIZE = 1000;
+
+    /** Partitions count. */
+    private static final int PARTS_CNT = 1024;
+
+    /** IP finder shared across nodes. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Query engine. {@code "CALCITE"} enables the experimental SQL engine, any other value disables it. */
+    @Param({"H2", "CALCITE"})
+    private String engine;
+
+    /** Ignite client. */
+    private Ignite client;
+
+    /** Servers. */
+    private final Ignite[] servers = new Ignite[SRV_NODES_CNT];
+
+    /** Cache. */
+    private IgniteCache<Integer, Item> cache;
+
+    /**
+     * Create Ignite configuration.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @return Configuration.
+     */
+    private IgniteConfiguration configuration(String igniteInstanceName) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setIgniteInstanceName(igniteInstanceName);
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /**
+     * Initiate Ignite and caches.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        // The engine is selected through a system property, so it must be set before the nodes are started.
+        if ("CALCITE".equals(engine))
+            System.setProperty(IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_ENGINE, "true");
+        else
+            System.clearProperty(IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_ENGINE);
+
+        for (int i = 0; i < SRV_NODES_CNT; i++)
+            servers[i] = Ignition.start(configuration("server" + i));
+
+        client = Ignition.start(configuration("client").setClientMode(true));
+
+        cache = client.getOrCreateCache(new CacheConfiguration<Integer, Item>("CACHE")
+            .setIndexedTypes(Integer.class, Item.class)
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+        );
+
+        try (IgniteDataStreamer<Integer, Item> ds = client.dataStreamer("CACHE")) {
+            for (int i = 0; i < KEYS_CNT; i++)
+                ds.addData(i, new Item(i));
+        }
+    }
+
+    /**
+     * Stop Ignite instance.
+     */
+    @TearDown
+    public void tearDown() {
+        client.close();
+
+        for (Ignite ignite : servers)
+            ignite.close();
+    }
+
+    /**
+     * Query unique value (full scan). Exactly one row is expected.
+     */
+    @Benchmark
+    public void querySimpleUnique() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fld=?", key);
+
+        assert res.size() == 1;
+    }
+
+    /**
+     * Query unique value (indexed). Exactly one row is expected.
+     */
+    @Benchmark
+    public void querySimpleUniqueIndexed() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fldIdx=?", key);
+
+        assert res.size() == 1;
+    }
+
+    /**
+     * Query batch (full scan). {@link #BATCH_SIZE} rows are expected.
+     */
+    @Benchmark
+    public void querySimpleBatch() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fldBatch=?", key / BATCH_SIZE);
+
+        assert res.size() == BATCH_SIZE;
+    }
+
+    /**
+     * Query batch (indexed). {@link #BATCH_SIZE} rows are expected.
+     */
+    @Benchmark
+    public void querySimpleBatchIndexed() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fldIdxBatch=?", key / BATCH_SIZE);
+
+        assert res.size() == BATCH_SIZE;
+    }
+
+    /**
+     * Query with group by and aggregate. One row per batch is expected.
+     */
+    @Benchmark
+    public void queryGroupBy() {
+        List<?> res = executeSql("SELECT fldBatch, AVG(fld) FROM Item GROUP BY fldBatch");
+
+        assert res.size() == KEYS_CNT / BATCH_SIZE;
+    }
+
+    /**
+     * Query with indexed field group by and aggregate. One row per batch is expected.
+     */
+    @Benchmark
+    public void queryGroupByIndexed() {
+        List<?> res = executeSql("SELECT fldIdxBatch, AVG(fld) FROM Item GROUP BY fldIdxBatch");
+
+        assert res.size() == KEYS_CNT / BATCH_SIZE;
+    }
+
+    /**
+     * Query with sorting (full set). {@link #KEYS_CNT} rows are expected.
+     */
+    @Benchmark
+    public void queryOrderByFull() {
+        List<?> res = executeSql("SELECT name, fld FROM Item ORDER BY fld DESC");
+
+        assert res.size() == KEYS_CNT;
+    }
+
+    /**
+     * Query with sorting (batch). {@link #BATCH_SIZE} rows are expected.
+     */
+    @Benchmark
+    public void queryOrderByBatch() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name, fld FROM Item WHERE fldIdxBatch=? ORDER BY fld DESC", key / BATCH_SIZE);
+
+        assert res.size() == BATCH_SIZE;
+    }
+
+    /**
+     * Executes the SQL query on the benchmark cache and fetches the whole result set.
+     *
+     * @param sql SQL query text.
+     * @param args Query arguments.
+     * @return All rows of the result set; each element is one row (a list of column values).
+     */
+    private List<?> executeSql(String sql, Object... args) {
+        // Return the rows themselves, not the first row: callers validate the number of rows, and taking
+        // element 0 would both compare the column count instead and fail on an empty result set.
+        return cache.query(new SqlFieldsQuery(sql).setArgs(args)).getAll();
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Args.
+     * @throws Exception Exception.
+     */
+    public static void main(String[] args) throws Exception {
+        final Options options = new OptionsBuilder()
+            .include(JmhSqlBenchmark.class.getSimpleName())
+            .build();
+
+        new Runner(options).run();
+    }
+
+    /** Cache value type: the same integer is exposed through plain and indexed SQL fields. */
+    private static class Item {
+        /** Item name ({@code "name" + val}). */
+        @QuerySqlField
+        private final String name;
+
+        /** Unique value, not indexed. */
+        @QuerySqlField
+        private final int fld;
+
+        /** Batch number ({@code val / BATCH_SIZE}), not indexed. */
+        @QuerySqlField
+        private final int fldBatch;
+
+        /** Unique value, indexed. */
+        @QuerySqlField(index = true)
+        private final int fldIdx;
+
+        /** Batch number ({@code val / BATCH_SIZE}), indexed. */
+        @QuerySqlField(index = true)
+        private final int fldIdxBatch;
+
+        /**
+         * @param val Value all fields are derived from.
+         */
+        public Item(int val) {
+            name = "name" + val;
+            fld = val;
+            fldBatch = val / BATCH_SIZE;
+            fldIdx = val;
+            fldIdxBatch = val / BATCH_SIZE;
+        }
+    }
+}
diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index dba2262..19c6bd0 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -30,7 +30,7 @@
<janino.version>3.1.6</janino.version>
<avatica.version>1.19.0</avatica.version>
<jsonpath.version>2.4.0</jsonpath.version>
- <reflections.version>0.9.10</reflections.version>
+ <reflections.version>0.10.2</reflections.version>
<javassist.version>3.28.0-GA</javassist.version>
<esri.geometry.version>2.2.0</esri.geometry.version>
<checker.version>3.10.0</checker.version>
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 24ba82a..26b35fe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -361,7 +361,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
try {
if (qryList.size() == 1) {
plan = queryPlanCache().queryPlan(
- new CacheKey(schemaName, qry.sql()),
+ new CacheKey(schemaName, sql), // Use source SQL to
avoid redundant parsing next time.
() -> prepareSvc.prepareSingle(sqlNode,
qry.planningContext()));
}
else
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
index 1c48bd1..872b0c3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -19,14 +19,13 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -40,9 +39,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
/** */
public class Query<RowT> implements RunningQuery {
- /** Completable futures empty array. */
- private static final CompletableFuture<?>[]
COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
-
/** */
private final UUID initNodeId;
@@ -67,6 +63,15 @@ public class Query<RowT> implements RunningQuery {
/** */
protected final ExchangeService exch;
+ /** */
+ protected final int totalFragmentsCnt;
+
+ /** */
+ protected final AtomicInteger finishedFragmentsCnt = new AtomicInteger();
+
+ /** */
+ protected final Set<Long> initNodeStartedExchanges = new HashSet<>();
+
/** Logger. */
protected final IgniteLogger log;
@@ -77,7 +82,8 @@ public class Query<RowT> implements RunningQuery {
GridQueryCancel cancel,
ExchangeService exch,
Consumer<Query<RowT>> unregister,
- IgniteLogger log
+ IgniteLogger log,
+ int totalFragmentsCnt
) {
this.id = id;
this.unregister = unregister;
@@ -88,6 +94,7 @@ public class Query<RowT> implements RunningQuery {
this.cancel = cancel != null ? cancel : new GridQueryCancel();
fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ this.totalFragmentsCnt = totalFragmentsCnt;
}
/** */
@@ -181,6 +188,53 @@ public class Query<RowT> implements RunningQuery {
cancel();
}
+ /**
+ * Callback after the first batch of the query fragment from the node is received.
+ * The base implementation does nothing.
+ *
+ * @param nodeId ID of the node the batch came from (presumably the sender; confirm against the caller).
+ * @param exchangeId Exchange ID.
+ */
+ public void onInboundExchangeStarted(UUID nodeId, long exchangeId) {
+ // No-op.
+ }
+
+ /**
+ * Callback after the last batch of the query fragment from the node is processed.
+ * The base implementation does nothing.
+ *
+ * @param nodeId ID of the node whose inbound data has been fully processed.
+ * @param exchangeId Exchange ID.
+ */
+ public void onInboundExchangeFinished(UUID nodeId, long exchangeId) {
+ // No-op.
+ }
+
+    /**
+     * Callback after the first batch of the query fragment is sent to the node. Exchanges targeting the
+     * query initiator node are remembered, so it can later be checked whether data has already been sent
+     * to the initiator for a given exchange.
+     * <p>
+     * NOTE(review): the exchange IDs are kept in a plain {@code HashSet}; confirm that all accesses to it
+     * happen on the same thread.
+     *
+     * @param nodeId ID of the node the batch was sent to.
+     * @param exchangeId Exchange ID.
+     */
+    public void onOutboundExchangeStarted(UUID nodeId, long exchangeId) {
+        // Only exchanges with the initiator node are of interest.
+        if (!initNodeId.equals(nodeId))
+            return;
+
+        initNodeStartedExchanges.add(exchangeId);
+    }
+
+    /**
+     * Callback after the last batch of the query fragment is sent to all nodes. Once the number of finished
+     * fragments reaches the total fragments count, a query that is still executing is switched to the closed
+     * state and {@code tryClose()} is invoked.
+     *
+     * @param exchangeId Exchange ID (not used by this implementation).
+     */
+    public void onOutboundExchangeFinished(long exchangeId) {
+        // Nothing to do until the last fragment has finished.
+        if (finishedFragmentsCnt.incrementAndGet() != totalFragmentsCnt)
+            return;
+
+        boolean wasExecuting;
+
+        synchronized (mux) {
+            wasExecuting = state == QueryState.EXECUTING;
+
+            if (wasExecuting)
+                state = QueryState.CLOSED;
+        }
+
+        // The close itself is performed outside of the synchronized section.
+        if (wasExecuting)
+            tryClose();
+    }
+
+ /**
+ * Checks whether the first batch of the given fragment has already been sent to the query initiator node.
+ *
+ * @param fragmentId Fragment ID, looked up among the recorded exchange IDs.
+ * @return {@code true} if an outbound exchange with this ID to the initiator node has been started.
+ */
+ public boolean isExchangeWithInitNodeStarted(long fragmentId) {
+ // On remote node exchange ID is the same as fragment ID.
+ return initNodeStartedExchanges.contains(fragmentId);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Query.class, this, "state", state, "fragments",
fragments);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 449b050..7cca5e1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -17,14 +17,16 @@
package org.apache.ignite.internal.processors.query.calcite;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-
import org.apache.calcite.plan.Context;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
@@ -65,10 +67,10 @@ public class RootQuery<RowT> extends Query<RowT> {
/** Parameters. */
private final Object[] params;
- /** remote nodes */
- private final Set<UUID> remotes;
+ /** Remote nodes unfinished fragments count. AtomicInteger used just as
int holder, there is no concurrency here. */
+ private final Map<UUID, AtomicInteger> remoteFragments;
- /** node to fragment */
+ /** Node to fragment. */
private final Set<RemoteFragmentKey> waiting;
/** */
@@ -96,13 +98,14 @@ public class RootQuery<RowT> extends Query<RowT> {
qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null,
exch,
unregister,
- log
+ log,
+ 0 // Total fragments count not used for RootQuery.
);
this.sql = sql;
this.params = params;
- remotes = new HashSet<>();
+ remoteFragments = new HashMap<>();
waiting = new HashSet<>();
Context parent = Commons.convert(qryCtx);
@@ -184,7 +187,15 @@ public class RootQuery<RowT> extends Query<RowT> {
Fragment fragment = plan.fragments().get(i);
List<UUID> nodes = plan.mapping(fragment).nodeIds();
- remotes.addAll(nodes);
+ nodes.forEach(n -> remoteFragments.compute(n, (id, cnt) -> {
+ if (cnt == null)
+ return new AtomicInteger(1);
+ else {
+ cnt.incrementAndGet();
+
+ return cnt;
+ }
+ }));
for (UUID node : nodes)
waiting.add(new RemoteFragmentKey(node,
fragment.fragmentId()));
@@ -225,14 +236,16 @@ public class RootQuery<RowT> extends Query<RowT> {
try {
IgniteException wrpEx = null;
- for (UUID nodeId : remotes) {
+ for (Map.Entry<UUID, AtomicInteger> entry :
remoteFragments.entrySet()) {
try {
- if (!nodeId.equals(root.context().localNodeId()))
- exch.closeQuery(nodeId, id());
+ // Don't send close message if all remote fragments
are finished (query is self-closed on the
+ // remote node in this case).
+ if
(!entry.getKey().equals(root.context().localNodeId()) && entry.getValue().get()
> 0)
+ exch.closeQuery(entry.getKey(), id());
}
catch (IgniteCheckedException e) {
if (wrpEx == null)
- wrpEx = new IgniteException("Failed to send cancel
message. [nodeId=" + nodeId + ']', e);
+ wrpEx = new IgniteException("Failed to send cancel
message. [nodeId=" + entry.getKey() + ']', e);
else
wrpEx.addSuppressed(e);
}
@@ -341,6 +354,30 @@ public class RootQuery<RowT> extends Query<RowT> {
tryClose();
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code onResponse} with a {@code null} error, passing the exchange ID in place of the
+ * fragment ID. NOTE(review): presumably the first received batch serves as the fragment start response;
+ * confirm against {@code onResponse}.
+ */
+ @Override public void onInboundExchangeStarted(UUID nodeId, long
exchangeId) {
+ onResponse(nodeId, exchangeId, null);
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Decrements the unfinished fragments counter of the given remote node.
+     */
+    @Override public void onInboundExchangeFinished(UUID nodeId, long exchangeId) {
+        AtomicInteger unfinishedCnt = remoteFragments.get(nodeId);
+
+        // A counter is expected to be registered for every node an inbound exchange finishes for.
+        assert unfinishedCnt != null : nodeId;
+
+        unfinishedCnt.decrementAndGet();
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This override does nothing.
+ */
+ @Override public void onOutboundExchangeStarted(UUID nodeId, long
exchangeId) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This override does nothing.
+ */
+ @Override public void onOutboundExchangeFinished(long exchangeId) {
+ // No-op.
+ }
+
/** */
@Override public String toString() {
return S.toString(RootQuery.class, this);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index ffbf5bc..5c618d2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -80,4 +80,14 @@ public interface ExchangeService extends Service {
* @return {@code true} if node is alive, {@code false} otherwise.
*/
boolean alive(UUID nodeId);
+
+ /**
+ * Callback after the last batch of the query fragment is sent.
+ *
+ * @param qryId Query ID.
+ * @param exchangeId Exchange ID.
+ */
+ void onOutboundExchangeFinished(UUID qryId, long exchangeId);
+
+ /**
+ * Callback after the last batch of the query fragment from the node is processed.
+ *
+ * @param nodeId ID of the node whose inbound data has been fully processed.
+ * @param qryId Query ID.
+ * @param exchangeId Exchange ID.
+ */
+ void onInboundExchangeFinished(UUID nodeId, UUID qryId, long exchangeId);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index abb45d2..2a2a246 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.RunningQuery;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
@@ -123,6 +124,13 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
@Override public <Row> void sendBatch(UUID nodeId, UUID qryId, long
fragmentId, long exchangeId, int batchId,
boolean last, List<Row> rows) throws IgniteCheckedException {
messageService().send(nodeId, new QueryBatchMessage(qryId, fragmentId,
exchangeId, batchId, last, Commons.cast(rows)));
+
+ if (batchId == 0) {
+ Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+
+ if (qry != null)
+ qry.onOutboundExchangeStarted(nodeId, exchangeId);
+ }
}
/** {@inheritDoc} */
@@ -172,6 +180,22 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
return messageService().alive(nodeId);
}
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Forwards the event to the query found in the registry by its ID, if any.
+     */
+    @Override public void onOutboundExchangeFinished(UUID qryId, long exchangeId) {
+        Query<?> runningQry = (Query<?>)qryRegistry.query(qryId);
+
+        // Nothing to notify when the registry has no such query.
+        if (runningQry == null)
+            return;
+
+        runningQry.onOutboundExchangeFinished(exchangeId);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Forwards the event to the query found in the registry by its ID, if any.
+     */
+    @Override public void onInboundExchangeFinished(UUID nodeId, UUID qryId, long exchangeId) {
+        Query<?> runningQry = (Query<?>)qryRegistry.query(qryId);
+
+        // Nothing to notify when the registry has no such query.
+        if (runningQry == null)
+            return;
+
+        runningQry.onInboundExchangeFinished(nodeId, exchangeId);
+    }
+
/** */
protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
Collection<Inbox<?>> inboxes =
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
@@ -196,9 +220,11 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
if (qry != null)
qry.cancel();
else {
- log.warning("Stale query close message received: [" +
- "nodeId=" + nodeId +
- ", queryId=" + msg.queryId() + "]");
+ if (log.isDebugEnabled()) {
+ log.debug("Stale query close message received: [" +
+ "nodeId=" + nodeId +
+ ", queryId=" + msg.queryId() + "]");
+ }
}
}
@@ -241,6 +267,13 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
if (inbox != null) {
try {
+ if (msg.batchId() == 0) {
+ Query<?> qry = (Query<?>)qryRegistry.query(msg.queryId());
+
+ if (qry != null)
+ qry.onInboundExchangeStarted(nodeId, msg.exchangeId());
+ }
+
inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(),
Commons.cast(msg.rows()));
}
catch (Throwable e) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 73a0472..dad750c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -119,7 +119,8 @@ public class ExecutionContext<Row> extends
AbstractQueryContext implements DataC
expressionFactory = new ExpressionFactoryImpl<>(
this,
qctx.typeFactory(),
- qctx.config().getParserConfig().conformance()
+ qctx.config().getParserConfig().conformance(),
+ qctx.rexBuilder()
);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 2f429a5..1909f47 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -18,8 +18,11 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
@@ -446,7 +449,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
);
case EXPLAIN:
- return executeExplain((ExplainPlan)plan);
+ return executeExplain(qry, (ExplainPlan)plan);
case DDL:
return executeDdl(qry, (DdlPlan)plan);
@@ -534,7 +537,12 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
qry.run(ectx, plan, node);
- // start remote execution
+ Map<UUID, Long> fragmentsPerNode = fragments.stream()
+ .skip(1)
+ .flatMap(f -> f.mapping().nodeIds().stream())
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+
+ // Start remote execution.
for (int i = 1; i < fragments.size(); i++) {
fragment = fragments.get(i);
fragmentDesc = new FragmentDescription(
@@ -544,6 +552,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
plan.remotes(fragment));
Throwable ex = null;
+ byte[] parametersMarshalled = null;
+
for (UUID nodeId : fragmentDesc.nodeIds()) {
if (ex != null)
qry.onResponse(nodeId, fragment.fragmentId(), ex);
@@ -555,9 +565,16 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
fragment.serialized(),
ectx.topologyVersion(),
fragmentDesc,
- qry.parameters());
+ fragmentsPerNode.get(nodeId).intValue(),
+ qry.parameters(),
+ parametersMarshalled
+ );
messageService().send(nodeId, req);
+
+ // Avoid marshaling of the same parameters for other
nodes.
+ if (parametersMarshalled == null)
+ parametersMarshalled = req.parametersMarshalled();
}
catch (Throwable e) {
qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
@@ -570,10 +587,12 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
/** */
- private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan) {
+ private FieldsQueryCursor<List<?>> executeExplain(RootQuery<Row> qry,
ExplainPlan plan) {
QueryCursorImpl<List<?>> cur = new
QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
+ qryReg.unregister(qry.id());
+
return cur;
}
@@ -592,16 +611,18 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
qry.addFragment(new RunningFragment<>(plan.root(), node, ectx));
- try {
- messageService().send(origNodeId, new QueryStartResponse(qry.id(),
ectx.fragmentId()));
- }
- catch (IgniteCheckedException e) {
- IgniteException wrpEx = new IgniteException("Failed to send reply.
[nodeId=" + origNodeId + ']', e);
+ node.init();
- throw wrpEx;
- }
+ if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
+ try {
+ messageService().send(origNodeId, new
QueryStartResponse(qry.id(), ectx.fragmentId()));
+ }
+ catch (IgniteCheckedException e) {
+ IgniteException wrpEx = new IgniteException("Failed to send
reply. [nodeId=" + origNodeId + ']', e);
- node.init();
+ throw wrpEx;
+ }
+ }
}
/** */
@@ -624,7 +645,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
null,
exchangeSvc,
(q) -> qryReg.unregister(q.id()),
- log
+ log,
+ msg.totalFragmentsCount()
)
);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index 7d8e8df..31c9fd5 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -95,12 +95,17 @@ public class ExpressionFactoryImpl<Row> implements
ExpressionFactory<Row> {
private final ExecutionContext<Row> ctx;
/** */
- public ExpressionFactoryImpl(ExecutionContext<Row> ctx, IgniteTypeFactory
typeFactory, SqlConformance conformance) {
+ public ExpressionFactoryImpl(
+ ExecutionContext<Row> ctx,
+ IgniteTypeFactory typeFactory,
+ SqlConformance conformance,
+ RexBuilder rexBuilder
+ ) {
this.ctx = ctx;
this.typeFactory = typeFactory;
this.conformance = conformance;
+ this.rexBuilder = rexBuilder;
- rexBuilder = new RexBuilder(this.typeFactory);
emptyType = new RelDataTypeFactory.Builder(this.typeFactory).build();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 7e12665..5d90422 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -213,6 +213,9 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
break;
case END:
it.remove();
+
+ exchange.onInboundExchangeFinished(buf.nodeId, queryId(),
exchangeId);
+
break;
case WAITING:
return false;
@@ -252,6 +255,8 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
case END:
buffers.remove(buf);
+ exchange.onInboundExchangeFinished(buf.nodeId,
queryId(), exchangeId);
+
break;
case READY:
heap.offer(Pair.of(buf.peek(), buf));
@@ -290,6 +295,8 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
case END:
buffers.remove(idx--);
+ exchange.onInboundExchangeFinished(buf.nodeId,
queryId(), exchangeId);
+
break;
case READY:
noProgress = 0;
@@ -491,11 +498,11 @@ public class Inbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Singl
Row row = curr.rows.set(curr.idx++, null);
- if (curr.idx == curr.rows.size()) {
+ if (curr.idx == curr.rows.size() && !curr.last) {
+ // Don't send acknowledge for the last batch, since outbox
already should be closed after the last batch.
acknowledge(nodeId, curr.batchId);
- if (!isEnd())
- curr = pollBatch();
+ curr = pollBatch();
}
return row;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index 6b7ea84..0d4082f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -64,6 +64,9 @@ public class Outbox<Row> extends AbstractNode<Row> implements
Mailbox<Row>, Sing
/** */
private int waiting;
+ /** Whether the exchange service has already been notified that this outbound exchange finished. */
+ private boolean exchangeFinished;
+
/**
* @param ctx Execution context.
* @param exchange Exchange service.
@@ -254,6 +257,12 @@ public class Outbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Sing
else if (waiting == -1) {
for (UUID node : dest.targets())
getOrCreateBuffer(node).end();
+
+ if (!exchangeFinished) {
+ exchange.onOutboundExchangeFinished(queryId(), exchangeId);
+
+ exchangeFinished = true;
+ }
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index c6ed6d5..ef761a7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.Iterator;
import java.util.List;
-
import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -40,6 +39,9 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
/** */
private boolean inLoop;
+ /** {@code true} until the first request has been handled; that first request is pushed synchronously. */
+ private boolean firstReq = true;
+
/**
* @param ctx Execution context.
* @param src Source.
@@ -58,8 +60,20 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
requested = rowsCnt;
- if (!inLoop)
- context().execute(this::push, this::onError);
+ if (!inLoop) {
+ if (firstReq) {
+ try {
+ push(); // Make first request sync to reduce latency in
simple cases.
+ }
+ catch (Throwable e) {
+ onError(e);
+ }
+
+ firstReq = false;
+ }
+ else
+ context().execute(this::push, this::onError);
+ }
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index afc5126..e71b1d0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -25,6 +25,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -45,6 +46,9 @@ public class QueryStartRequest implements MarshalableMessage,
ExecutionContextAw
/** */
private String root;
+ /** Total count of fragments in query for this node. */
+ private int totalFragmentsCnt;
+
/** */
@GridDirectTransient
private Object[] params;
@@ -54,14 +58,24 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
/** */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
- public QueryStartRequest(UUID qryId, String schema, String root,
AffinityTopologyVersion ver,
- FragmentDescription fragmentDesc, Object[] params) {
+ public QueryStartRequest(
+ UUID qryId,
+ String schema,
+ String root,
+ AffinityTopologyVersion ver,
+ FragmentDescription fragmentDesc,
+ int totalFragmentsCnt,
+ Object[] params,
+ @Nullable byte[] paramsBytes
+ ) {
this.qryId = qryId;
this.schema = schema;
this.root = root;
this.ver = ver;
this.fragmentDesc = fragmentDesc;
+ this.totalFragmentsCnt = totalFragmentsCnt;
this.params = params;
+ this.paramsBytes = paramsBytes; // If we already have marshalled
params, use it.
}
/** */
@@ -106,6 +120,13 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
}
/**
+ * @return Total count of fragments in query for this node.
+ */
+ public int totalFragmentsCount() {
+ return totalFragmentsCnt;
+ }
+
+ /**
* @return Query parameters.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@@ -113,6 +134,14 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return params;
}
+ /**
+ * @return Query parameters marshalled.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public byte[] parametersMarshalled() {
+ return paramsBytes;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws
IgniteCheckedException {
if (paramsBytes == null && params != null)
@@ -177,6 +206,11 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
writer.incrementState();
+ case 6:
+ if (!writer.writeInt("totalFragmentsCnt", totalFragmentsCnt))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -238,6 +272,13 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
reader.incrementState();
+ case 6:
+ totalFragmentsCnt = reader.readInt("totalFragmentsCnt");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(QueryStartRequest.class);
@@ -250,6 +291,6 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index ab9aed1..64a6d10 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -19,13 +19,16 @@ package
org.apache.ignite.internal.processors.query.calcite.metadata;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import org.apache.ignite.IgniteSystemProperties;
+import java.util.stream.LongStream;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -34,11 +37,10 @@ import
org.apache.ignite.internal.processors.query.calcite.message.MarshallingCo
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.GridIntIterator;
import org.apache.ignite.internal.util.GridIntList;
-import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -47,12 +49,7 @@ import org.jetbrains.annotations.NotNull;
/** */
public class ColocationGroup implements MarshalableMessage {
/** */
- private static final int SYNTHETIC_PARTITIONS_COUNT =
-
IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT",
512);
-
- /** */
- @GridDirectCollection(Long.class)
- private List<Long> sourceIds;
+ private long[] sourceIds;
/** */
@GridDirectCollection(UUID.class)
@@ -62,9 +59,8 @@ public class ColocationGroup implements MarshalableMessage {
@GridDirectTransient
private List<List<UUID>> assignments;
- /** */
- @GridDirectCollection(Message.class)
- private List<UUIDCollectionMessage> assignments0;
+ /** Marshalled assignments. */
+ private int[] marshalledAssignments;
/** */
public static ColocationGroup forNodes(List<UUID> nodeIds) {
@@ -78,7 +74,7 @@ public class ColocationGroup implements MarshalableMessage {
/** */
public static ColocationGroup forSourceId(long sourceId) {
- return new ColocationGroup(Collections.singletonList(sourceId), null,
null);
+ return new ColocationGroup(new long[] {sourceId}, null, null);
}
/** */
@@ -86,20 +82,13 @@ public class ColocationGroup implements MarshalableMessage {
}
/** */
- private ColocationGroup(List<Long> sourceIds, List<UUID> nodeIds,
List<List<UUID>> assignments) {
+ private ColocationGroup(long[] sourceIds, List<UUID> nodeIds,
List<List<UUID>> assignments) {
this.sourceIds = sourceIds;
this.nodeIds = nodeIds;
this.assignments = assignments;
}
/**
- * @return Lists of colocation group sources.
- */
- public List<Long> sourceIds() {
- return sourceIds == null ? Collections.emptyList() : sourceIds;
- }
-
- /**
* @return Lists of nodes capable to execute a query fragment for what the
mapping is calculated.
*/
public List<UUID> nodeIds() {
@@ -126,7 +115,15 @@ public class ColocationGroup implements MarshalableMessage
{
/** */
public boolean belongs(long sourceId) {
- return sourceIds != null && sourceIds.contains(sourceId);
+ if (sourceIds == null)
+ return false;
+
+ for (long i : sourceIds) {
+ if (i == sourceId)
+ return true;
+ }
+
+ return false;
}
/**
@@ -137,11 +134,11 @@ public class ColocationGroup implements
MarshalableMessage {
* being calculated fragment.
*/
public ColocationGroup colocate(ColocationGroup other) throws
ColocationMappingException {
- List<Long> sourceIds;
+ long[] sourceIds;
if (this.sourceIds == null || other.sourceIds == null)
sourceIds = U.firstNotNull(this.sourceIds, other.sourceIds);
else
- sourceIds = Commons.combine(this.sourceIds, other.sourceIds);
+ sourceIds = LongStream.concat(Arrays.stream(this.sourceIds),
Arrays.stream(other.sourceIds)).distinct().toArray();
List<UUID> nodeIds;
if (this.nodeIds == null || other.nodeIds == null)
@@ -224,9 +221,11 @@ public class ColocationGroup implements MarshalableMessage
{
/** */
@NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
- List<List<UUID>> assignments = new
ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
- for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++)
- assignments.add(F.asList(nodeIds.get(i % nodeIds.size())));
+ List<List<UUID>> assignments = new ArrayList<>(nodeIds.size());
+
+ for (UUID nodeId : nodeIds)
+ assignments.add(Collections.singletonList(nodeId));
+
return new ColocationGroup(sourceIds, nodeIds, assignments);
}
@@ -266,7 +265,7 @@ public class ColocationGroup implements MarshalableMessage {
switch (writer.state()) {
case 0:
- if (!writer.writeCollection("assignments0", assignments0,
MessageCollectionItemType.MSG))
+ if (!writer.writeIntArray("marshalledAssignments",
marshalledAssignments))
return false;
writer.incrementState();
@@ -278,7 +277,7 @@ public class ColocationGroup implements MarshalableMessage {
writer.incrementState();
case 2:
- if (!writer.writeCollection("sourceIds", sourceIds,
MessageCollectionItemType.LONG))
+ if (!writer.writeLongArray("sourceIds", sourceIds))
return false;
writer.incrementState();
@@ -297,7 +296,7 @@ public class ColocationGroup implements MarshalableMessage {
switch (reader.state()) {
case 0:
- assignments0 = reader.readCollection("assignments0",
MessageCollectionItemType.MSG);
+ marshalledAssignments =
reader.readIntArray("marshalledAssignments");
if (!reader.isLastRead())
return false;
@@ -313,7 +312,7 @@ public class ColocationGroup implements MarshalableMessage {
reader.incrementState();
case 2:
- sourceIds = reader.readCollection("sourceIds",
MessageCollectionItemType.LONG);
+ sourceIds = reader.readLongArray("sourceIds");
if (!reader.isLastRead())
return false;
@@ -332,23 +331,202 @@ public class ColocationGroup implements
MarshalableMessage {
/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) {
- if (assignments != null && assignments0 == null)
- assignments0 = Commons.transform(assignments, this::transform);
+ if (assignments != null && marshalledAssignments == null) {
+ Map<UUID, Integer> nodeIdxs = new HashMap<>();
+
+ for (int i = 0; i < nodeIds.size(); i++)
+ nodeIdxs.put(nodeIds.get(i), i);
+
+ int bitsPerPart = Integer.SIZE -
Integer.numberOfLeadingZeros(nodeIds.size());
+
+ CompactedIntArray.Builder builder =
CompactedIntArray.builder(bitsPerPart, assignments.size());
+
+ for (List<UUID> assignment : assignments) {
+ assert F.isEmpty(assignment) || assignment.size() == 1;
+
+ if (F.isEmpty(assignment))
+ builder.add(nodeIds.size());
+ else {
+ Integer nodeIdx = nodeIdxs.get(assignment.get(0));
+
+ builder.add(nodeIdx);
+ }
+ }
+
+ marshalledAssignments = builder.build().buffer();
+ }
}
/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) {
- if (assignments0 != null && assignments == null)
- assignments = Commons.transform(assignments0, this::transform);
- }
+ if (marshalledAssignments != null && assignments == null) {
+ int bitsPerPart = Integer.SIZE -
Integer.numberOfLeadingZeros(nodeIds.size());
- /** */
- private List<UUID> transform(UUIDCollectionMessage message) {
- return message.uuids() instanceof List ? (List<UUID>)message.uuids() :
new ArrayList<>(message.uuids());
+ CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart,
marshalledAssignments);
+
+ assignments = new ArrayList<>(compactedArr.size());
+
+ for (GridIntIterator iter = compactedArr.iterator();
iter.hasNext(); ) {
+ int nodeIdx = iter.next();
+
+ assignments.add(nodeIdx >= nodeIds.size() ?
Collections.emptyList() :
+ Collections.singletonList(nodeIds.get(nodeIdx)));
+ }
+ }
}
/** */
- private UUIDCollectionMessage transform(List<UUID> uuids) {
- return new UUIDCollectionMessage(uuids);
+ private static class CompactedIntArray {
+ /** Mask extracting the bit offset inside one int buffer element from an absolute bit position. */
+ protected static final int BUF_POS_MASK = Integer.SIZE - 1;
+
+ /** Shift converting an absolute bit position into the index of the int buffer element. */
+ protected static final int BUF_POS_LOG2 = Integer.SIZE -
Integer.numberOfLeadingZeros(Integer.SIZE - 1);
+
+ /** {@code BIT_MASKS[i]} has the {@code i} lowest bits set. NOTE(review): no mask exists for a full 32-bit item. */
+ protected static final int[] BIT_MASKS = new int[Integer.SIZE];
+
+ static {
+ for (int i = 0; i < Integer.SIZE; i++)
+ BIT_MASKS[i] = ~(-1 << i);
+ }
+
+ /** Buffer: element 0 holds the items count, the following elements hold the bit-packed items. */
+ private final int[] buf;
+
+ /** Bits count per each item. */
+ private final int bitsPerItem;
+
+ /** Ctor. Wraps the given buffer as is (no copy is made). */
+ private CompactedIntArray(int bitsPerItem, int[] buf) {
+ this.bitsPerItem = bitsPerItem;
+ this.buf = buf;
+ }
+
+ /** @return Underlying buffer (the same array instance, not a copy). */
+ public int[] buffer() {
+ return buf;
+ }
+
+ /** @return Items count, read from the first buffer element. */
+ public int size() {
+ return buf[0];
+ }
+
+ /** @return New iterator that unpacks items starting from the first one. */
+ public GridIntIterator iterator() {
+ return new Iterator();
+ }
+
+ /** Creates an array view over an already packed buffer using the given item width. */
+ public static CompactedIntArray of(int bitsPerItem, int[] buf) {
+ return new CompactedIntArray(bitsPerItem, buf);
+ }
+
+ /** Creates a builder whose buffer is sized for {@code capacity} items of {@code bitsPerItem} bits each. */
+ public static Builder builder(int bitsPerItem, int capacity) {
+ return new Builder(bitsPerItem, capacity);
+ }
+
+ /** Sequentially packs items into an int buffer. */
+ private static class Builder {
+ /** Current bit position. */
+ private int bitPos = Integer.SIZE; // Skip first element.
+
+ /** Current size. */
+ private int size;
+
+ /** Buffer. */
+ protected final int[] buf;
+
+ /** Bits count per each item. */
+ protected final int bitsPerItem;
+
+ /** Ctor. Allocates the buffer and temporarily stores the capacity in its first element. */
+ public Builder(int bitsPerItem, int capacity) {
+ this.bitsPerItem = bitsPerItem;
+ buf = new int[(capacity * bitsPerItem + Integer.SIZE - 1) /
Integer.SIZE + 1];
+ buf[0] = capacity;
+ }
+
+ /** Add the next item: its {@code bitsPerItem} lowest bits are written, continuing in the next element if needed. */
+ public void add(int val) {
+ assert size < buf[0];
+
+ int bitsToWrite = bitsPerItem;
+ int bitPos = this.bitPos;
+
+ do {
+ int bitsToWriteCurBuf = Math.min(bitsToWrite, Integer.SIZE
- (bitPos & BUF_POS_MASK));
+
+ int writeVal = (val & BIT_MASKS[bitsToWriteCurBuf]) <<
(bitPos & BUF_POS_MASK);
+
+ val >>= bitsToWriteCurBuf;
+
+ buf[bitPos >> BUF_POS_LOG2] |= writeVal;
+
+ bitPos += bitsToWriteCurBuf;
+
+ bitsToWrite -= bitsToWriteCurBuf;
+ }
+ while (bitsToWrite > 0);
+
+ this.bitPos = bitPos;
+
+ size++;
+ }
+
+ /** Replaces the capacity in the first buffer element with the actual items count and wraps the buffer. */
+ public CompactedIntArray build() {
+ buf[0] = size;
+
+ return new CompactedIntArray(bitsPerItem, buf);
+ }
+ }
+
+ /** Sequentially unpacks items from the buffer of the enclosing array. */
+ private class Iterator implements GridIntIterator {
+ /** Current bit position. */
+ private int bitPos = Integer.SIZE; // Skip first element.
+
+ /** Current item position. */
+ private int pos;
+
+ /** Array size. */
+ private final int size = buf[0];
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return pos < size;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int next() {
+ assert pos < size;
+
+ int bitPos = this.bitPos;
+
+ int bitsFirstBuf = Math.min(bitsPerItem, Integer.SIZE -
(bitPos & BUF_POS_MASK));
+
+ int val = ((buf[bitPos >> BUF_POS_LOG2] >> (bitPos &
BUF_POS_MASK)) & BIT_MASKS[bitsFirstBuf]);
+
+ bitPos += bitsFirstBuf;
+
+ if (bitsFirstBuf < bitsPerItem) {
+ int bitsSecondBuf = bitsPerItem - bitsFirstBuf;
+
+ val |= ((buf[bitPos >> BUF_POS_LOG2] >> (bitPos &
BUF_POS_MASK)) & BIT_MASKS[bitsSecondBuf])
+ << bitsFirstBuf;
+
+ bitPos += bitsSecondBuf;
+ }
+
+ this.bitPos = bitPos;
+
+ pos++;
+
+ return val;
+ }
+ }
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 76318e9..b86f226 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -61,7 +61,7 @@ public abstract class AbstractMultiStepPlan implements
MultiStepPlan {
/** {@inheritDoc} */
@Override public FragmentMapping mapping(Fragment fragment) {
- return mapping(fragment.fragmentId());
+ return fragment.mapping();
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index a920c95..076701f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -71,7 +71,7 @@ public final class BaseQueryContext extends
AbstractQueryContext {
private static final VolcanoPlanner EMPTY_PLANNER;
/** */
- private static final RexBuilder DFLT_REX_BUILDER;
+ private static final RexBuilder REX_BUILDER;
/** */
public static final RelOptCluster CLUSTER;
@@ -104,9 +104,9 @@ public final class BaseQueryContext extends
AbstractQueryContext {
RelDataTypeSystem typeSys =
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class,
FRAMEWORK_CONFIG.getTypeSystem());
TYPE_FACTORY = new IgniteTypeFactory(typeSys);
- DFLT_REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+ REX_BUILDER = new RexBuilder(TYPE_FACTORY);
- CLUSTER = RelOptCluster.create(EMPTY_PLANNER, DFLT_REX_BUILDER);
+ CLUSTER = RelOptCluster.create(EMPTY_PLANNER, REX_BUILDER);
// Forbid using the empty cluster in any planning or mapping
procedures to prevent memory leaks.
String cantBeUsedMsg = "Empty cluster can't be used for planning or
mapping";
@@ -169,13 +169,11 @@ public final class BaseQueryContext extends
AbstractQueryContext {
this.log = log;
- RelDataTypeSystem typeSys =
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class,
cfg.getTypeSystem());
-
- typeFactory = new IgniteTypeFactory(typeSys);
-
qryCancel = unwrap(GridQueryCancel.class);
- rexBuilder = new RexBuilder(typeFactory);
+ typeFactory = TYPE_FACTORY;
+
+ rexBuilder = REX_BUILDER;
}
/**
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index ef7f8af..fc4868f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -550,7 +550,7 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
b.add(descriptors[i].name(),
descriptors[i].logicalType(factory));
}
- return TypeUtils.sqlType(factory, b.build());
+ return b.build();
}
/** {@inheritDoc} */
@@ -644,6 +644,9 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
private final Class<?> storageType;
/** */
+ private volatile RelDataType logicalType; // Cached on first logicalType(...) call; later calls ignore their factory argument.
+
+ /** */
private KeyValDescriptor(String name, Class<?> type, boolean isKey,
int fieldIdx) {
this.name = name;
this.isKey = isKey;
@@ -684,7 +687,10 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
/** {@inheritDoc} */
@Override public RelDataType logicalType(IgniteTypeFactory f) {
- return f.toSql(f.createJavaType(storageType));
+ if (logicalType == null)
+ logicalType = TypeUtils.sqlType(f,
f.createJavaType(storageType));
+
+ return logicalType;
}
/** {@inheritDoc} */
@@ -718,6 +724,9 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
private final Class<?> storageType;
/** */
+ private volatile RelDataType logicalType; // Cached on first logicalType(...) call; later calls ignore their factory argument.
+
+ /** */
private FieldDescriptor(GridQueryProperty desc, int fieldIdx) {
this.desc = desc;
this.fieldIdx = fieldIdx;
@@ -758,7 +767,10 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
/** {@inheritDoc} */
@Override public RelDataType logicalType(IgniteTypeFactory f) {
- return f.toSql(f.createJavaType(storageType));
+ if (logicalType == null)
+ logicalType = TypeUtils.sqlType(f,
f.createJavaType(storageType));
+
+ return logicalType;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
index f9d32fc..753dfce 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
@@ -179,7 +179,7 @@ public class SystemViewTableDescriptorImpl<ViewRow> extends
NullInitializerExpre
b.add(descriptors[i].name(),
descriptors[i].logicalType(factory));
}
- return TypeUtils.sqlType(factory, b.build());
+ return b.build();
}
/** {@inheritDoc} */
@@ -220,6 +220,9 @@ public class SystemViewTableDescriptorImpl<ViewRow> extends
NullInitializerExpre
private final boolean isFiltrable;
/** */
+ private volatile RelDataType logicalType; // Cached on first logicalType(...) call; later calls ignore their factory argument.
+
+ /** */
private SystemViewColumnDescriptorImpl(String name, Class<?> type, int
fieldIdx, boolean isFiltrable) {
originalName = name;
sqlName = toSqlName(name);
@@ -255,7 +258,10 @@ public class SystemViewTableDescriptorImpl<ViewRow>
extends NullInitializerExpre
/** {@inheritDoc} */
@Override public RelDataType logicalType(IgniteTypeFactory f) {
- return f.toSql(f.createJavaType(type));
+ if (logicalType == null)
+ logicalType = TypeUtils.sqlType(f, f.createJavaType(type));
+
+ return logicalType;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index aa0fc11..6647016 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
@@ -181,6 +182,7 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
exchangeSvc.taskExecutor(taskExecutor);
exchangeSvc.messageService(msgSvc);
exchangeSvc.mailboxRegistry(mailboxRegistry);
+ exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
exchangeSvc.init();
exchangeServices.put(uuid, exchangeSvc);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index b733b6f..0da2463 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
-
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
@@ -37,6 +36,7 @@ import
org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
@@ -55,9 +55,18 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ // Wait for pending queries before destroying caches. If some error
occurs during query execution, client code
+ // can get control earlier than the query leaves the running queries
registry (some time is needed for async message
+ // exchange), but eventually, all queries should be closed.
+ assertTrue("Not finished queries found on client", waitForCondition(
+ () ->
queryProcessor(client).queryRegistry().runningQueries().isEmpty(), 1_000L));
+
for (Ignite ign : G.allGrids()) {
for (String cacheName : ign.cacheNames())
ign.destroyCache(cacheName);
+
+ assertEquals("Not finished queries found [ignite=" + ign.name() +
']',
+ 0,
queryProcessor((IgniteEx)ign).queryRegistry().runningQueries().size());
}
awaitPartitionMapExchange();
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index 380a1c2..9dfe622 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -19,9 +19,16 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.IntStream;
-
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -29,8 +36,15 @@ import
org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.QueryState;
import org.apache.ignite.internal.processors.query.RunningQuery;
+import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
+import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Test;
@@ -94,62 +108,82 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
}
/**
- * Execute query with a lot of JOINs to produce very long excution phase.
+ * Execute query with a latch on execution phase.
* Cancel query on execution phase and check query registry is empty on
the all nodes of the cluster.
*/
@Test
- public void testCancelAtExecutionPhase() throws IgniteCheckedException {
- QueryEngine cliEngine = queryProcessor(client);
- QueryEngine srvEngine = queryProcessor(srv);
- int cnt = 6;
+ public void testCancelAtExecutionPhase() throws Exception {
+ CalciteQueryProcessor cliEngine = queryProcessor(client);
+ CalciteQueryProcessor srvEngine = queryProcessor(srv);
sql("CREATE TABLE person (id int, val varchar)");
-
- String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," +
i + ")").collect(joining(", "));
- String insertSql = "INSERT INTO person (id, val) VALUES " + data;
-
- sql(insertSql);
-
- String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" +
i).collect(joining(", "));
- String sql = "SELECT * FROM " + bigJoin;
-
- IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() ->
sql(sql));
-
- // The query is executing on client.
- Assert.assertTrue(GridTestUtils.waitForCondition(
- () -> {
- Collection<? extends RunningQuery> queries =
cliEngine.runningQueries();
-
- return !queries.isEmpty() && F.first(queries).state() ==
QueryState.EXECUTING;
- },
- TIMEOUT_IN_MS));
-
- // The query is executing on sever.
- Assert.assertTrue(GridTestUtils.waitForCondition(
- () -> {
- Collection<? extends RunningQuery> queries =
srvEngine.runningQueries();
-
- return !queries.isEmpty() && F.first(queries).state() ==
QueryState.EXECUTING;
- },
- TIMEOUT_IN_MS));
-
- Collection<? extends RunningQuery> running =
cliEngine.runningQueries();
-
- assertEquals(1, running.size());
-
- RunningQuery qry = F.first(running);
-
- assertSame(qry, cliEngine.runningQuery(qry.id()));
-
- qry.cancel();
-
- Assert.assertTrue(GridTestUtils.waitForCondition(
- () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
-
- Assert.assertTrue(GridTestUtils.waitForCondition(
- () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
-
- GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100),
IgniteSQLException.class, "The query was cancelled while executing.");
+ sql("INSERT INTO person (id, val) VALUES (?, ?)", 0, "val0");
+
+ IgniteCacheTable oldTbl =
(IgniteCacheTable)srvEngine.schemaHolder().schema("PUBLIC").getTable("PERSON");
+
+ CountDownLatch scanLatch = new CountDownLatch(1);
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteCacheTable newTbl = new CacheTableImpl(srv.context(),
oldTbl.descriptor()) {
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ ColocationGroup grp,
+ Predicate<Row> filter,
+ Function<Row, Row> rowTransformer,
+ @Nullable ImmutableBitSet usedColumns
+ ) {
+ return new Iterable<Row>() {
+ @NotNull @Override public Iterator<Row> iterator() {
+ scanLatch.countDown();
+
+ return new Iterator<Row>() {
+ @Override public boolean hasNext() {
+ // Produce rows until stopped.
+ return !stop.get();
+ }
+
+ @Override public Row next() {
+ if (stop.get())
+ throw new NoSuchElementException();
+
+ return execCtx.rowHandler().factory().create();
+ }
+ };
+ }
+ };
+ }
+ };
+
+ srvEngine.schemaHolder().schema("PUBLIC").add("PERSON", newTbl);
+
+ IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() ->
sql("SELECT * FROM person"));
+
+ try {
+ scanLatch.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
+
+ // Check state on server.
+ assertEquals(1, srvEngine.runningQueries().size());
+ assertEquals(QueryState.EXECUTING,
F.first(srvEngine.runningQueries()).state());
+
+ // Check state on client.
+ assertEquals(1, cliEngine.runningQueries().size());
+ RunningQuery qry = F.first(cliEngine.runningQueries());
+ assertEquals(QueryState.EXECUTING, qry.state());
+
+ qry.cancel();
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+ Assert.assertTrue(GridTestUtils.waitForCondition(
+ () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+ }
+ finally {
+ stop.set(true);
+ }
+
+ GridTestUtils.assertThrowsAnyCause(log,
+ () -> fut.get(100), IgniteSQLException.class, "The query was
cancelled while executing.");
}
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7710ba1..3f79059 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -328,6 +329,7 @@ public class PlannerTest extends AbstractPlannerTest {
exchangeSvc.taskExecutor(taskExecutor);
exchangeSvc.messageService(msgSvc);
exchangeSvc.mailboxRegistry(mailboxRegistry);
+ exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
exchangeSvc.init();
ectx = new ExecutionContext<>(
@@ -385,6 +387,7 @@ public class PlannerTest extends AbstractPlannerTest {
exchangeSvc.taskExecutor(taskExecutor);
exchangeSvc.messageService(msgSvc);
exchangeSvc.mailboxRegistry(mailboxRegistry);
+ exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
exchangeSvc.init();
ectx = new ExecutionContext<>(
@@ -553,6 +556,7 @@ public class PlannerTest extends AbstractPlannerTest {
exchangeSvc.taskExecutor(taskExecutor);
exchangeSvc.messageService(msgSvc);
exchangeSvc.mailboxRegistry(mailboxRegistry);
+ exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
exchangeSvc.init();
ectx = new ExecutionContext<>(
@@ -609,6 +613,7 @@ public class PlannerTest extends AbstractPlannerTest {
exchangeSvc.taskExecutor(taskExecutor);
exchangeSvc.messageService(msgSvc);
exchangeSvc.mailboxRegistry(mailboxRegistry);
+ exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
exchangeSvc.init();
ectx = new ExecutionContext<>(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index de8d507..2e74824 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2896,8 +2896,11 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
throw new CacheException("Execution of local SqlFieldsQuery on
client node disallowed.");
if (experimentalQueryEngine != null && useExperimentalSqlEngine) {
- return experimentalQueryEngine.query(QueryContext.of(qry, cliCtx),
qry.getSchema(), qry.getSql(),
- X.EMPTY_OBJECT_ARRAY);
+ return experimentalQueryEngine.query(
+ QueryContext.of(qry, cliCtx),
+ qry.getSchema() == null ? schemaName(cctx) : qry.getSchema(),
+ qry.getSql(),
+ qry.getArgs() == null ? X.EMPTY_OBJECT_ARRAY : qry.getArgs());
}
return executeQuerySafe(cctx, () -> {