Implemented SQL query parallelism within a single node: a CacheConfiguration.queryParallelism hint, segmented H2 tree indexes for PARTITIONED caches, and segment-aware two-step query execution.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64ba13b0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64ba13b0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64ba13b0 Branch: refs/heads/master Commit: 64ba13b0a3be6acbf7d629029b460a39c2e2b388 Parents: 4eac51c Author: AMRepo <[email protected]> Authored: Mon Feb 20 21:24:29 2017 +0300 Committer: Andrey V. Mashenkov <[email protected]> Committed: Tue Feb 21 11:52:40 2017 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 48 ++++ .../processors/cache/GridCacheProcessor.java | 3 + .../processors/cache/IgniteCacheProxy.java | 6 +- .../closure/GridClosureProcessor.java | 2 +- .../processors/query/GridQueryIndexing.java | 27 +- .../processors/query/GridQueryProcessor.java | 141 +++------- .../messages/GridQueryNextPageRequest.java | 29 +- .../messages/GridQueryNextPageResponse.java | 29 +- .../cache/query/GridCacheTwoStepQuery.java | 17 ++ .../processors/query/h2/IgniteH2Indexing.java | 235 ++++++++++++++-- .../query/h2/opt/DistributedJoinMode.java | 51 ++++ .../query/h2/opt/GridH2IndexBase.java | 264 +++++++++++++----- .../query/h2/opt/GridH2QueryContext.java | 84 ++++-- .../query/h2/opt/GridH2TreeIndex.java | 232 ++++++++++++---- .../query/h2/twostep/GridMapQueryExecutor.java | 227 +++++++++++---- .../query/h2/twostep/GridMergeIndex.java | 39 ++- .../h2/twostep/GridReduceQueryExecutor.java | 69 +++-- .../h2/twostep/msg/GridH2IndexRangeRequest.java | 60 +++- .../twostep/msg/GridH2IndexRangeResponse.java | 62 ++++- .../h2/twostep/msg/GridH2QueryRequest.java | 5 + .../query/IgniteSqlSegmentedIndexSelfTest.java | 263 ++++++++++++++++++ .../query/IgniteSqlSplitterSelfTest.java | 139 +++++++++- .../h2/GridIndexingSpiAbstractSelfTest.java | 26 +- .../FetchingQueryCursorStressTest.java | 277 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 25 files changed, 1917 insertions(+), 420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 0656dda..149f25a 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -223,6 +223,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default threshold for concurrent loading of keys from {@link CacheStore}. */ public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5; + /** Default SQL query parallelism level */ + public static final int DFLT_SQL_QUERY_PARALLELISM_LVL = 1; + /** Cache name. */ private String name; @@ -410,6 +413,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Query entities. */ private Collection<QueryEntity> qryEntities; + /** */ + private int qryParallelism = DFLT_SQL_QUERY_PARALLELISM_LVL; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. 
*/ @@ -462,6 +468,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { interceptor = cc.getInterceptor(); invalidate = cc.isInvalidate(); isReadThrough = cc.isReadThrough(); + qryParallelism = cc.getQueryParallelism(); isWriteThrough = cc.isWriteThrough(); storeKeepBinary = cc.isStoreKeepBinary() != null ? cc.isStoreKeepBinary() : DFLT_STORE_KEEP_BINARY; listenerConfigurations = cc.listenerConfigurations; @@ -2108,6 +2115,47 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * Defines a hint to query execution engine on desired degree of parallelism within a single node. + * Query executor may or may not use this hint depending on estimated query costs. Query executor may define + * certain restrictions on parallelism depending on query type and/or cache type. + * <p> + * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions: + * <ul> + * <li>Hint cannot be used for {@code REPLICATED} cache, exception is thrown otherwise</li> + * <li>All caches participating in query must have the same degree of parallelism, exception is thrown + * otherwise</li> + * </ul> + * These restrictions will be removed in future versions of Apache Ignite. + * <p> + * Defaults to {@code 1}. + */ + public int getQueryParallelism() { + return qryParallelism; + } + + /** + * Defines a hint to query execution engine on desired degree of parallelism within a single node. + * Query executor may or may not use this hint depending on estimated query costs. Query executor may define + * certain restrictions on parallelism depending on query type and/or cache type. + * <p> + * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions: + * <ul> + * <li>Hint cannot be used for {@code REPLICATED} cache, exception is thrown otherwise</li> + * <li>All caches participating in query must have the same degree of parallelism, exception is thrown + * otherwise</li> + * </ul> + * These restrictions will be removed in future versions of Apache Ignite. + * + * @param qryParallelism Query parallelizm level. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K,V> setQueryParallelism(int qryParallelism) { + this.qryParallelism = qryParallelism; + + return this; + } + + /** * Gets topology validator. * <p> * See {@link TopologyValidator} for details. 
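For reference, a minimal configuration sketch of the new hint (the "persons" cache name, the Person value type and the parallelism value of 4 are illustrative assumptions, not part of this patch):

    // Query parallelism is a per-cache hint; values > 1 are only allowed for PARTITIONED caches.
    CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<>("persons");
    ccfg.setCacheMode(CacheMode.PARTITIONED);
    ccfg.setIndexedTypes(Integer.class, Person.class);
    ccfg.setQueryParallelism(4); // Hint: split each sorted index into 4 segments on every node.

    IgniteCache<Integer, Person> cache = Ignition.start().getOrCreateCache(ccfg);

Note that, per the restriction above, every cache participating in a single SQL query has to be configured with the same parallelism value.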
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 7093403..c3e3f3b 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -269,6 +269,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getCacheMode() == REPLICATED) cfg.setBackups(Integer.MAX_VALUE); + if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED) + throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only."); + if (cfg.getAffinityMapper() == null) cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper()); http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 1381670..f806d05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -729,12 +729,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V final SqlQuery p = (SqlQuery)qry; if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) - return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() { - @Override public Iterator<Cache.Entry<K, V>> iterator() { - return ctx.kernalContext().query().queryLocal(ctx, p, + return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p, opCtxCall != null && opCtxCall.isKeepBinary()); - } - }); return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); } http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 20fb6a0..61ed8a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -902,7 +902,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ - private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc) + public <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc) throws IgniteCheckedException { if (c == null) return new GridFinishedFuture<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index ca04724..37f0ade 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -84,35 +84,26 @@ public interface GridQueryIndexing { /** * Queries individual fields (generally used by JDBC drivers). * - * @param spaceName Space name. + * @param cctx Cache context. * @param qry Query. - * @param params Query parameters. * @param filter Space name and key filter. - * @param enforceJoinOrder Enforce join order of tables in the query. - * @param timeout Query timeout in milliseconds. * @param cancel Query cancel. - * @return Query result. - * @throws IgniteCheckedException If failed. + * @return Cursor. */ - public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry, - Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout, - GridQueryCancel cancel) throws IgniteCheckedException; + public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry, + IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException; /** * Executes regular query. * - * @param spaceName Space name. + * @param cctx Cache context. * @param qry Query. - * @param alias Table alias used in Query. - * @param params Query parameters. - * @param type Query return type. * @param filter Space name and key filter. - * @return Queried rows. - * @throws IgniteCheckedException If failed. + * @param keepBinary Keep binary flag. + * @return Cursor. */ - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry, - String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter) - throws IgniteCheckedException; + public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry, + IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException; /** * Executes text query. 
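Since the indexing SPI now returns cursors for local queries, the local path is consumed the same way as a two-step query from the public API. A usage sketch under assumed conditions (the Person table, its columns and the cache instance are assumptions, not part of this patch):

    SqlFieldsQuery qry = new SqlFieldsQuery("select name from Person where age > ?").setArgs(30);
    qry.setLocal(true); // On a cache with queryParallelism > 1 this still joins across the local index segments.

    try (QueryCursor<List<?>> cur = cache.query(qry)) {
        for (List<?> row : cur)
            System.out.println(row.get(0));
    }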
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index ee9224b..85744d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -754,42 +754,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { INDEXING.module() + " to classpath or moving it from 'optional' to 'libs' folder)."); } - /** - * @param space Space. - * @param clause Clause. - * @param params Parameters collection. - * @param resType Result type. - * @param filters Filters. - * @return Key/value rows. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause, - final Collection<Object> params, final String resType, final IndexingQueryFilter filters) - throws IgniteCheckedException { - checkEnabled(); - - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to execute query (grid is stopping)."); - - try { - final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); - - return executeQuery(GridCacheQueryType.SQL_FIELDS, clause, cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { - @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { - TypeDescriptor type = typesByName.get(new TypeName(space, resType)); - - if (type == null || !type.registered()) - throw new CacheException("Failed to find SQL table for type: " + resType); - - return idx.queryLocalSql(space, clause, null, params, type, filters); - } - }, false); - } - finally { - busyLock.leaveBusy(); - } - } /** * @param cctx Cache context. @@ -829,11 +793,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { - @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { - return idx.queryTwoStep(cctx, qry); - } - }, true); + return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, + new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { + @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { + return idx.queryTwoStep(cctx, qry); + } + }, true); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -849,7 +814,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param keepBinary Keep binary flag. * @return Cursor. 
*/ - public <K, V> Iterator<Cache.Entry<K, V>> queryLocal( + public <K, V> QueryCursor<Cache.Entry<K, V>> queryLocal( final GridCacheContext<?, ?> cctx, final SqlQuery qry, final boolean keepBinary @@ -859,54 +824,25 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, - new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() { - @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { - String space = cctx.name(); + new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { + @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { String type = qry.getType(); - String sqlQry = qry.getSql(); - Object[] params = qry.getArgs(); - TypeDescriptor typeDesc = typesByName.get( - new TypeName( - space, + GridQueryProcessor.TypeDescriptor typeDesc = typesByName.get( + new GridQueryProcessor.TypeName( + cctx.name(), type)); if (typeDesc == null || !typeDesc.registered()) throw new CacheException("Failed to find SQL table for type: " + type); - final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql( - space, - qry.getSql(), - qry.getAlias(), - F.asList(params), - typeDesc, - idx.backupFilter(requestTopVer.get(), null)); + qry.setType(typeDesc.name()); sendQueryExecutedEvent( - sqlQry, - params); - - return new ClIter<Cache.Entry<K, V>>() { - @Override public void close() throws Exception { - i.close(); - } - - @Override public boolean hasNext() { - return i.hasNext(); - } - - @Override public Cache.Entry<K, V> next() { - IgniteBiTuple<K, V> t = i.next(); - - return new CacheEntryImpl<>( - (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false), - (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false)); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; + qry.getSql(), + qry.getArgs()); + + return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary); } }, true); } @@ -994,13 +930,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Closeable iterator. - */ - private interface ClIter<X> extends AutoCloseable, Iterator<X> { - // No-op. - } - - /** * @param cctx Cache context. * @param qry Query. * @return Iterator. 
@@ -1010,34 +939,26 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - final boolean keepBinary = cctx.keepBinary(); - return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException { - final String space = cctx.name(); - final String sql = qry.getSql(); - final Object[] args = qry.getArgs(); - final GridQueryCancel cancel = new GridQueryCancel(); + GridQueryCancel cancel = new GridQueryCancel(); - final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args), - idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel); + final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry, + idx.backupFilter(requestTopVer.get(), null), cancel); - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { + return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { - try { - sendQueryExecutedEvent(sql, args); - - return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }, cancel); - - cursor.fieldsMeta(res.metaData()); + sendQueryExecutedEvent(qry.getSql(), qry.getArgs()); - return cursor; + return cursor.iterator(); + } + }, cancel) { + @Override public List<GridQueryFieldMetadata> fieldsMeta() { + if (cursor instanceof QueryCursorImpl) + return ((QueryCursorImpl)cursor).fieldsMeta(); + return super.fieldsMeta(); + } + }; } }, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java index 1feff5a..acea084 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; - import java.nio.ByteBuffer; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -35,6 +34,9 @@ public class GridQueryNextPageRequest implements Message { private long qryReqId; /** */ + private int segmentId; + + /** */ private int qry; /** */ @@ -50,11 +52,13 @@ public class GridQueryNextPageRequest implements Message { /** * @param qryReqId Query request ID. * @param qry Query. + * @param segmentId Index segment ID. * @param pageSize Page size. 
*/ - public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) { + public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize) { this.qryReqId = qryReqId; this.qry = qry; + this.segmentId = segmentId; this.pageSize = pageSize; } @@ -72,6 +76,11 @@ public class GridQueryNextPageRequest implements Message { return qry; } + /** @return Index segment ID */ + public int segmentId() { + return segmentId; + } + /** * @return Page size. */ @@ -119,6 +128,12 @@ public class GridQueryNextPageRequest implements Message { writer.incrementState(); + case 3: + if (!writer.writeInt("segmentId", segmentId)) + return false; + + writer.incrementState(); + } return true; @@ -156,6 +171,14 @@ public class GridQueryNextPageRequest implements Message { reader.incrementState(); + case 3: + segmentId = reader.readInt("segmentId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridQueryNextPageRequest.class); @@ -168,6 +191,6 @@ public class GridQueryNextPageRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 3; + return 4; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index 4889069..e85c00b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -42,6 +42,9 @@ public class GridQueryNextPageResponse implements Message { private long qryReqId; /** */ + private int segmentId; + + /** */ private int qry; /** */ @@ -73,6 +76,7 @@ public class GridQueryNextPageResponse implements Message { /** * @param qryReqId Query request ID. + * @param segmentId Index segment ID. * @param qry Query. * @param page Page. * @param allRows All rows count. @@ -80,12 +84,13 @@ public class GridQueryNextPageResponse implements Message { * @param vals Values for rows in this page added sequentially. * @param plainRows Not marshalled rows for local node. */ - public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, int cols, + public GridQueryNextPageResponse(long qryReqId, int segmentId, int qry, int page, int allRows, int cols, Collection<Message> vals, Collection<?> plainRows) { assert vals != null ^ plainRows != null; assert cols > 0 : cols; this.qryReqId = qryReqId; + this.segmentId = segmentId; this.qry = qry; this.page = page; this.allRows = allRows; @@ -102,6 +107,13 @@ public class GridQueryNextPageResponse implements Message { } /** + * @return Index segment ID. + */ + public int segmentId() { + return segmentId; + } + + /** * @return Query. 
*/ public int query() { @@ -202,6 +214,12 @@ public class GridQueryNextPageResponse implements Message { writer.incrementState(); + case 7: + if (!writer.writeInt("segmentId", segmentId)) + return false; + + writer.incrementState(); + } return true; @@ -271,6 +289,13 @@ public class GridQueryNextPageResponse implements Message { reader.incrementState(); + case 7: + segmentId = reader.readInt("segmentId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridQueryNextPageResponse.class); @@ -283,7 +308,7 @@ public class GridQueryNextPageResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index f53936f..c127eeb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -69,6 +69,9 @@ public class GridCacheTwoStepQuery { /** */ private List<Integer> extraCaches; + /** */ + private boolean local; + /** * @param originalSql Original query SQL. * @param schemas Schema names in query. @@ -229,6 +232,20 @@ public class GridCacheTwoStepQuery { } /** + * @return {@code True} If query is local. + */ + public boolean isLocal() { + return local; + } + + /** + * @param local Local query flag. + */ + public void local(boolean local) { + this.local = local; + } + + /** * @param args New arguments to copy with. * @return Copy. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index e4b0c1f..2f40d87 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -77,11 +77,13 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; @@ -92,6 +94,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; @@ -187,6 +190,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; @@ -810,10 +815,22 @@ public class IgniteH2Indexing implements GridQueryIndexing { removeTable(tbl); } - /** {@inheritDoc} */ + /** + * Queries individual fields (generally used by JDBC drivers). + * + * @param spaceName Space name. + * @param qry Query. + * @param params Query parameters. 
+ * @param filter Space name and key filter. + * @param enforceJoinOrder Enforce join order of tables in the query. + * @param timeout Query timeout in milliseconds. + * @param cancel Query cancel. + * @return Query result. + * @throws IgniteCheckedException If failed. + */ @SuppressWarnings("unchecked") - @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry, - @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder, + public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry, + @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder, final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException { final Connection conn = connectionForSpace(spaceName); @@ -833,7 +850,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fldsQry.setEnforceJoinOrder(enforceJoinOrder); fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS); - return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel); + return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filter, cancel); } List<GridQueryFieldMetadata> meta; @@ -846,7 +863,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) - .filter(filters).distributedJoins(false); + .filter(filter).distributedJoinMode(OFF); return new GridQueryFieldsResultAdapter(meta, null) { @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException { @@ -1099,14 +1116,113 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, - final String qry, String alias, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type, - final IndexingQueryFilter filter) throws IgniteCheckedException { - final TableDescriptor tbl = tableDescriptor(spaceName, type); + @Override public <K, V> QueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx, + final SqlFieldsQuery qry, final IndexingQueryFilter filter, final GridQueryCancel cancel) + throws IgniteCheckedException { + + if (cctx.config().getQueryParallelism() > 1) { + qry.setDistributedJoins(true); + + assert qry.isLocal(); + + return queryTwoStep(cctx, qry, cancel); + } + else { + final boolean keepBinary = cctx.keepBinary(); + + final String space = cctx.name(); + final String sql = qry.getSql(); + final Object[] args = qry.getArgs(); + + final GridQueryFieldsResult res = queryLocalSqlFields(space, sql, F.asList(args), filter, + qry.isEnforceJoinOrder(), qry.getTimeout(), cancel); + + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + try { + return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, cancel); + + cursor.fieldsMeta(res.metaData()); + + return cursor; + } + } + + /** {@inheritDoc} */ + @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx, + final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException { + if (cctx.config().getQueryParallelism() > 1) { + qry.setDistributedJoins(true); + + 
assert qry.isLocal(); + + return queryTwoStep(cctx, qry); + } + else { + String space = cctx.name(); + String type = qry.getType(); + String sqlQry = qry.getSql(); + String alias = qry.getAlias(); + Object[] params = qry.getArgs(); + + GridQueryCancel cancel = new GridQueryCancel(); + + final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(space, sqlQry, alias, + F.asList(params), type, filter, cancel); + + return new QueryCursorImpl<Cache.Entry<K, V>>(new Iterable<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return new ClIter<Cache.Entry<K, V>>() { + @Override public void close() throws Exception { + i.close(); + } + + @Override public boolean hasNext() { + return i.hasNext(); + } + + @Override public Cache.Entry<K, V> next() { + IgniteBiTuple<K, V> t = i.next(); + + return new CacheEntryImpl<>( + (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false), + (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false)); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }, cancel); + } + } + + /** + * Executes regular query. + * + * @param spaceName Space name. + * @param qry Query. + * @param alias Table alias. + * @param params Query parameters. + * @param type Query return type. + * @param filter Space name and key filter. + * @return Queried rows. + * @throws IgniteCheckedException If failed. + */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, + final String qry, String alias, @Nullable final Collection<Object> params, String type, + final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { + final TableDescriptor tbl = tableDescriptor(type, spaceName); if (tbl == null) - throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(), + throw new IgniteSQLException("Failed to find SQL table for type: " + type, IgniteQueryErrorCode.TABLE_NOT_FOUND); String sql = generateQuery(qry, alias, tbl); @@ -1115,7 +1231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { setupConnection(conn, false, false); - GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false)); + GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter) + .distributedJoinMode(OFF)); GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName, U.currentTimeMillis(), null, true); @@ -1123,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { runs.put(run.id(), run); try { - ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null); + ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, cancel); return new KeyValIterator(rs); } @@ -1178,8 +1295,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { fqry.setArgs(qry.getArgs()); fqry.setPageSize(qry.getPageSize()); fqry.setDistributedJoins(qry.isDistributedJoins()); + fqry.setLocal(qry.isLocal()); - if(qry.getTimeout() > 0) + if (qry.getTimeout() > 0) fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry, null); @@ -1234,11 +1352,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned(); final boolean grpByCollocated = qry.isCollocated(); + final DistributedJoinMode distributedJoinMode = 
distributedJoinMode(qry.isLocal(), distributedJoins); + GridCacheTwoStepQuery twoStepQry; List<GridQueryFieldMetadata> meta; final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated, - distributedJoins, enforceJoinOrder); + distributedJoins, enforceJoinOrder, qry.isLocal()); TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey); if (cachedQry != null) { @@ -1251,7 +1371,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { setupConnection(c, distributedJoins, enforceJoinOrder); GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE) - .distributedJoins(distributedJoins)); + .distributedJoinMode(distributedJoinMode)); PreparedStatement stmt; @@ -1286,9 +1406,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.clearThreadLocal(); } - Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt); + Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt); - if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery()) + if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != prepared.isQuery()) throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver", IgniteQueryErrorCode.STMT_TYPE_MISMATCH); @@ -1341,8 +1461,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { extraCaches = null; } + //Prohibit usage indices with different numbers of segments in same query. + checkCacheIndexSegmentation(caches); + twoStepQry.caches(caches); twoStepQry.extraCaches(extraCaches); + twoStepQry.local(qry.isLocal()); meta = meta(stmt.getMetaData()); } @@ -1380,6 +1504,32 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @throws IllegalStateException if segmented indices used with non-segmented indices. + */ + private void checkCacheIndexSegmentation(List<Integer> caches) { + if (caches.isEmpty()) + return; //Nnothing to check + + GridCacheSharedContext sharedContext = ctx.cache().context(); + + int expectedParallelism = 0; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = sharedContext.cacheContext(caches.get(i)); + + assert cctx != null; + + if(!cctx.isPartitioned()) + continue; + + if(expectedParallelism == 0) + expectedParallelism = cctx.config().getQueryParallelism(); + else if (expectedParallelism != 0 && cctx.config().getQueryParallelism() != expectedParallelism) + throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden."); + } + } + + /** * Prepares statement for query. * * @param qry Query string. 
@@ -1669,7 +1819,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { private void cleanupStatementCache() { long cur = U.currentTimeMillis(); - for(Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) { + for (Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) { Map.Entry<Thread, StatementCache> entry = it.next(); Thread t = entry.getKey(); @@ -1877,6 +2027,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { for (ClusterNode node : nodes) { if (node.isLocal()) { + if (locNode != null) + throw new IllegalStateException(); + locNode = node; continue; @@ -2163,23 +2316,29 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private final boolean enforceJoinOrder; + /** */ + private final boolean isLocal; + /** * @param space Space. * @param sql Sql. * @param grpByCollocated Collocated GROUP BY. * @param distributedJoins Distributed joins enabled. * @param enforceJoinOrder Enforce join order of tables. + * @param isLocal Query is local flag. */ private TwoStepCachedQueryKey(String space, String sql, boolean grpByCollocated, boolean distributedJoins, - boolean enforceJoinOrder) { + boolean enforceJoinOrder, + boolean isLocal) { this.space = space; this.sql = sql; this.grpByCollocated = grpByCollocated; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; + this.isLocal = isLocal; } /** {@inheritDoc} */ @@ -2204,7 +2363,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (space != null ? !space.equals(that.space) : that.space != null) return false; - return sql.equals(that.sql); + return isLocal == that.isLocal && sql.equals(that.sql); } /** {@inheritDoc} */ @@ -2212,8 +2371,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { int res = space != null ? space.hashCode() : 0; res = 31 * res + sql.hashCode(); res = 31 * res + (grpByCollocated ? 1 : 0); - res = 31 * res + (distributedJoins ? 1 : 0); - res = 31 * res + (enforceJoinOrder ? 1 : 0); + res = res + (distributedJoins ? 2 : 0); + res = res + (enforceJoinOrder ? 4 : 0); + res = res + (isLocal ? 8 : 0); return res; } @@ -2572,7 +2732,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { affCol = null; // Add primary key index. - idxs.add(new GridH2TreeIndex("_key_PK", tbl, true, + idxs.add(createTreeIndex("_key_PK", tbl, true, treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, affCol))); if (type().valueClass() == String.class) { @@ -2618,7 +2778,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { cols = treeIndexColumns(cols, keyCol, affCol); - idxs.add(new GridH2TreeIndex(name, tbl, false, cols)); + idxs.add(createTreeIndex(name, tbl, false, cols)); } else if (idx.type() == GEO_SPATIAL) idxs.add(createH2SpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()]))); @@ -2629,7 +2789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { // Add explicit affinity key index if nothing alike was found. if (affCol != null && !affIdxFound) { - idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false, + idxs.add(createTreeIndex("AFFINITY_KEY", tbl, false, treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, keyCol))); } @@ -2676,6 +2836,22 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteException("Failed to instantiate: " + className, e); } } + + /** + * @param idxName Index name. + * @param tbl Table. + * @param pk Primary key flag. + * @param columns Index column list. 
+ * @return + */ + private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) { + GridCacheContext<?, ?> cctx = tbl.rowDescriptor().context(); + + if (cctx != null && cctx.config().getQueryParallelism() > 1) + return new GridH2TreeIndex(idxName, tbl, pk, columns, cctx.config().getQueryParallelism()); + + return new GridH2TreeIndex(idxName, tbl, pk, columns, 1); + } } /** @@ -2729,6 +2905,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Closeable iterator. + */ + private interface ClIter<X> extends AutoCloseable, Iterator<X> { + // No-op. + } + + /** * Field descriptor. */ static class SqlFieldMetadata implements GridQueryFieldMetadata { http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java new file mode 100644 index 0000000..cc06244 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +/** + * Defines set of distributed join modes. + */ +public enum DistributedJoinMode { + /** + * Distributed joins is disabled. Local joins will be performed instead. + */ + OFF, + + /** + * Distributed joins is enabled within local node only. + * + * NOTE: This mode is used with segmented indices for local sql queries. + * As in this case we need to make distributed join across local index segments + * and prevent range-queries to other nodes. + */ + LOCAL_ONLY, + + /** + * Distributed joins is enabled. + */ + ON; + + /** + * @param isLocal Query local flag. + * @param distributedJoins Query distributed joins flag. + * @return DistributedJoinMode for the query. + */ + public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) { + return distributedJoins ? (isLocal ? 
LOCAL_ONLY : ON) : OFF; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index bab219c..131e03b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -81,6 +81,8 @@ import org.jetbrains.annotations.Nullable; import static java.util.Collections.emptyIterator; import static java.util.Collections.singletonList; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel; @@ -178,6 +180,13 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** + * @return Index segment ID for current query context. + */ + protected int threadLocalSegment() { + return 0; + } + + /** * If the index supports rebuilding it has to creates its own copy. * * @return Rebuilt copy. @@ -252,7 +261,7 @@ public abstract class GridH2IndexBase extends BaseIndex { // because on run stage reordering of joined tables by Optimizer is explicitly disabled // and thus multiplier will be always the same, so it will not affect choice of index. // Query expressions can not be distributed as well. - if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression()) + if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression()) return GridH2CollocationModel.MULTIPLIER_COLLOCATED; // We have to clear this cache because normally sub-query plan cost does not depend on anything @@ -363,7 +372,7 @@ public abstract class GridH2IndexBase extends BaseIndex { @Override public IndexLookupBatch createLookupBatch(TableFilter filter) { GridH2QueryContext qctx = GridH2QueryContext.get(); - if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned()) + if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned()) return null; IndexColumn affCol = getTable().getAffinityKeyColumn(); @@ -381,9 +390,11 @@ public abstract class GridH2IndexBase extends BaseIndex { ucast = false; } - GridCacheContext<?,?> cctx = getTable().rowDescriptor().context(); + GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context(); - return new DistributedLookupBatch(cctx, ucast, affColId); + boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY; + + return new DistributedLookupBatch(cctx, ucast, affColId, isLocal); } /** @@ -437,18 +448,18 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param node Requesting node. * @param msg Request message. 
*/ - private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) { - GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), - msg.originNodeId(), - msg.queryId(), - MAP); - + private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) { GridH2IndexRangeResponse res = new GridH2IndexRangeResponse(); res.originNodeId(msg.originNodeId()); res.queryId(msg.queryId()); + res.originSegmentId(msg.originSegmentId()); + res.segment(msg.segment()); res.batchLookupId(msg.batchLookupId()); + GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(), + msg.queryId(), msg.originSegmentId(), MAP); + if (qctx == null) res.status(STATUS_NOT_FOUND); else { @@ -461,11 +472,11 @@ public abstract class GridH2IndexBase extends BaseIndex { assert !msg.bounds().isEmpty() : "empty bounds"; - src = new RangeSource(msg.bounds(), snapshot0, qctx.filter()); + src = new RangeSource(msg.bounds(), msg.segment(), snapshot0, qctx.filter()); } else { // This is request to fetch next portion of data. - src = qctx.getSource(node.id(), msg.batchLookupId()); + src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId()); assert src != null; } @@ -491,11 +502,11 @@ public abstract class GridH2IndexBase extends BaseIndex { if (src.hasMoreRows()) { // Save source for future fetches. if (msg.bounds() != null) - qctx.putSource(node.id(), msg.batchLookupId(), src); + qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src); } else if (msg.bounds() == null) { // Drop saved source. - qctx.putSource(node.id(), msg.batchLookupId(), null); + qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null); } assert !ranges.isEmpty(); @@ -520,17 +531,17 @@ public abstract class GridH2IndexBase extends BaseIndex { */ private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) { GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), - msg.originNodeId(), msg.queryId(), MAP); + msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP); if (qctx == null) return; - Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId()); + Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId()); if (streams == null) return; - RangeStream stream = streams.get(node); + RangeStream stream = streams.get(new SegmentKey(node, msg.segment())); assert stream != null; @@ -549,47 +560,69 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param qctx Query context. * @param batchLookupId Batch lookup ID. + * @param segmentId Segment ID. * @return Index range request. */ - private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) { + private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) { GridH2IndexRangeRequest req = new GridH2IndexRangeRequest(); req.originNodeId(qctx.originNodeId()); req.queryId(qctx.queryId()); + req.originSegmentId(qctx.segment()); + req.segment(segmentId); req.batchLookupId(batchLookupId); return req; } + /** * @param qctx Query context. * @param cctx Cache context. + * @param isLocalQry Local query flag. * @return Collection of nodes for broadcasting. 
*/ - private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) { + private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) { Map<UUID, int[]> partMap = qctx.partitionsMap(); - List<ClusterNode> res; + List<ClusterNode> nodes; + + if (isLocalQry) { + if (partMap != null && !partMap.containsKey(cctx.localNodeId())) + return Collections.<SegmentKey>emptyList(); // Prevent remote index call for local queries. - if (partMap == null) - res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion())); + nodes = Collections.singletonList(cctx.localNode()); + } else { - res = new ArrayList<>(partMap.size()); + if (partMap == null) + nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion())); + else { + nodes = new ArrayList<>(partMap.size()); - GridKernalContext ctx = kernalContext(); + GridKernalContext ctx = kernalContext(); - for (UUID nodeId : partMap.keySet()) { - ClusterNode node = ctx.discovery().node(nodeId); + for (UUID nodeId : partMap.keySet()) { + ClusterNode node = ctx.discovery().node(nodeId); - if (node == null) - throw new GridH2RetryException("Failed to find node."); + if (node == null) + throw new GridH2RetryException("Failed to find node."); - res.add(node); + nodes.add(node); + } } + + if (F.isEmpty(nodes)) + throw new GridH2RetryException("Failed to collect affinity nodes."); } - if (F.isEmpty(res)) - throw new GridH2RetryException("Failed to collect affinity nodes."); + int segmentsCount = segmentsCount(); + + List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount); + + for (ClusterNode node : nodes) { + for (int seg = 0; seg < segmentsCount; seg++) + res.add(new SegmentKey(node, seg)); + } return res; } @@ -598,26 +631,81 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param cctx Cache context. * @param qctx Query context. * @param affKeyObj Affinity key. - * @return Cluster nodes or {@code null} if affinity key is a null value. + * @param isLocalQry Local query flag. + * @return Segment key for Affinity key. */ - private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) { + private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) { assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj; ClusterNode node; - if (qctx.partitionsMap() != null) { - // If we have explicit partitions map, we have to use it to calculate affinity node. - UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx); + int partition = cctx.affinity().partition(affKeyObj); + + if (isLocalQry) { + if (qctx.partitionsMap() != null) { + // If we have explicit partitions map, we have to use it to calculate affinity node. + UUID nodeId = qctx.nodeForPartition(partition, cctx); + + if(!cctx.localNodeId().equals(nodeId)) + return null; // Prevent remote index call for local queries. + } + + if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion())) + return null; + + node = cctx.localNode(); + } + else{ + if (qctx.partitionsMap() != null) { + // If we have explicit partitions map, we have to use it to calculate affinity node. + UUID nodeId = qctx.nodeForPartition(partition, cctx); node = cctx.discovery().node(nodeId); } else // Get primary node for current topology version. 
node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion()); - if (node == null) // Node was not found, probably topology changed and we need to retry the whole query. - throw new GridH2RetryException("Failed to find node."); + if (node == null) // Node was not found, probably topology changed and we need to retry the whole query. + throw new GridH2RetryException("Failed to find node."); + } + + return new SegmentKey(node, segment(partition)); + } + + /** */ + protected class SegmentKey { + /** */ + final ClusterNode node; + + /** */ + final int segmentId; + + SegmentKey(ClusterNode node, int segmentId) { + assert node != null; + + this.node = node; + this.segmentId = segmentId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + SegmentKey key = (SegmentKey)o; + + return segmentId == key.segmentId && node.id().equals(key.node.id()); - return node; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = node.hashCode(); + result = 31 * result + segmentId; + return result; + } } /** @@ -740,6 +828,20 @@ public abstract class GridH2IndexBase extends BaseIndex { return database.createRow(vals0, MEMORY_CALCULATE); } + /** @return Index segments count. */ + protected int segmentsCount() { + return 1; + } + + /** + * @param partition Partition idx. + * @return Segment ID for given key + */ + protected int segment(int partition) { + return 0; + } + + /** * Simple cursor from a single node. */ @@ -752,14 +854,14 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param rangeId Range ID. - * @param nodes Remote nodes. + * @param keys Remote index segment keys. * @param rangeStreams Range streams. */ - private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) { - assert nodes.size() == 1; + UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) { + assert keys.size() == 1; this.rangeId = rangeId; - this.stream = rangeStreams.get(F.first(nodes)); + this.stream = rangeStreams.get(F.first(keys)); assert stream != null; } @@ -803,20 +905,19 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param rangeId Range ID. - * @param nodes Remote nodes. + * @param segmentKeys Remote nodes. * @param rangeStreams Range streams. 
*/ - private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) { - assert nodes.size() > 1; + BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) { this.rangeId = rangeId; - streams = new RangeStream[nodes.size()]; + streams = new RangeStream[segmentKeys.size()]; int i = 0; - for (ClusterNode node : nodes) { - RangeStream stream = rangeStreams.get(node); + for (SegmentKey segmentKey : segmentKeys) { + RangeStream stream = rangeStreams.get(segmentKey); assert stream != null; @@ -928,16 +1029,19 @@ public abstract class GridH2IndexBase extends BaseIndex { final int affColId; /** */ + private final boolean localQuery; + + /** */ GridH2QueryContext qctx; /** */ int batchLookupId; /** */ - Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap(); + Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap(); /** */ - List<ClusterNode> broadcastNodes; + List<SegmentKey> broadcastSegments; /** */ List<Future<Cursor>> res = Collections.emptyList(); @@ -952,11 +1056,13 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param cctx Cache Cache context. * @param ucast Unicast or broadcast query. * @param affColId Affinity column ID. + * @param localQuery Local query flag. */ - private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) { + DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) { this.cctx = cctx; this.ucast = ucast; this.affColId = affColId; + this.localQuery = localQuery; } /** @@ -1028,7 +1134,7 @@ public abstract class GridH2IndexBase extends BaseIndex { Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow); - List<ClusterNode> nodes; + List<SegmentKey> segmentKeys; Future<Cursor> fut; if (affKey != null) { @@ -1036,17 +1142,20 @@ public abstract class GridH2IndexBase extends BaseIndex { if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything. return false; - nodes = F.asList(rangeNode(cctx, qctx, affKey)); + segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery)); } else { // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast. - if (broadcastNodes == null) - broadcastNodes = broadcastNodes(qctx, cctx); + if (broadcastSegments == null) + broadcastSegments = broadcastSegments(qctx, cctx, localQuery); - nodes = broadcastNodes; + segmentKeys = broadcastSegments; } - assert !F.isEmpty(nodes) : nodes; + if (localQuery && segmentKeys.isEmpty()) + return false; // Nothing to do + + assert !F.isEmpty(segmentKeys) : segmentKeys; final int rangeId = res.size(); @@ -1058,21 +1167,21 @@ public abstract class GridH2IndexBase extends BaseIndex { GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last); // Add range to every message of every participating node. 
- for (int i = 0; i < nodes.size(); i++) { - ClusterNode node = nodes.get(i); - assert node != null; + for (int i = 0; i < segmentKeys.size(); i++) { + SegmentKey segmentKey = segmentKeys.get(i); + assert segmentKey != null; - RangeStream stream = rangeStreams.get(node); + RangeStream stream = rangeStreams.get(segmentKey); List<GridH2RowRangeBounds> bounds; if (stream == null) { - stream = new RangeStream(qctx, node); + stream = new RangeStream(qctx, segmentKey.node); - stream.req = createRequest(qctx, batchLookupId); + stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId); stream.req.bounds(bounds = new ArrayList<>()); - rangeStreams.put(node, stream); + rangeStreams.put(segmentKey, stream); } else bounds = stream.req.bounds(); @@ -1084,9 +1193,9 @@ public abstract class GridH2IndexBase extends BaseIndex { batchFull = true; } - fut = new DoneFuture<>(nodes.size() == 1 ? - new UnicastCursor(rangeId, nodes, rangeStreams) : - new BroadcastCursor(rangeId, nodes, rangeStreams)); + fut = new DoneFuture<>(segmentKeys.size() == 1 ? + new UnicastCursor(rangeId, segmentKeys, rangeStreams) : + new BroadcastCursor(rangeId, segmentKeys, rangeStreams)); res.add(fut); @@ -1138,7 +1247,7 @@ public abstract class GridH2IndexBase extends BaseIndex { batchLookupId = 0; rangeStreams = Collections.emptyMap(); - broadcastNodes = null; + broadcastSegments = null; batchFull = false; findCalled = false; res = Collections.emptyList(); @@ -1244,7 +1353,7 @@ public abstract class GridH2IndexBase extends BaseIndex { if (remainingRanges > 0) { if (req.bounds() != null) - req = createRequest(qctx, req.batchLookupId()); + req = createRequest(qctx, req.batchLookupId(), req.segment()); // Prefetch next page. send(singletonList(node), req); @@ -1366,6 +1475,9 @@ public abstract class GridH2IndexBase extends BaseIndex { final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree; /** */ + private final int segment; + + /** */ final IndexingQueryFilter filter; /** @@ -1375,9 +1487,11 @@ public abstract class GridH2IndexBase extends BaseIndex { */ RangeSource( Iterable<GridH2RowRangeBounds> bounds, + int segment, ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree, IndexingQueryFilter filter ) { + this.segment = segment; this.filter = filter; this.tree = tree; boundsIter = bounds.iterator(); @@ -1435,7 +1549,7 @@ public abstract class GridH2IndexBase extends BaseIndex { SearchRow first = toSearchRow(bounds.first()); SearchRow last = toSearchRow(bounds.last()); - ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead(); + ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = tree != null ? tree : treeForRead(segment); curRange = doFind0(t, first, true, last, filter); @@ -1452,9 +1566,10 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @return Snapshot for current thread if there is one. + * @param segment Segment Id. + * @return Snapshot for requested segment if there is one. 
*/ - protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() { + protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int segment) { throw new UnsupportedOperationException(); } @@ -1505,7 +1620,8 @@ public abstract class GridH2IndexBase extends BaseIndex { this.fltr = qryFilter.forSpace(spaceName); this.isValRequired = qryFilter.isValueRequired(); - } else { + } + else { this.fltr = null; this.isValRequired = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index 19ea2b2..a7ee0dc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -32,6 +32,7 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; /** @@ -79,7 +80,7 @@ public class GridH2QueryContext { private UUID[] partsNodes; /** */ - private boolean distributedJoins; + private DistributedJoinMode distributedJoinMode; /** */ private int pageSize; @@ -94,7 +95,22 @@ public class GridH2QueryContext { * @param type Query type. */ public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { - key = new Key(locNodeId, nodeId, qryId, type); + assert type != MAP; + + key = new Key(locNodeId, nodeId, qryId, 0, type); + } + + /** + * @param locNodeId Local node ID. + * @param nodeId The node who initiated the query. + * @param qryId The query ID. + * @param segmentId Index segment ID. + * @param type Query type. + */ + public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) { + assert segmentId == 0 || type == MAP; + + key = new Key(locNodeId, nodeId, qryId, segmentId, type); } /** @@ -133,20 +149,20 @@ public class GridH2QueryContext { } /** - * @param distributedJoins Distributed joins can be run in this query. + * @param distributedJoinMode Distributed join mode. * @return {@code this}. */ - public GridH2QueryContext distributedJoins(boolean distributedJoins) { - this.distributedJoins = distributedJoins; + public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) { + this.distributedJoinMode = distributedJoinMode; return this; } /** - * @return {@code true} If distributed joins can be run in this query. + * @return Distributed join mode. */ - public boolean distributedJoins() { - return distributedJoins; + public DistributedJoinMode distributedJoinMode() { + return distributedJoinMode; } /** @@ -226,6 +242,11 @@ public class GridH2QueryContext { return nodeIds[p]; } + /** @return index segment ID. */ + public int segment() { + return key.segmentId; + } + /** * @param idxId Index ID. * @param snapshot Index snapshot. @@ -303,11 +324,12 @@ public class GridH2QueryContext { /** * @param ownerId Owner node ID. 
+ * @param segmentId Index segment ID. * @param batchLookupId Batch lookup ID. * @param src Range source. */ - public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) { - SourceKey srcKey = new SourceKey(ownerId, batchLookupId); + public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) { + SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId); if (src != null) { if (sources == null) @@ -321,15 +343,16 @@ public class GridH2QueryContext { /** * @param ownerId Owner node ID. + * @param segmentId Index segment ID. * @param batchLookupId Batch lookup ID. * @return Range source. */ @SuppressWarnings("unchecked") - public synchronized <T> T getSource(UUID ownerId, int batchLookupId) { + public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) { if (sources == null) return null; - return (T)sources.get(new SourceKey(ownerId, batchLookupId)); + return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId)); } /** @@ -356,7 +379,7 @@ public class GridH2QueryContext { assert qctx.get() == null; // We need MAP query context to be available to other threads to run distributed joins. - if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null) + if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null) throw new IllegalStateException("Query context is already set."); qctx.set(x); @@ -381,7 +404,14 @@ public class GridH2QueryContext { * @return {@code True} if context was found. */ public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { - return doClear(new Key(locNodeId, nodeId, qryId, type), false); + boolean res = false; + + for (Key key : qctxs.keySet()) { + if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type) + res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false); + } + + return res; } /** @@ -463,6 +493,7 @@ public class GridH2QueryContext { * @param locNodeId Local node ID. * @param nodeId The node who initiated the query. * @param qryId The query ID. + * @param segmentId Index segment ID. * @param type Query type. * @return Query context. */ @@ -470,9 +501,10 @@ public class GridH2QueryContext { UUID locNodeId, UUID nodeId, long qryId, + int segmentId, GridH2QueryType type ) { - return qctxs.get(new Key(locNodeId, nodeId, qryId, type)); + return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type)); } /** @@ -528,15 +560,19 @@ public class GridH2QueryContext { private final long qryId; /** */ + private final int segmentId; + + /** */ private final GridH2QueryType type; /** * @param locNodeId Local node ID. * @param nodeId The node who initiated the query. * @param qryId The query ID. + * @param segmentId Index segment ID. * @param type Query type. 
*/ - private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { + private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) { assert locNodeId != null; assert nodeId != null; assert type != null; @@ -544,6 +580,7 @@ public class GridH2QueryContext { this.locNodeId = locNodeId; this.nodeId = nodeId; this.qryId = qryId; + this.segmentId = segmentId; this.type = type; } @@ -568,6 +605,7 @@ public class GridH2QueryContext { res = 31 * res + nodeId.hashCode(); res = 31 * res + (int)(qryId ^ (qryId >>> 32)); res = 31 * res + type.hashCode(); + res = 31 * res + segmentId; return res; } @@ -586,14 +624,19 @@ public class GridH2QueryContext { UUID ownerId; /** */ + int segmentId; + + /** */ int batchLookupId; /** * @param ownerId Owner node ID. + * @param segmentId Index segment ID. * @param batchLookupId Batch lookup ID. */ - SourceKey(UUID ownerId, int batchLookupId) { + SourceKey(UUID ownerId, int segmentId, int batchLookupId) { this.ownerId = ownerId; + this.segmentId = segmentId; this.batchLookupId = batchLookupId; } @@ -601,12 +644,15 @@ public class GridH2QueryContext { @Override public boolean equals(Object o) { SourceKey srcKey = (SourceKey)o; - return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId); + return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId && + ownerId.equals(srcKey.ownerId); } /** {@inheritDoc} */ @Override public int hashCode() { - return 31 * ownerId.hashCode() + batchLookupId; + int hash = ownerId.hashCode(); + hash = 31 * hash + segmentId; + return 31 * hash + batchLookupId; } } }
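----------------------------------------------------------------------
Illustrative sketch (not part of the commit above). The GridH2IndexBase changes route distributed index range lookups to (node, segment) pairs: broadcastSegments() fans a lookup out to every segment of every participating node, while rangeSegment() pins an affinity-key lookup to a single segment, and the base class only adds the segmentsCount()/segment(partition) hooks with single-segment defaults. The standalone Java sketch below shows that routing under the assumption that a segmented index maps a partition to a segment with a simple modulo; the names SegmentRoutingSketch and segmentForPartition are hypothetical and do not appear in the commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SegmentRoutingSketch {
    /** Hypothetical analogue of GridH2IndexBase.SegmentKey: a (node, segment) pair. */
    static final class SegmentKey {
        final String nodeId;   // stands in for ClusterNode
        final int segmentId;

        SegmentKey(String nodeId, int segmentId) {
            this.nodeId = nodeId;
            this.segmentId = segmentId;
        }

        @Override public String toString() {
            return nodeId + "#" + segmentId;
        }
    }

    /** Assumed partition-to-segment mapping; the concrete strategy may differ. */
    static int segmentForPartition(int partition, int segmentsCnt) {
        return partition % segmentsCnt;
    }

    /** Broadcast case: every segment of every participating node gets its own range stream. */
    static List<SegmentKey> broadcastSegments(List<String> nodes, int segmentsCnt) {
        List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCnt);

        for (String node : nodes)
            for (int seg = 0; seg < segmentsCnt; seg++)
                res.add(new SegmentKey(node, seg));

        return res;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("nodeA", "nodeB");

        // Unicast case: an explicit affinity key resolves to one partition, hence one segment.
        int partition = 42;
        System.out.println("Affinity lookup goes to segment " + segmentForPartition(partition, 4));

        // Broadcast case: 2 nodes x 4 segments = 8 range streams.
        System.out.println("Broadcast targets: " + broadcastSegments(nodes, 4));
    }
}
----------------------------------------------------------------------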

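----------------------------------------------------------------------
Illustrative sketch (not part of the commit above). Once segmentId becomes part of GridH2QueryContext.Key, a single logical query may register several MAP contexts on one node, which is why the reworked clear() scans for every key matching (locNodeId, nodeId, qryId, type) instead of removing one exact key. The standalone Java sketch below shows that "clear every matching segment" pattern; the names QryKey and clearAllSegments are hypothetical.

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class SegmentedContextSketch {
    /** Simplified analogue of GridH2QueryContext.Key with a segment component. */
    static final class QryKey {
        final long qryId;
        final int segmentId;

        QryKey(long qryId, int segmentId) {
            this.qryId = qryId;
            this.segmentId = segmentId;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof QryKey))
                return false;

            QryKey k = (QryKey)o;

            return qryId == k.qryId && segmentId == k.segmentId;
        }

        @Override public int hashCode() {
            return Objects.hash(qryId, segmentId);
        }
    }

    /** Registered per-segment contexts, keyed the same way the commit keys MAP contexts. */
    static final Map<QryKey, String> contexts = new ConcurrentHashMap<>();

    /** Removes every registered segment context of the given query; the count is unknown up front. */
    static int clearAllSegments(long qryId) {
        int cleared = 0;

        for (QryKey key : contexts.keySet()) {
            if (key.qryId == qryId && contexts.remove(key) != null)
                cleared++;
        }

        return cleared;
    }

    public static void main(String[] args) {
        for (int seg = 0; seg < 4; seg++)
            contexts.put(new QryKey(1L, seg), "ctx-" + seg);

        System.out.println("Cleared " + clearAllSegments(1L) + " segment contexts.");
    }
}
----------------------------------------------------------------------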