Repository: ignite Updated Branches: refs/heads/ignite-5937 488662975 -> 0d69982fe
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0d69982f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0d69982f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0d69982f Branch: refs/heads/ignite-5937 Commit: 0d69982fe8d3be9ec6cf705ae77156061850f166 Parents: 4886629 Author: sboikov <[email protected]> Authored: Tue Oct 17 16:54:43 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 17 16:59:41 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 7 +- .../cache/query/GridCacheTwoStepQuery.java | 18 +++ .../query/h2/DmlStatementsProcessor.java | 2 +- .../processors/query/h2/IgniteH2Indexing.java | 20 ++- .../query/h2/opt/GridH2PlainRowFactory.java | 144 +++++++++++++++++-- .../query/h2/opt/GridH2QueryContext.java | 27 +++- .../query/h2/twostep/GridMapQueryExecutor.java | 36 ++++- .../query/h2/twostep/GridMergeIndexSorted.java | 2 +- .../h2/twostep/GridMergeIndexUnsorted.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 34 ++++- .../h2/twostep/msg/GridH2QueryRequest.java | 83 ++++++++--- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 8 +- 12 files changed, 324 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 1095e5f..e88a234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -72,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; @@ -1868,7 +1867,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final String schemaName = qry.getSchema() != null ? qry.getSchema() : idx.schema(cctx.name()); - final int mainCacheId = CU.cacheId(cctx.name()); + final int mainCacheId = cctx.cacheId(); IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo; @@ -2054,7 +2053,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { final String schemaName = idx.schema(cctx.name()); - final int mainCacheId = CU.cacheId(cctx.name()); + final int mainCacheId = cctx.cacheId(); return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() { @@ -2083,7 +2082,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); final String schemaName = idx.schema(cctx.name()); - final int mainCacheId = CU.cacheId(cctx.name()); + final int mainCacheId = cctx.cacheId(); try { return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 4a93aaf..f5c5e60 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -65,6 +65,9 @@ public class GridCacheTwoStepQuery { /** */ private CacheQueryPartitionInfo[] derivedPartitions; + /** */ + private boolean mvccEnabled; + /** * @param originalSql Original query SQL. * @param tbls Tables in query. @@ -241,6 +244,7 @@ public class GridCacheTwoStepQuery { cp.distributedJoins = distributedJoins; cp.derivedPartitions = derivedPartitions; cp.local = local; + cp.mvccEnabled = mvccEnabled; for (int i = 0; i < mapQrys.size(); i++) cp.mapQrys.add(mapQrys.get(i).copy()); @@ -262,6 +266,20 @@ public class GridCacheTwoStepQuery { return tbls; } + /** + * @return Mvcc flag. + */ + public boolean mvccEnabled() { + return mvccEnabled; + } + + /** + * @param mvccEnabled Mvcc flag. + */ + public void mvccEnabled(boolean mvccEnabled) { + this.mvccEnabled = mvccEnabled; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheTwoStepQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 9e55442..c3d48dd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -361,7 +361,7 @@ public class DmlStatementsProcessor { private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, Connection c, Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException { - int mainCacheId = CU.cacheId(cctx.name()); + int mainCacheId = cctx.cacheId(); Integer errKeysPos = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 9e8f593..57c9c57 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1587,9 +1587,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param cacheIds Cache IDs. + * @param twoStepQry Query. * @throws IllegalStateException if segmented indices used with non-segmented indices. */ - private void checkCacheIndexSegmentation(List<Integer> cacheIds) { + private void processCaches(List<Integer> cacheIds, GridCacheTwoStepQuery twoStepQry) { if (cacheIds.isEmpty()) return; // Nothing to check @@ -1597,11 +1599,21 @@ public class IgniteH2Indexing implements GridQueryIndexing { int expectedParallelism = 0; - for (Integer cacheId : cacheIds) { + boolean mvccEnabled = false; + + for (int i = 0; i < cacheIds.size(); i++) { + Integer cacheId = cacheIds.get(i); + GridCacheContext cctx = sharedCtx.cacheContext(cacheId); assert cctx != null; + if (i == 0) + mvccEnabled = cctx.mvccEnabled(); + else if (cctx.mvccEnabled() != mvccEnabled) + throw new IllegalStateException("Using caches with different mvcc settings in same query is " + + "forbidden."); + if (!cctx.isPartitioned()) continue; @@ -1612,6 +1624,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { "forbidden."); } } + + twoStepQry.mvccEnabled(mvccEnabled); } /** @@ -2519,7 +2533,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { //Prohibit usage indices with different numbers of segments in same query. List<Integer> cacheIds = new ArrayList<>(caches0); - checkCacheIndexSegmentation(cacheIds); + processCaches(cacheIds, twoStepQry); return cacheIds; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java index 439551d..8982236 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.h2.result.Row; @@ -36,41 +37,48 @@ public class GridH2PlainRowFactory extends RowFactory { } /** + * TODO IGNITE-3478: review usages. + * + * @param ctx Query context. * @param data Values. * @return Row. */ - public static Row create(Value... data) { + public static Row create(GridH2QueryContext ctx, Value... data) { + MvccCoordinatorVersion mvccVer = ctx != null ? ctx.mvccVersion() : null; + switch (data.length) { case 0: throw new IllegalStateException("Zero columns row."); case 1: - return new RowKey(data[0]); + return mvccVer != null ? new RowKeyMvcc(data[0], mvccVer) : new RowKey(data[0]); case 2: - return new RowPair(data[0], data[1]); + return mvccVer != null ? new RowPairMvcc(data[0], data[1], mvccVer) : new RowPair(data[0], data[1]); default: - return new RowSimple(data); + return mvccVer != null ? new RowSimpleMvcc(data, mvccVer) : new RowSimple(data); } } /** {@inheritDoc} */ @Override public Row createRow(Value[] data, int memory) { - return create(data); + GridH2QueryContext ctx = GridH2QueryContext.get(); + + return create(ctx, data); } /** * Single value row. */ - private static final class RowKey extends GridH2SearchRowAdapter { + private static class RowKey extends GridH2SearchRowAdapter { /** */ private Value key; /** * @param key Key. */ - public RowKey(Value key) { + RowKey(Value key) { this.key = key; } @@ -93,12 +101,12 @@ public class GridH2PlainRowFactory extends RowFactory { /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { - return 0; // TODO IGNITE-3478 + return 0; } /** {@inheritDoc} */ @Override public long mvccCounter() { - return 0; // TODO IGNITE-3478 + return 0; } /** {@inheritDoc} */ @@ -108,9 +116,44 @@ public class GridH2PlainRowFactory extends RowFactory { } /** + * Single value row. + */ + private static final class RowKeyMvcc extends RowKey { + /** */ + private final MvccCoordinatorVersion mvccVer; + + /** + * @param key Key. + * @param mvccVer Mvcc version. + */ + RowKeyMvcc(Value key, MvccCoordinatorVersion mvccVer) { + super(key); + + assert mvccVer != null; + + this.mvccVer = mvccVer; + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return mvccVer.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return mvccVer.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RowKeyMvcc.class, this); + } + } + + /** * Row of two values. */ - private static final class RowPair extends GridH2SearchRowAdapter { + private static class RowPair extends GridH2SearchRowAdapter { /** */ private Value v1; @@ -149,12 +192,12 @@ public class GridH2PlainRowFactory extends RowFactory { /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { - return 0; // TODO IGNITE-3478 + return 0; } /** {@inheritDoc} */ @Override public long mvccCounter() { - return 0; // TODO IGNITE-3478 + return 0; } /** {@inheritDoc} */ @@ -164,9 +207,45 @@ public class GridH2PlainRowFactory extends RowFactory { } /** + * + */ + private static final class RowPairMvcc extends RowPair { + /** */ + private final MvccCoordinatorVersion mvccVer; + + /** + * @param v1 First value. + * @param v2 Second value. + * @param mvccVer Mvcc version. + */ + RowPairMvcc(Value v1, Value v2, MvccCoordinatorVersion mvccVer) { + super(v1, v2); + + assert mvccVer != null; + + this.mvccVer = mvccVer; + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return mvccVer.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return mvccVer.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RowPairMvcc.class, this); + } + } + + /** * Simple array based row. */ - private static final class RowSimple extends GridH2SearchRowAdapter { + private static class RowSimple extends GridH2SearchRowAdapter { /** */ @GridToStringInclude private Value[] vals; @@ -195,12 +274,12 @@ public class GridH2PlainRowFactory extends RowFactory { /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { - return 0; // TODO IGNITE-3478 + return 0; } /** {@inheritDoc} */ @Override public long mvccCounter() { - return 0; // TODO IGNITE-3478 + return 0; } /** {@inheritDoc} */ @@ -208,4 +287,39 @@ public class GridH2PlainRowFactory extends RowFactory { return S.toString(RowSimple.class, this); } } + + /** + * + */ + private static class RowSimpleMvcc extends RowSimple { + /** */ + private final MvccCoordinatorVersion mvccVer; + + /** + * @param vals Values. + * @param mvccVer Mvcc version. + */ + RowSimpleMvcc(Value[] vals, MvccCoordinatorVersion mvccVer) { + super(vals); + + assert mvccVer != null; + + this.mvccVer = mvccVer; + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return mvccVer.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return mvccVer.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RowSimpleMvcc.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index 91f0aef..1b4e433 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.indexing.IndexingQueryFilter; @@ -83,6 +84,9 @@ public class GridH2QueryContext { /** */ private GridH2CollocationModel qryCollocationMdl; + /** */ + private MvccCoordinatorVersion mvccVer; + /** * @param locNodeId Local node ID. * @param nodeId The node who initiated the query. @@ -102,13 +106,34 @@ public class GridH2QueryContext { * @param segmentId Index segment ID. * @param type Query type. */ - public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) { + public GridH2QueryContext(UUID locNodeId, + UUID nodeId, + long qryId, + int segmentId, + GridH2QueryType type) { assert segmentId == 0 || type == MAP; key = new Key(locNodeId, nodeId, qryId, segmentId, type); } /** + * @return Mvcc version. + */ + @Nullable public MvccCoordinatorVersion mvccVersion() { + return mvccVer; + } + + /** + * @param mvccVer Mvcc version. + * @return {@code this}. + */ + public GridH2QueryContext mvccVersion(MvccCoordinatorVersion mvccVer) { + this.mvccVer = mvccVer; + + return this; + } + + /** * @return Type. */ public GridH2QueryType type() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 77b928f..fcc3296 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -482,7 +483,8 @@ public class GridMapQueryExecutor { false, // Replicated is always false here (see condition above). req.timeout(), params, - true); // Lazy = true. + true, + req.mvccVersion()); // Lazy = true. } else { ctx.closure().callLocal( @@ -504,7 +506,8 @@ public class GridMapQueryExecutor { false, req.timeout(), params, - false); // Lazy = false. + false, + req.mvccVersion()); // Lazy = false. return null; } @@ -528,7 +531,8 @@ public class GridMapQueryExecutor { replicated, req.timeout(), params, - lazy); + lazy, + req.mvccVersion()); } /** @@ -561,7 +565,8 @@ public class GridMapQueryExecutor { final boolean replicated, final int timeout, final Object[] params, - boolean lazy + boolean lazy, + @Nullable final MvccCoordinatorVersion mvccVer ) { if (lazy && MapQueryLazyWorker.currentWorker() == null) { // Lazy queries must be re-submitted to dedicated workers. @@ -570,8 +575,24 @@ public class GridMapQueryExecutor { worker.submit(new Runnable() { @Override public void run() { - onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts, - pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true); + onQueryRequest0( + node, + reqId, + segmentId, + schemaName, + qrys, + cacheIds, + topVer, + partsMap, + parts, + pageSize, + distributedJoinMode, + enforceJoinOrder, + replicated, + timeout, + params, + true, + mvccVer); } }); @@ -637,7 +658,8 @@ public class GridMapQueryExecutor { .distributedJoinMode(distributedJoinMode) .pageSize(pageSize) .topologyVersion(topVer) - .reservations(reserved); + .reservations(reserved) + .mvccVersion(mvccVer); Connection conn = h2.connectionForSchema(schemaName); http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java index 0dc8354..4eeacf6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java @@ -368,7 +368,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex { if (!iter.hasNext()) return false; - cur = GridH2PlainRowFactory.create(iter.next()); + cur = GridH2PlainRowFactory.create(null, iter.next()); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 487d386..6e1ad1f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -139,7 +139,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex { } @Override public Row next() { - return GridH2PlainRowFactory.create(iter.next()); + return GridH2PlainRowFactory.create(null, iter.next()); } @Override public void remove() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index f85cd94..debba5e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -59,6 +59,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -83,11 +85,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq import org.apache.ignite.internal.util.GridIntIterator; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; @@ -562,6 +566,33 @@ public class GridReduceQueryExecutor { List<Integer> cacheIds = qry.cacheIds(); + MvccCoordinatorVersion mvccVer = null; + + // TODO IGNITE-3478. + if (qry.mvccEnabled()) { + assert !cacheIds.isEmpty(); + + final GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); + + MvccQueryTracker mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true, + new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() { + @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) { + fut.onDone(null, e); + } + }); + + mvccTracker.requestVersion(topVer); + + try { + fut.get(); + + mvccVer = mvccTracker.mvccVersion(); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + Collection<ClusterNode> nodes = null; // Explicit partition mapping for unstable topology. @@ -728,7 +759,8 @@ public class GridReduceQueryExecutor { .parameters(params) .flags(flags) .timeout(timeoutMillis) - .schemaName(schemaName); + .schemaName(schemaName) + .mvccVersion(mvccVer); if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) { awaitAllReplies(r, nodes, cancel); http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 4e1fadb..347b88c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.QueryTable; @@ -42,6 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; @@ -133,6 +135,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** Schema name. */ private String schemaName; + /** */ + private MvccCoordinatorVersion mvccVer; + /** * Required by {@link Externalizable} */ @@ -157,6 +162,24 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { params = req.params; paramsBytes = req.paramsBytes; schemaName = req.schemaName; + mvccVer = req.mvccVer; + } + + /** + * @return Mvcc version. + */ + @Nullable public MvccCoordinatorVersion mvccVersion() { + return mvccVer; + } + + /** + * @param mvccVer Mvcc version. + * @return {@code this}. + */ + public GridH2QueryRequest mvccVersion(MvccCoordinatorVersion mvccVer) { + this.mvccVer = mvccVer; + + return this; } /** @@ -435,65 +458,71 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { writer.incrementState(); case 2: - if (!writer.writeInt("pageSize", pageSize)) + if (!writer.writeMessage("mvccVer", mvccVer)) return false; writer.incrementState(); case 3: - if (!writer.writeByteArray("paramsBytes", paramsBytes)) + if (!writer.writeInt("pageSize", pageSize)) return false; writer.incrementState(); case 4: - if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) + if (!writer.writeByteArray("paramsBytes", paramsBytes)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) + if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) return false; writer.incrementState(); case 6: - if (!writer.writeLong("reqId", reqId)) + if (!writer.writeIntArray("qryParts", qryParts)) return false; writer.incrementState(); case 7: - if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 8: - if (!writer.writeInt("timeout", timeout)) + if (!writer.writeLong("reqId", reqId)) return false; writer.incrementState(); case 9: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeString("schemaName", schemaName)) return false; writer.incrementState(); - case 10: - if (!writer.writeIntArray("qryParts", qryParts)) + if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeString("schemaName", schemaName)) + if (!writer.writeInt("timeout", timeout)) return false; writer.incrementState(); + + case 12: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + } return true; @@ -524,7 +553,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 2: - pageSize = reader.readInt("pageSize"); + mvccVer = reader.readMessage("mvccVer"); if (!reader.isLastRead()) return false; @@ -532,7 +561,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 3: - paramsBytes = reader.readByteArray("paramsBytes"); + pageSize = reader.readInt("pageSize"); if (!reader.isLastRead()) return false; @@ -540,7 +569,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 4: - parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); + paramsBytes = reader.readByteArray("paramsBytes"); if (!reader.isLastRead()) return false; @@ -548,7 +577,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 5: - qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); + parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); if (!reader.isLastRead()) return false; @@ -556,7 +585,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 6: - reqId = reader.readLong("reqId"); + qryParts = reader.readIntArray("qryParts"); if (!reader.isLastRead()) return false; @@ -564,7 +593,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 7: - tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG); + qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -572,7 +601,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 8: - timeout = reader.readInt("timeout"); + reqId = reader.readLong("reqId"); if (!reader.isLastRead()) return false; @@ -580,16 +609,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 9: - topVer = reader.readMessage("topVer"); + schemaName = reader.readString("schemaName"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 10: - qryParts = reader.readIntArray("qryParts"); + tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -597,12 +625,21 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 11: - schemaName = reader.readString("schemaName"); + timeout = reader.readInt("timeout"); if (!reader.isLastRead()) return false; reader.incrementState(); + + case 12: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridH2QueryRequest.class); @@ -615,7 +652,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 13; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index e5e7f73..68b1e27 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -58,10 +58,14 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { setSqlIndexMaxInlineSize(0)); cache.put(1, new MvccTestSqlIndexValue(1)); - //cache.put(1, new MvccTestSqlIndexValue(2)); + cache.put(1, new MvccTestSqlIndexValue(2)); + + cache.put(2, new MvccTestSqlIndexValue(1)); + cache.put(3, new MvccTestSqlIndexValue(1)); + cache.put(4, new MvccTestSqlIndexValue(1)); SqlQuery<Integer, MvccTestSqlIndexValue> qry = - new SqlQuery<>(MvccTestSqlIndexValue.class, "_key >= 0"); + new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = 1"); List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
