ignite-gg9499 -
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/193d9b32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/193d9b32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/193d9b32 Branch: refs/heads/ignite-gg9499 Commit: 193d9b32183cf4b35eb0bdb026c37147942bf15e Parents: 4da1d1a Author: S.Vladykin <svlady...@gridgain.com> Authored: Thu Dec 18 19:29:25 2014 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Thu Dec 18 19:29:25 2014 +0300 ---------------------------------------------------------------------- .../org/gridgain/grid/kernal/GridTopic.java | 5 +- .../cache/query/GridCacheQueriesEx.java | 7 + .../cache/query/GridCacheQueriesImpl.java | 5 + .../cache/query/GridCacheQueriesProxy.java | 12 + .../cache/query/GridCacheQueryManager.java | 26 +- .../cache/query/GridCacheSqlQuery.java | 91 +++++ .../cache/query/GridCacheSqlResult.java | 19 ++ .../cache/query/GridCacheTwoStepQuery.java | 66 ++++ .../processors/query/GridQueryIndexing.java | 10 + .../processors/query/GridQueryProcessor.java | 17 + .../java/org/gridgain/grid/util/GridQueue.java | 340 ------------------- .../gridgain/grid/util/GridQueueSelfTest.java | 62 ---- .../processors/query/h2/GridH2Indexing.java | 130 ++++--- .../query/h2/GridH2ResultSetIterator.java | 3 +- .../query/h2/opt/GridH2IndexBase.java | 5 +- .../query/h2/twostep/GridMapQueryExecutor.java | 263 ++++++++++++++ .../query/h2/twostep/GridMergeIndex.java | 131 +++++-- .../h2/twostep/GridMergeIndexUnsorted.java | 74 ++++ .../query/h2/twostep/GridMergeTable.java | 33 +- .../query/h2/twostep/GridNextPageRequest.java | 54 --- .../query/h2/twostep/GridNextPageResponse.java | 47 --- .../query/h2/twostep/GridQueryAck.java | 42 --- .../query/h2/twostep/GridQueryRequest.java | 50 --- .../h2/twostep/GridReduceQueryExecutor.java | 199 +++++++++++ .../query/h2/twostep/GridResultPage.java | 76 +++++ 
.../twostep/messages/GridNextPageRequest.java | 59 ++++ .../twostep/messages/GridNextPageResponse.java | 149 ++++++++ .../query/h2/twostep/messages/GridQueryAck.java | 34 ++ .../twostep/messages/GridQueryFailResponse.java | 46 +++ .../h2/twostep/messages/GridQueryRequest.java | 61 ++++ 30 files changed, 1402 insertions(+), 714 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java index 5fedbd9..7ab61d9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java @@ -77,7 +77,10 @@ public enum GridTopic { TOPIC_TIME_SYNC, /** */ - TOPIC_HADOOP; + TOPIC_HADOOP, + + /** */ + TOPIC_QUERY; /** Enum values. 
*/ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java index d1732fb..e854367 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.cache.query; import org.apache.ignite.*; +import org.apache.ignite.lang.*; import org.gridgain.grid.cache.query.*; import java.util.*; @@ -41,4 +42,10 @@ public interface GridCacheQueriesEx<K, V> extends GridCacheQueries<K, V> { * @return Query. */ public <R> GridCacheQuery<R> createSpiQuery(); + + /** + * @param qry Query. + * @return Future. 
+ */ + public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java index 3ba1ceb..f643cb2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java @@ -158,6 +158,11 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext } /** {@inheritDoc} */ + @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) { + return ctx.kernalContext().query().queryTwoStep(qry); + } + + /** {@inheritDoc} */ @Override public GridCacheContinuousQuery<K, V> createContinuousQuery() { return ctx.continuousQueries().createQuery(prj == null ? 
null : prj.predicate()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java index 9edcf6a..61f7ac7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java @@ -166,6 +166,18 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex } /** {@inheritDoc} */ + @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.execute(qry); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteFuture<?> rebuildIndexes(Class<?> cls) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java index 25c0668..5fbc366 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java @@ -51,7 +51,7 @@ import static 
org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> { /** */ - protected GridQueryProcessor idxProc; + protected GridQueryProcessor qryProc; /** */ private String space; @@ -78,7 +78,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - idxProc = cctx.kernalContext().query(); + qryProc = cctx.kernalContext().query(); space = cctx.name(); maxIterCnt = cctx.config().getMaximumQueryIteratorCount(); @@ -165,7 +165,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to get size (grid is stopping)."); try { - return idxProc.size(space, valType); + return qryProc.size(space, valType); } finally { leaveBusy(); @@ -193,7 +193,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); try { - return idxProc.rebuildIndexes(space, typeName); + return qryProc.rebuildIndexes(space, typeName); } finally { leaveBusy(); @@ -210,7 +210,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); try { - return idxProc.rebuildAllIndexes(); + return qryProc.rebuildAllIndexes(); } finally { leaveBusy(); @@ -262,7 +262,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - idxProc.onSwap(space, key); + qryProc.onSwap(space, key); } finally { leaveBusy(); @@ -282,7 +282,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. 
try { - idxProc.onUnswap(space, key, val, valBytes); + qryProc.onUnswap(space, key, val, valBytes); } finally { leaveBusy(); @@ -324,7 +324,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (val == null) val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); - idxProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime); + qryProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime); } finally { invalidateResultCache(); @@ -349,7 +349,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - idxProc.remove(space, key); + qryProc.remove(space, key); } finally { invalidateResultCache(); @@ -368,7 +368,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - idxProc.onUndeploy(space, ldr); + qryProc.onUndeploy(space, ldr); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -488,7 +488,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - iter = idxProc.query(space, qry.clause(), F.asList(args), + iter = qryProc.query(space, qry.clause(), F.asList(args), qry.queryClassName(), filter(qry)); break; @@ -531,7 +531,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - iter = idxProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry)); + iter = qryProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry)); break; @@ -650,7 +650,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte else { assert qry.type() == SQL_FIELDS; - GridQueryFieldsResult qryRes = idxProc.queryFields(space, qry.clause(), F.asList(args), filter(qry)); + GridQueryFieldsResult qryRes = qryProc.queryFields(space, qry.clause(), F.asList(args), 
filter(qry)); res.metaData(qryRes.metaData()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java new file mode 100644 index 0000000..025ea29 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java @@ -0,0 +1,91 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.query; + +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; + +/** + * Query. + */ +public class GridCacheSqlQuery implements Externalizable { + /** */ + private static final Object[] EMPTY_PARAMS = {}; + + /** */ + String alias; + + /** */ + String qry; + + /** */ + Object[] params; + + /** + * For {@link Externalizable}. + */ + public GridCacheSqlQuery() { + // No-op. + } + + /** + * @param alias Alias. + * @param qry Query. + * @param params Query parameters. + */ + GridCacheSqlQuery(String alias, String qry, Object[] params) { + A.ensure(!F.isEmpty(qry), "qry must not be empty"); + + this.alias = alias; + this.qry = qry; + + this.params = F.isEmpty(params) ? EMPTY_PARAMS : params; + } + + /** + * @return Alias. + */ + public String alias() { + return alias; + } + + /** + * @return Query. + */ + public String query() { + return qry; + } + + /** + * @return Parameters. 
+ */ + public Object[] parameters() { + return params; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, alias); + U.writeString(out, qry); + U.writeArray(out, params); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + alias = U.readString(in); + qry = U.readString(in); + params = U.readArray(in); + + if (F.isEmpty(params)) + params = EMPTY_PARAMS; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java new file mode 100644 index 0000000..ecee21e --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java @@ -0,0 +1,19 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.query; + +import java.util.*; + +/** + * SQL Query result. + */ +public interface GridCacheSqlResult extends AutoCloseable, Iterable<List<?>> { + // No-op. 
+} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java new file mode 100644 index 0000000..a7c9a02 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java @@ -0,0 +1,66 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.query; + +import org.apache.ignite.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Two step map-reduce style query. + */ +public class GridCacheTwoStepQuery implements Serializable { + /** */ + private Map<String, GridCacheSqlQuery> mapQrys; + + /** */ + private GridCacheSqlQuery reduce; + + /** + * @param qry Reduce query. + * @param params Reduce query parameters. + */ + public GridCacheTwoStepQuery(String qry, Object ... params) { + reduce = new GridCacheSqlQuery(null, qry, params); + } + + /** + * @param alias Alias. + * @param qry SQL Query. + * @param params Query parameters. + */ + public void addMapQuery(String alias, String qry, Object ... 
params) { + A.ensure(!F.isEmpty(alias), "alias must not be empty"); + + if (mapQrys == null) + mapQrys = new GridLeanMap<>(); + + if (mapQrys.put(alias, new GridCacheSqlQuery(alias, qry, params)) != null) + throw new IgniteException("Failed to add query, alias already exists: " + alias + "."); + } + + /** + * @return Reduce query. + */ + public GridCacheSqlQuery reduceQuery() { + return reduce; + } + + /** + * @return Map queries. + */ + public Collection<GridCacheSqlQuery> mapQueries() { + return mapQrys.values(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java index 18b2832..1b9ec6a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java @@ -13,6 +13,7 @@ import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.indexing.*; import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.util.lang.*; import org.jetbrains.annotations.*; @@ -37,6 +38,15 @@ public interface GridQueryIndexing { */ public void stop() throws IgniteCheckedException; + + /** + * Runs two step query. + * + * @param qry Query. + * @return Future. + */ + public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry); + /** * Queries individual fields (generally used by JDBC drivers). 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java index 6bc0235..e05c425 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java @@ -18,6 +18,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.*; +import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; @@ -428,6 +429,22 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param qry Query. + * @return Future. + */ + public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + return idx.queryTwoStep(qry); + } + finally { + busyLock.leaveBusy(); + } + } + + /** * @param space Space. * @param key Key. * @throws IgniteCheckedException Thrown in case of any errors. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java b/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java deleted file mode 100644 index 1f6bbe4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java +++ /dev/null @@ -1,340 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.util; - -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Queue which supports addition at tail and removing at head. This - * queue also exposes its internal linked list nodes and allows for - * constant time removal from the middle of the queue. - * <p> - * This queue is not thread-safe. - */ -public class GridQueue<E> extends AbstractCollection<E> implements Queue<E> { - /** Queue size. */ - private int size; - - /** Modification count. */ - private int modCnt; - - /** Queue header. */ - private Node<E> hdr = new Node<>(null, null, null); - - /** - * Creates empty queue. - */ - public GridQueue() { - hdr.next = hdr.prev = hdr; - } - - /** - * Handles modification count check. - * - * @param match Modification count to match. - */ - private void checkModCount(int match) { - if (modCnt != match) - throw new ConcurrentModificationException("Mod count mismatch [expected=" + match + - ", actual=" + modCnt + ']'); - - modCnt++; - } - - /** - * Adds element before node. - * - * @param e Element to add. - * @param n Node. 
- * @return New node. - */ - private Node<E> addBefore(E e, Node<E> n) { - A.notNull(e, "e"); - - assert n != null; - - int match = modCnt; - - Node<E> newNode = new Node<>(e, n, n.prev); - - // Link. - newNode.prev.next = newNode; - newNode.next.prev = newNode; - - size++; - - checkModCount(match); - - return newNode; - } - - /** - * Removes node. - * - * @param n Node to remove. - * @return Removed value. - */ - private E remove(Node<E> n) { - assert n != null; - - if (n == hdr) - throw new NoSuchElementException(); - - assert !n.unlinked(); - - int match = modCnt; - - E res = n.item; - - // Relink. - n.prev.next = n.next; - n.next.prev = n.prev; - - // GC. - n.next = n.prev = null; - n.item = null; - - size--; - - checkModCount(match); - - n.unlink(); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean add(E e) { - offer(e); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean remove(Object o) { - A.notNull(o, "o"); - - for (Node<E> n = hdr.next; n != hdr; n = n.next) { - if (o.equals(n.item)) { - remove(n); - - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean offer(E e) { - addBefore(e, hdr); - - return true; - } - - /** - * Same as {@link #offer(Object)}, but returns created node. - * - * @param e Element to add. - * @return New node. - */ - public Node<E> offerx(E e) { - return addBefore(e, hdr); - } - - /** - * Polls element from head of the queue. - * - * @return Polled element. 
- */ - @Nullable @Override public E poll() { - if (size == 0) - return null; - - return remove(hdr.next); - } - - /** {@inheritDoc} */ - @Override public E element() { - Node<E> n = hdr.next; - - if (n == null) - throw new NoSuchElementException(); - - return n.item; - } - - /** {@inheritDoc} */ - @Override public E remove() { - E item = poll(); - - if (item == null) - throw new NoSuchElementException(); - - return item; - } - - /** {@inheritDoc} */ - @Nullable @Override public E peek() { - return hdr.next.item; - } - - /** - * @return Peeks at first node in the queue. - */ - public Node<E> peekx() { - return hdr.next == hdr ? null : hdr.next; - } - - /** - * Unlinks node from the queue. - * - * @param n Node to unlink. - */ - public void unlink(Node<E> n) { - A.notNull(n, "n"); - - remove(n); - } - - /** - * Gets queue size. - * - * @return Queue size. - */ - @Override public int size() { - return size; - } - - /** {@inheritDoc} */ - @Override public Iterator<E> iterator() { - return new QueueIterator(); - } - - /** - * Node for internal linked list. - * - * @param <E> Queue element. - */ - @SuppressWarnings( {"PublicInnerClass"}) - public static class Node<E> { - /** Item. */ - private E item; - - /** Next. */ - @GridToStringExclude - private Node<E> next; - - /** Previous. */ - @GridToStringExclude - private Node<E> prev; - - /** Unlinked flag. */ - private boolean unlinked; - - /** - * @param item Item. - * @param next Next link. - * @param prev Previous link. - */ - private Node(E item, Node<E> next, Node<E> prev) { - this.item = item; - this.next = next; - this.prev = prev; - } - - /** - * Gets this node's item. - * - * @return This node's item. - */ - public E item() { - return item; - } - - /** - * Sets unlinked flag. - */ - void unlink() { - assert !unlinked; - - unlinked = true; - } - - /** - * Checks if node is unlinked. - * - * @return {@code True} if node is unlinked. 
- */ - public boolean unlinked() { - return unlinked; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Node.class, this); - } - } - - /** - * Iterator. - */ - private class QueueIterator implements Iterator<E> { - /** Next element. */ - private Node<E> next; - - /** Expected modification count. */ - private int expModCnt = modCnt; - - /** - * - */ - QueueIterator() { - next = hdr.next; - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return next != hdr; - } - - /** {@inheritDoc} */ - @Override public E next() { - checkModCount(); - - if (next == null) - throw new NoSuchElementException(); - - E ret = next.item; - - next = next.next; - - return ret; - } - - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Checks modification count. - */ - private void checkModCount() { - if (modCnt != expModCnt) - throw new ConcurrentModificationException("Mod count mismatch [expected=" + expModCnt + - ", actual=" + modCnt + ']'); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java deleted file mode 100644 index 4b613f6..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.util; - -import org.gridgain.grid.util.typedef.*; -import 
org.gridgain.testframework.junits.common.*; - -/** - * Grid utils tests. - */ -@GridCommonTest(group = "Utils") -public class GridQueueSelfTest extends GridCommonAbstractTest { - /** - * - */ - public void testQueue() { - GridQueue<String> q = new GridQueue<>(); - for (char c = 'a'; c <= 'z'; c++) - q.offer(Character.toString(c)); - - assertEquals('z' - 'a' + 1, q.size()); - - char ch = 'a'; - - for (String c = q.poll(); c != null; c = q.poll()) { - X.println(c); - - assertEquals(Character.toString(ch++), c); - } - - assert q.isEmpty(); - - for (char c = 'A'; c <= 'Z'; c++) - q.offer(Character.toString(c)); - - assertEquals('Z' - 'A' + 1, q.size()); - - ch = 'A'; - - for (String s : q) { - X.println(s); - - assertEquals(Character.toString(ch++), s); - } - - q.remove("O"); - - assertEquals('Z' - 'A', q.size()); - - for (String c = q.poll(); c != null; c = q.poll()) - assert !"O".equals(c); - - assert q.isEmpty(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java index 7307676..7ee84f8 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java @@ -16,13 +16,14 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; +import 
org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.kernal.processors.query.*; import org.gridgain.grid.kernal.processors.query.h2.opt.*; +import org.gridgain.grid.kernal.processors.query.h2.twostep.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.offheap.unsafe.*; @@ -112,9 +113,6 @@ public class GridH2Indexing implements GridQueryIndexing { } /** */ - private static final ThreadLocal<GridH2Indexing> localSpi = new ThreadLocal<>(); - - /** */ private volatile String cachedSearchPathCmd; /** Cache for deserialized offheap rows. */ @@ -148,6 +146,12 @@ public class GridH2Indexing implements GridQueryIndexing { private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>()); /** */ + private GridMapQueryExecutor mapQryExec; + + /** */ + private GridReduceQueryExecutor rdcQryExec; + + /** */ private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() { @Nullable @Override public ConnectionWrapper get() { ConnectionWrapper c = super.get(); @@ -218,6 +222,15 @@ public class GridH2Indexing implements GridQueryIndexing { private volatile GridKernalContext ctx; /** + * @param space Space. + * @return Connection. + * @throws IgniteCheckedException If failed. + */ + public Connection connectionForSpace(@Nullable String space) throws IgniteCheckedException { + return connectionForThread(schema(space)); + } + + /** * Gets DB connection. * * @param schema Whether to set schema for connection or not. @@ -370,22 +383,15 @@ public class GridH2Indexing implements GridQueryIndexing { if (tbl == null) return; // Type was rejected. 
- localSpi.set(this); + removeKey(spaceName, k, tbl); - try { - removeKey(spaceName, k, tbl); - - if (expirationTime == 0) - expirationTime = Long.MAX_VALUE; + if (expirationTime == 0) + expirationTime = Long.MAX_VALUE; - tbl.tbl.update(k, v, expirationTime); + tbl.tbl.update(k, v, expirationTime); - if (tbl.luceneIdx != null) - tbl.luceneIdx.store(k, v, ver, expirationTime); - } - finally { - localSpi.remove(); - } + if (tbl.luceneIdx != null) + tbl.luceneIdx.store(k, v, ver, expirationTime); } /** {@inheritDoc} */ @@ -393,23 +399,16 @@ public class GridH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']'); - localSpi.set(this); + for (TableDescriptor tbl : tables(schema(spaceName))) { + if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { + if (tbl.tbl.update(key, null, 0)) { + if (tbl.luceneIdx != null) + tbl.luceneIdx.remove(key); - try { - for (TableDescriptor tbl : tables(schema(spaceName))) { - if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { - if (tbl.tbl.update(key, null, 0)) { - if (tbl.luceneIdx != null) - tbl.luceneIdx.remove(key); - - return; - } + return; } } } - finally { - localSpi.remove(); - } } /** {@inheritDoc} */ @@ -419,47 +418,33 @@ public class GridH2Indexing implements GridQueryIndexing { if (schema == null) return; - localSpi.set(this); - - try { - for (TableDescriptor tbl : schema.values()) { - if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { - try { - if (tbl.tbl.onSwap(key)) - return; - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException(e); - } + for (TableDescriptor tbl : schema.values()) { + if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { + try { + if (tbl.tbl.onSwap(key)) + return; + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException(e); } } } - 
finally { - localSpi.remove(); - } } /** {@inheritDoc} */ @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) throws IgniteCheckedException { - localSpi.set(this); - - try { - for (TableDescriptor tbl : tables(schema(spaceName))) { - if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { - try { - if (tbl.tbl.onUnswap(key, val)) - return; - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException(e); - } + for (TableDescriptor tbl : tables(schema(spaceName))) { + if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { + try { + if (tbl.tbl.onUnswap(key, val)) + return; + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException(e); } } } - finally { - localSpi.remove(); - } } /** @@ -540,8 +525,6 @@ public class GridH2Indexing implements GridQueryIndexing { @Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry, @Nullable final Collection<Object> params, final GridIndexingQueryFilter filters) throws IgniteCheckedException { - localSpi.set(this); - setFilters(filters); try { @@ -575,8 +558,6 @@ public class GridH2Indexing implements GridQueryIndexing { } finally { setFilters(null); - - localSpi.remove(); } } @@ -652,7 +633,7 @@ public class GridH2Indexing implements GridQueryIndexing { * @return Result. * @throws IgniteCheckedException If failed. */ - private ResultSet executeSqlQueryWithTimer(Connection conn, String sql, + public ResultSet executeSqlQueryWithTimer(Connection conn, String sql, @Nullable Collection<Object> params) throws IgniteCheckedException { long start = U.currentTimeMillis(); @@ -719,7 +700,7 @@ public class GridH2Indexing implements GridQueryIndexing { * @param params Parameters collection. * @throws IgniteCheckedException If failed. 
*/ - private void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException { + public void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException { if (!F.isEmpty(params)) { int idx = 1; @@ -751,8 +732,6 @@ public class GridH2Indexing implements GridQueryIndexing { setFilters(filters); - localSpi.set(this); - try { ResultSet rs = executeQuery(qry, params, tbl); @@ -760,11 +739,14 @@ public class GridH2Indexing implements GridQueryIndexing { } finally { setFilters(null); - - localSpi.remove(); } } + /** {@inheritDoc} */ + @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) { + return rdcQryExec.query(qry); + } + /** * Sets filters for current thread. Must be set to not null value * before executeQuery and reset to null after in finally block since it signals @@ -772,7 +754,7 @@ public class GridH2Indexing implements GridQueryIndexing { * * @param filters Filters. 
*/ - private void setFilters(@Nullable GridIndexingQueryFilter filters) { + public void setFilters(@Nullable GridIndexingQueryFilter filters) { GridH2IndexBase.setFiltersForThread(filters); } @@ -1115,6 +1097,12 @@ public class GridH2Indexing implements GridQueryIndexing { for (GridCacheConfiguration cacheCfg : ctx.config().getCacheConfiguration()) registerSpace(cacheCfg.getName()); + + mapQryExec = new GridMapQueryExecutor(); + rdcQryExec = new GridReduceQueryExecutor(); + + mapQryExec.start(ctx, this); + rdcQryExec.start(ctx, this); } System.setProperty("h2.serializeJavaObject", "false"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java index 61f1190..90aa454 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.query.h2; import org.apache.ignite.*; +import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.typedef.internal.*; @@ -20,7 +21,7 @@ import java.util.*; /** * Iterator over result set. 
*/ -abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> { +public abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java index 00cb06d..4dcfd73 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java @@ -49,7 +49,10 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param fs Filters. 
*/ public static void setFiltersForThread(GridIndexingQueryFilter fs) { - filters.set(fs); + if (fs == null) + filters.remove(); + else + filters.set(fs); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java new file mode 100644 index 0000000..b86caf1 --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -0,0 +1,263 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2.twostep; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.indexing.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.query.*; +import org.gridgain.grid.kernal.processors.query.h2.*; +import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*; +import org.gridgain.grid.util.typedef.*; +import org.h2.jdbc.*; +import org.h2.result.*; +import org.h2.value.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.lang.reflect.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Map query executor. 
+ */ +public class GridMapQueryExecutor { + /** */ + private static final Field RESULT_FIELD; + + /** + * Initialize. + */ + static { + try { + RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result"); + + RESULT_FIELD.setAccessible(true); + } + catch (NoSuchFieldException e) { + throw new IllegalStateException("Check H2 version in classpath.", e); + } + } + + /** */ + private IgniteLogger log; + + /** */ + private GridKernalContext ctx; + + /** */ + private GridH2Indexing h2; + + /** */ + private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>(); + + /** + * @param ctx Context. + * @param h2 H2 Indexing. + * @throws IgniteCheckedException If failed. + */ + public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException { + this.ctx = ctx; + this.h2 = h2; + + log = ctx.log(GridMapQueryExecutor.class); + + // TODO handle node failures. + + ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + assert msg != null; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (msg instanceof GridQueryRequest) + executeLocalQuery(node, (GridQueryRequest)msg); + else if (msg instanceof GridNextPageRequest) + sendNextPage(node, (GridNextPageRequest)msg); + + return true; + } + }); + } + + /** + * @param node Node. + * @param req Query request. 
+ */ + private void executeLocalQuery(ClusterNode node, GridQueryRequest req) { + h2.setFilters(new GridIndexingQueryFilter() { + @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) { + final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName); + + if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) + return null; + + return new IgniteBiPredicate<K, V>() { + @Override public boolean apply(K k, V v) { + return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1); + } + }; + } + }); + + try { + QueryResults qr = new QueryResults(req.requestId(), req.queries().size()); + + ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); + + if (nodeRess == null) { + nodeRess = new ConcurrentHashMap8<>(); + + ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(node.id(), nodeRess); + + if (old != null) + nodeRess = old; + } + + QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr); + + assert old == null; + + // Prepare snapshots for all the needed tables before actual run. + for (GridCacheSqlQuery qry : req.queries()) { + // TODO + } + + // Run queries. + int i = 0; + + for (GridCacheSqlQuery qry : req.queries()) { + ResultSet rs = h2.executeSqlQueryWithTimer(h2.connectionForSpace(null), qry.query(), + F.asList(qry.parameters())); + + assert rs instanceof JdbcResultSet : rs.getClass(); + + ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs); + + qr.results[i] = res; + qr.resultSets[i] = rs; + + // Send the first page. + sendNextPage(node, qr, i, req.pageSize(), res.getRowCount()); + + i++; + } + } + catch (Throwable e) { + sendError(node, req.requestId(), e); + } + finally { + h2.setFilters(null); + } + } + + /** + * @param node Node. + * @param qryReqId Query request ID. + * @param err Error. 
+ */ + private void sendError(ClusterNode node, long qryReqId, Throwable err) { + try { + ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err)); + } + catch (IgniteCheckedException e) { + e.addSuppressed(err); + + log.error("Failed to send error message.", e); + } + } + + /** + * @param node Node. + * @param req Request. + */ + private void sendNextPage(ClusterNode node, GridNextPageRequest req) { + ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); + + QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId()); + + if (qr == null) + sendError(node, req.queryRequestId(), + new IllegalStateException("No query result found for request: " + req)); + else + sendNextPage(node, qr, req.query(), req.pageSize(), -1); + } + + /** + * @param node Node. + * @param qr Query results. + * @param qry Query. + * @param pageSize Page size. + * @param allRows All rows count. + */ + private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int pageSize, int allRows) { + int page; + + List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize)); + + ResultInterface res = qr.results[qry]; + + assert res != null; + + synchronized (res) { + page = qr.pages[qry]++; + + for (int i = 0 ; i < pageSize; i++) { + if (!res.next()) + break; + + rows.add(res.currentRow()); + } + } + + try { + ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows)); + } + catch (IgniteCheckedException e) { + log.error("Failed to send message.", e); + + throw new IgniteException(e); + } + } + + /** + * + */ + private static class QueryResults { + /** */ + private long qryReqId; + + /** */ + private ResultInterface[] results; + + /** */ + private ResultSet[] resultSets; + + /** */ + private int[] pages; + + /** + * @param qryReqId Query request ID. + * @param qrys Queries. 
+ */ + private QueryResults(long qryReqId, int qrys) { + this.qryReqId = qryReqId; + + results = new ResultInterface[qrys]; + resultSets = new ResultSet[qrys]; + pages = new int[qrys]; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java index 163256b..16ba15d 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java @@ -9,7 +9,7 @@ package org.gridgain.grid.kernal.processors.query.h2.twostep; -import org.gridgain.grid.*; +import org.apache.ignite.*; import org.h2.engine.*; import org.h2.index.*; import org.h2.message.*; @@ -18,31 +18,23 @@ import org.h2.table.*; import org.jetbrains.annotations.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** * Merge index. */ -public class GridMergeIndex extends BaseIndex { +public abstract class GridMergeIndex extends BaseIndex { /** */ - private static final int MAX_CAPACITY = 100_000; - - /** */ - private static final Collection<Row> END = new ArrayList<>(0); - - /** */ - private Collection<Collection<Row>> fetchedRows = new LinkedBlockingQueue<>(); - - /** */ - private BlockingQueue<Collection<Row>> cursorRows = new LinkedBlockingQueue<>(); - - /** */ - private int fetchedCnt; + private static final int MAX_FETCH_SIZE = 100000; /** */ private final AtomicInteger cnt = new AtomicInteger(0); + /** + * Will be r/w from query execution thread only, does not need to be threadsafe. 
+ */ + private ArrayList<Row> fetched = new ArrayList<>(); + /** {@inheritDoc} */ @Override public long getRowCount(Session session) { return cnt.get(); @@ -61,12 +53,36 @@ public class GridMergeIndex extends BaseIndex { } /** - * @param rows0 Rows. + * @param page Page. */ - public void addRows(Collection<Row> rows0) { - assert !rows0.isEmpty(); + public abstract void addPage(GridResultPage<?> page); + + /** {@inheritDoc} */ + @Override public Cursor find(Session session, SearchRow first, SearchRow last) { + if (fetched == null) + throw new IgniteException("Fetched result set was too large."); + if (fetched.size() == cnt.get()) // We've fetched all the rows. + return findAllFetched(fetched, first, last); + return findInStream(first, last); + } + + /** + * @param first First row. + * @param last Last row. + * @return Cursor. Usually it must be {@link FetchingCursor} instance. + */ + protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last); + + /** + * @param fetched Fetched rows. + * @param first First row. + * @param last Last row. + * @return Cursor. + */ + protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) { + return new IteratorCursor(fetched.iterator()); } /** {@inheritDoc} */ @@ -90,14 +106,6 @@ public class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public Cursor find(Session session, SearchRow first, SearchRow last) { - if (fetchedRows == null) - throw new GridRuntimeException("Rows were dropped out of result set."); - - return new Cursor0(); - } - - /** {@inheritDoc} */ @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) { return getRowCountApproximation() + Constants.COST_ROW_OFFSET; } @@ -132,12 +140,24 @@ public class GridMergeIndex extends BaseIndex { return 0; } - private class Cursor0 implements Cursor { + /** + * Cursor over iterator. 
+ */ + protected class IteratorCursor implements Cursor { /** */ - private Row cur; + private Iterator<Row> iter; /** */ - private Iterator<Row> curIter; + protected Row cur; + + /** + * @param iter Iterator. + */ + public IteratorCursor(Iterator<Row> iter) { + assert iter != null; + + this.iter = iter; + } /** {@inheritDoc} */ @Override public Row get() { @@ -151,7 +171,9 @@ public class GridMergeIndex extends BaseIndex { /** {@inheritDoc} */ @Override public boolean next() { - return false; + cur = iter.hasNext() ? iter.next() : null; + + return cur != null; } /** {@inheritDoc} */ @@ -159,4 +181,51 @@ public class GridMergeIndex extends BaseIndex { throw DbException.getUnsupportedException("previous"); } } + + /** + * Fetching cursor. + */ + protected abstract class FetchingCursor extends IteratorCursor { + /** */ + private boolean canFetch = true; + + /** + */ + public FetchingCursor() { + super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator()); + } + + /** + * @return Next row or {@code null} if none available. + */ + @Nullable protected abstract Row fetchNext(); + + /** {@inheritDoc} */ + @Override public boolean next() { + if (super.next()) + return true; + + if (!canFetch) + return false; + + cur = fetchNext(); + + if (cur == null) { // No more results to fetch. + assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get(); + + canFetch = false; + + return false; + } + + if (fetched != null) { // Try to reuse fetched result. + fetched.add(cur); + + if (fetched.size() == MAX_FETCH_SIZE) + fetched = null; // Throw away fetched result if it is too large. 
+ } + + return true; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java new file mode 100644 index 0000000..c6aaea9 --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -0,0 +1,74 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2.twostep; + +import org.apache.ignite.*; +import org.h2.index.*; +import org.h2.result.*; +import org.h2.value.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Unsorted merge index. + */ +public class GridMergeIndexUnsorted extends GridMergeIndex { + /** */ + private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>(); + + /** {@inheritDoc} */ + @Override public void addPage(GridResultPage<?> page) { + queue.add(page); + } + + /** {@inheritDoc} */ + @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) { + final GridResultPage<?> p = queue.poll(); + + assert p != null; // First page must be already fetched. + + if (p.isEmpty()) + return new IteratorCursor(Collections.<Row>emptyIterator()); + + p.fetchNextPage(); // We always request next page before reading this one. 
+ + return new FetchingCursor() { + /** */ + Iterator<Value[]> iter = p.rows().iterator(); + + @Nullable @Override protected Row fetchNext() { + if (!iter.hasNext()) { + GridResultPage<?> page; + + try { + page = queue.take(); + } + catch (InterruptedException e) { + throw new IgniteException("Query execution was interrupted.", e); + } + + if (page.isEmpty()) { + assert queue.isEmpty() : "It must be the last page."; + + return null; // Empty page - we are done. + } + + page.fetchNextPage(); + + iter = page.rows().iterator(); + } + + return new Row(iter.next(), 0); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java index 3f059fa..a1f2213 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.query.h2.twostep; +import org.h2.api.*; import org.h2.command.ddl.*; import org.h2.engine.*; import org.h2.index.*; @@ -26,7 +27,7 @@ public class GridMergeTable extends TableBase { private final ArrayList<Index> idxs = new ArrayList<>(1); /** */ - private final GridMergeIndex idx = new GridMergeIndex(); + private final GridMergeIndex idx = new GridMergeIndexUnsorted(); /** * @param data Data. @@ -142,4 +143,34 @@ public class GridMergeTable extends TableBase { @Override public void checkRename() { throw DbException.getUnsupportedException("rename"); } + + /** + * Engine. 
+ */ + public static class Engine implements TableEngine { + /** */ + private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>(); + + /** + * @return Created table. + */ + public static GridMergeTable getCreated() { + GridMergeTable tbl = createdTbl.get(); + + assert tbl != null; + + createdTbl.remove(); + + return tbl; + } + + /** {@inheritDoc} */ + @Override public Table createTable(CreateTableData data) { + GridMergeTable tbl = new GridMergeTable(data); + + createdTbl.set(tbl); + + return tbl; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java deleted file mode 100644 index d550b3b..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.gridgain.grid.util.direct.*; - -import java.nio.*; - -/** - * Request to fetch next page. 
- */ -public class GridNextPageRequest extends GridTcpCommunicationMessageAdapter { - /** */ - private long reqId; - - /** */ - private long qryId; - - /** */ - private int qry; - - /** */ - private int offset; - - /** */ - private int pageSize; - - @Override public boolean writeTo(ByteBuffer buf) { - return false; - } - - @Override public boolean readFrom(ByteBuffer buf) { - return false; - } - - @Override public byte directType() { - return 0; - } - - @Override public GridTcpCommunicationMessageAdapter clone() { - return null; - } - - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java deleted file mode 100644 index d77215c..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java +++ /dev/null @@ -1,47 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.gridgain.grid.util.direct.*; -import org.h2.value.*; - -import java.nio.*; -import java.util.*; - -/** - * TODO write doc - */ -public class GridNextPageResponse extends GridTcpCommunicationMessageAdapter { - /** */ - private long reqId; - - /** */ - private Collection<Value[]> rows; - - @Override public 
boolean writeTo(ByteBuffer buf) { - return false; - } - - @Override public boolean readFrom(ByteBuffer buf) { - return false; - } - - @Override public byte directType() { - return 0; - } - - @Override public GridTcpCommunicationMessageAdapter clone() { - return null; - } - - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java deleted file mode 100644 index 10e30ee..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java +++ /dev/null @@ -1,42 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.gridgain.grid.util.direct.*; - -import java.nio.*; - -/** - * TODO write doc - */ -public class GridQueryAck extends GridTcpCommunicationMessageAdapter { - /** */ - private long reqId; - - @Override public boolean writeTo(ByteBuffer buf) { - return false; - } - - @Override public boolean readFrom(ByteBuffer buf) { - return false; - } - - @Override public byte directType() { - return 0; - } - - @Override public GridTcpCommunicationMessageAdapter clone() { - return null; - } - - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java deleted file mode 100644 index 7a664fd..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.gridgain.grid.util.direct.*; - -import java.nio.*; -import java.util.*; - -/** - * TODO write doc - */ -public class GridQueryRequest extends GridTcpCommunicationMessageAdapter { - /** */ - private long reqId; - - /** */ - private List<String> sqlQrys; - - /** */ - private List<Collection<Object>> params; - - - @Override public boolean writeTo(ByteBuffer buf) { - return false; - } - - @Override public boolean readFrom(ByteBuffer buf) { - return false; - } - - @Override public byte directType() { - return 0; - } - - @Override public GridTcpCommunicationMessageAdapter clone() { - return null; - } - - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff 
--git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java new file mode 100644 index 0000000..3e7e12c --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -0,0 +1,199 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2.twostep; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.managers.communication.*; +import org.gridgain.grid.kernal.processors.cache.query.*; +import org.gridgain.grid.kernal.processors.query.h2.*; +import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*; +import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; + +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Reduce query executor. + */ +public class GridReduceQueryExecutor { + /** */ + private GridKernalContext ctx; + + /** */ + private GridH2Indexing h2; + + /** */ + private IgniteLogger log; + + /** */ + private final AtomicLong reqIdGen = new AtomicLong(); + + /** */ + private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>(); + + /** + * @param ctx Context. + * @param h2 H2 Indexing. + * @throws IgniteCheckedException If failed. 
+ */ + public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException { + this.ctx = ctx; + this.h2 = h2; + + log = ctx.log(GridReduceQueryExecutor.class); + + // TODO handle node failure. + + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg != null; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (msg instanceof GridNextPageResponse) + onNextPage(node, (GridNextPageResponse)msg); + else if (msg instanceof GridQueryFailResponse) + onFail(node, (GridQueryFailResponse)msg); + } + }); + } + + private void onFail(ClusterNode node, GridQueryFailResponse msg) { + U.error(log, "Failed to execute query.", msg.error()); + } + + private void onNextPage(final ClusterNode node, GridNextPageResponse msg) { + final long qryReqId = msg.queryRequestId(); + final int qry = msg.query(); + final int pageSize = msg.rows().size(); + + QueryRun r = runs.get(qryReqId); + + GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); + + idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) { + @Override public void fetchNextPage() { + try { + ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + + if (msg.allRows() != -1) { // Only the first page contains row count. 
+ idx.addCount(msg.allRows()); + + r.latch.countDown(); + } + } + + public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) { + long qryReqId = reqIdGen.incrementAndGet(); + + QueryRun r = new QueryRun(); + + r.tbls = new ArrayList<>(); + + try { + r.conn = h2.connectionForSpace(null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + for (GridCacheSqlQuery mapQry : qry.mapQueries()) + r.tbls.add(createTable(r.conn, mapQry)); + + Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow? + + r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); + + this.runs.put(qryReqId, r); + + try { + ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size + + r.latch.await(); + + GridCacheSqlQuery rdc = qry.reduceQuery(); + + final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters())); + + return new GridFinishedFuture(ctx, new Iter(res)); + } + catch (IgniteCheckedException | InterruptedException e) { + return new GridFinishedFuture<>(ctx, e); + } + } + + private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) { + try { + try (PreparedStatement s = conn.prepareStatement( + "CREATE LOCAL TEMPORARY TABLE " + qry.alias() + + " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " + + " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) { + h2.bindParameters(s, F.asList(qry.parameters())); + + s.execute(); + } + + return GridMergeTable.Engine.getCreated(); + } + catch (SQLException|IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * + */ + private static class QueryRun { + /** */ + private List<GridMergeTable> tbls; + + /** */ + private CountDownLatch latch; + + /** */ + private Connection conn; + } + + /** + * + */ + private static class Iter extends GridH2ResultSetIterator<List<?>> implements GridCacheSqlResult { + /** + * @param data Data array. 
+         * @throws IgniteCheckedException If failed.
+         */
+        protected Iter(ResultSet data) throws IgniteCheckedException {
+            super(data);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected List<?> createRow() {
+            ArrayList<Object> res = new ArrayList<>(row.length);
+
+            Collections.addAll(res, row);
+
+            return res;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
new file mode 100644
index 0000000..6c6ab6a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -0,0 +1,76 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.h2.result.*;
+import org.h2.value.*;
+
+import java.util.*;
+
+/**
+ * Page of result rows tagged with its source; subclasses define how the next page is requested.
+ */
+public abstract class GridResultPage<S> {
+    /** Result source, never {@code null}. */
+    private final S src;
+
+    /** Page rows, never {@code null}. */
+    private final Collection<Value[]> rows;
+
+    /** Page. */
+    private final int page;
+
+    /**
+     * @param src Source.
+     * @param page Page.
+     * @param rows Page rows.
+     */
+    protected GridResultPage(S src, int page, Collection<Value[]> rows) {
+        assert src != null;
+        assert rows != null;
+
+        this.src = src;
+        this.page = page;
+        this.rows = rows;
+    }
+
+    /**
+     * @return Result source.
+     */
+    public S source() {
+        return src;
+    }
+
+    /**
+     * @return Page.
+     */
+    public int page() {
+        return page;
+    }
+
+    /**
+     * @return {@code true} If result is empty.
+     */
+    public boolean isEmpty() {
+        return rows.isEmpty();
+    }
+
+    /**
+     * @return Page rows.
+     */
+    public Collection<Value[]> rows() {
+        return rows;
+    }
+
+    /**
+     * Request next page.
+     */
+    public abstract void fetchNextPage();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
new file mode 100644
index 0000000..e1eb905
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
@@ -0,0 +1,59 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+
+import java.io.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridNextPageRequest implements Serializable {
+    /** Query request ID. */
+    private long qryReqId;
+
+    /** Query. */
+    private int qry;
+
+    /** Page size. */
+    private int pageSize;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param pageSize Page size.
+     */
+    public GridNextPageRequest(long qryReqId, int qry, int pageSize) {
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
new file mode 100644
index 0000000..de38172
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -0,0 +1,149 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import org.h2.store.*;
+import org.h2.value.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Next page response.
+ */
+public class GridNextPageResponse implements Externalizable {
+    /** Query request ID. */
+    private long qryReqId;
+
+    /** Query. */
+    private int qry;
+
+    /** Page. */
+    private int page;
+
+    /** All rows count. */
+    private int allRows;
+
+    /** Rows. */
+    private Collection<Value[]> rows;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param page Page.
+     * @param allRows All rows count.
+     * @param rows Rows.
+ */ + public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) { + assert rows != null; + + this.qryReqId = qryReqId; + this.qry = qry; + this.page = page; + this.allRows = allRows; + this.rows = rows; + } + + /** + * @return Query request ID. + */ + public long queryRequestId() { + return qryReqId; + } + + /** + * @return Query. + */ + public int query() { + return qry; + } + + /** + * @return Page. + */ + public int page() { + return page; + } + + /** + * @return All rows. + */ + public int allRows() { + return allRows; + } + + /** + * @return Rows. + */ + public Collection<Value[]> rows() { + return rows; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(qryReqId); + out.writeInt(qry); + out.writeInt(page); + out.writeInt(allRows); + + out.writeInt(rows.size()); + + if (rows.isEmpty()) + return; + + Data data = Data.create(null, 512); + + boolean first = true; + + for (Value[] row : rows) { + if (first) { + out.writeInt(row.length); + + first = false; + } + + for (Value val : row) + data.writeValue(val); + } + + out.writeInt(data.length()); + out.write(data.getBytes(), 0, data.length()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + qryReqId = in.readLong(); + qry = in.readInt(); + page = in.readInt(); + allRows = in.readInt(); + + int rowCnt = in.readInt(); + + if (rowCnt == 0) + rows = Collections.emptyList(); + else { + rows = new ArrayList<>(rowCnt); + + int cols = in.readInt(); + int dataSize = in.readInt(); + + Data data = Data.create(null, dataSize); + + for (int r = 0; r < rowCnt; r++) { + Value[] row = new Value[cols]; + + for (int c = 0; c < cols; c++) + row[c] = data.readValue(); + + rows.add(row); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
new file mode 100644
index 0000000..fe55114
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
@@ -0,0 +1,34 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import java.io.*;
+
+/**
+ * Serializable holder of a request ID; presumably a query acknowledgement message (TODO confirm usage).
+ */
+public class GridQueryAck implements Serializable {
+    /** Request ID. */
+    private long reqId;
+
+    /**
+     * @param reqId Request ID.
+     */
+    public GridQueryAck(long reqId) {
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+}