Repository: ignite
Updated Branches:
  refs/heads/ignite-5937 488662975 -> 0d69982fe


ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0d69982f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0d69982f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0d69982f

Branch: refs/heads/ignite-5937
Commit: 0d69982fe8d3be9ec6cf705ae77156061850f166
Parents: 4886629
Author: sboikov <[email protected]>
Authored: Tue Oct 17 16:54:43 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Oct 17 16:59:41 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    |   7 +-
 .../cache/query/GridCacheTwoStepQuery.java      |  18 +++
 .../query/h2/DmlStatementsProcessor.java        |   2 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  20 ++-
 .../query/h2/opt/GridH2PlainRowFactory.java     | 144 +++++++++++++++++--
 .../query/h2/opt/GridH2QueryContext.java        |  27 +++-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  36 ++++-
 .../query/h2/twostep/GridMergeIndexSorted.java  |   2 +-
 .../h2/twostep/GridMergeIndexUnsorted.java      |   2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  34 ++++-
 .../h2/twostep/msg/GridH2QueryRequest.java      |  83 ++++++++---
 .../cache/mvcc/CacheMvccSqlQueriesTest.java     |   8 +-
 12 files changed, 324 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1095e5f..e88a234 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -72,7 +72,6 @@ import 
org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
@@ -1868,7 +1867,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         try {
             final String schemaName = qry.getSchema() != null ? 
qry.getSchema() : idx.schema(cctx.name());
-            final int mainCacheId = CU.cacheId(cctx.name());
+            final int mainCacheId = cctx.cacheId();
 
             IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo;
 
@@ -2054,7 +2053,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         try {
             final String schemaName = idx.schema(cctx.name());
-            final int mainCacheId = CU.cacheId(cctx.name());
+            final int mainCacheId = cctx.cacheId();
 
             return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
                 new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
@@ -2083,7 +2082,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         final String schemaName = idx.schema(cctx.name());
-        final int mainCacheId = CU.cacheId(cctx.name());
+        final int mainCacheId = cctx.cacheId();
 
         try {
             return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 4a93aaf..f5c5e60 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -65,6 +65,9 @@ public class GridCacheTwoStepQuery {
     /** */
     private CacheQueryPartitionInfo[] derivedPartitions;
 
+    /** */
+    private boolean mvccEnabled;
+
     /**
      * @param originalSql Original query SQL.
      * @param tbls Tables in query.
@@ -241,6 +244,7 @@ public class GridCacheTwoStepQuery {
         cp.distributedJoins = distributedJoins;
         cp.derivedPartitions = derivedPartitions;
         cp.local = local;
+        cp.mvccEnabled = mvccEnabled;
 
         for (int i = 0; i < mapQrys.size(); i++)
             cp.mapQrys.add(mapQrys.get(i).copy());
@@ -262,6 +266,20 @@ public class GridCacheTwoStepQuery {
         return tbls;
     }
 
+    /**
+     * @return Mvcc flag.
+     */
+    public boolean mvccEnabled() {
+        return mvccEnabled;
+    }
+
+    /**
+     * @param mvccEnabled Mvcc flag.
+     */
+    public void mvccEnabled(boolean mvccEnabled) {
+        this.mvccEnabled = mvccEnabled;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 9e55442..c3d48dd 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -361,7 +361,7 @@ public class DmlStatementsProcessor {
     private UpdateResult executeUpdateStatement(String schemaName, final 
GridCacheContext cctx, Connection c,
         Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, 
IndexingQueryFilter filters,
         GridQueryCancel cancel, Object[] failedKeys) throws 
IgniteCheckedException {
-        int mainCacheId = CU.cacheId(cctx.name());
+        int mainCacheId = cctx.cacheId();
 
         Integer errKeysPos = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 9e8f593..57c9c57 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1587,9 +1587,11 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * @param cacheIds Cache IDs.
+     * @param twoStepQry Query.
      * @throws IllegalStateException if segmented indices used with 
non-segmented indices.
      */
-    private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
+    private void processCaches(List<Integer> cacheIds, GridCacheTwoStepQuery 
twoStepQry) {
         if (cacheIds.isEmpty())
             return; // Nothing to check
 
@@ -1597,11 +1599,21 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         int expectedParallelism = 0;
 
-        for (Integer cacheId : cacheIds) {
+        boolean mvccEnabled = false;
+
+        for (int i = 0; i < cacheIds.size(); i++) {
+            Integer cacheId = cacheIds.get(i);
+
             GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
 
             assert cctx != null;
 
+            if (i == 0)
+                mvccEnabled = cctx.mvccEnabled();
+            else if (cctx.mvccEnabled() != mvccEnabled)
+                throw new IllegalStateException("Using caches with different 
mvcc settings in same query is " +
+                    "forbidden.");
+
             if (!cctx.isPartitioned())
                 continue;
 
@@ -1612,6 +1624,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     "forbidden.");
             }
         }
+
+        twoStepQry.mvccEnabled(mvccEnabled);
     }
 
     /**
@@ -2519,7 +2533,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             //Prohibit usage indices with different numbers of segments in 
same query.
             List<Integer> cacheIds = new ArrayList<>(caches0);
 
-            checkCacheIndexSegmentation(cacheIds);
+            processCaches(cacheIds, twoStepQry);
 
             return cacheIds;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
index 439551d..8982236 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.h2.result.Row;
@@ -36,41 +37,48 @@ public class GridH2PlainRowFactory extends RowFactory {
     }
 
     /**
+     * TODO IGNITE-3478: review usages.
+     *
+     * @param ctx Query context.
      * @param data Values.
      * @return Row.
      */
-    public static Row create(Value... data) {
+    public static Row create(GridH2QueryContext ctx, Value... data) {
+        MvccCoordinatorVersion mvccVer = ctx != null ? ctx.mvccVersion() : 
null;
+
         switch (data.length) {
             case 0:
                 throw new IllegalStateException("Zero columns row.");
 
             case 1:
-                return new RowKey(data[0]);
+                return mvccVer != null ? new RowKeyMvcc(data[0], mvccVer) : 
new RowKey(data[0]);
 
             case 2:
-                return new RowPair(data[0], data[1]);
+                return mvccVer != null ? new RowPairMvcc(data[0], data[1], 
mvccVer) : new RowPair(data[0], data[1]);
 
             default:
-                return new RowSimple(data);
+                return mvccVer != null ? new RowSimpleMvcc(data, mvccVer) : 
new RowSimple(data);
         }
     }
 
     /** {@inheritDoc} */
     @Override public Row createRow(Value[] data, int memory) {
-        return create(data);
+        GridH2QueryContext ctx = GridH2QueryContext.get();
+
+        return create(ctx, data);
     }
 
     /**
      * Single value row.
      */
-    private static final class RowKey extends GridH2SearchRowAdapter {
+    private static class RowKey extends GridH2SearchRowAdapter {
         /** */
         private Value key;
 
         /**
          * @param key Key.
          */
-        public RowKey(Value key) {
+        RowKey(Value key) {
             this.key = key;
         }
 
@@ -93,12 +101,12 @@ public class GridH2PlainRowFactory extends RowFactory {
 
         /** {@inheritDoc} */
         @Override public long mvccCoordinatorVersion() {
-            return 0; // TODO IGNITE-3478
+            return 0;
         }
 
         /** {@inheritDoc} */
         @Override public long mvccCounter() {
-            return 0; // TODO IGNITE-3478
+            return 0;
         }
 
         /** {@inheritDoc} */
@@ -108,9 +116,44 @@ public class GridH2PlainRowFactory extends RowFactory {
     }
 
     /**
+     * Single value row carrying an MVCC version.
+     */
+    private static final class RowKeyMvcc extends RowKey {
+        /** */
+        private final MvccCoordinatorVersion mvccVer;
+
+        /**
+         * @param key Key.
+         * @param mvccVer Mvcc version.
+         */
+        RowKeyMvcc(Value key, MvccCoordinatorVersion mvccVer) {
+            super(key);
+
+            assert mvccVer != null;
+
+            this.mvccVer = mvccVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long mvccCoordinatorVersion() {
+            return mvccVer.coordinatorVersion();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long mvccCounter() {
+            return mvccVer.counter();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RowKeyMvcc.class, this);
+        }
+    }
+
+    /**
      * Row of two values.
      */
-    private static final class RowPair extends GridH2SearchRowAdapter  {
+    private static class RowPair extends GridH2SearchRowAdapter  {
         /** */
         private Value v1;
 
@@ -149,12 +192,12 @@ public class GridH2PlainRowFactory extends RowFactory {
 
         /** {@inheritDoc} */
         @Override public long mvccCoordinatorVersion() {
-            return 0; // TODO IGNITE-3478
+            return 0;
         }
 
         /** {@inheritDoc} */
         @Override public long mvccCounter() {
-            return 0; // TODO IGNITE-3478
+            return 0;
         }
 
         /** {@inheritDoc} */
@@ -164,9 +207,45 @@ public class GridH2PlainRowFactory extends RowFactory {
     }
 
     /**
+     * Row of two values carrying an MVCC version.
+     */
+    private static final class RowPairMvcc extends RowPair {
+        /** */
+        private final MvccCoordinatorVersion mvccVer;
+
+        /**
+         * @param v1 First value.
+         * @param v2 Second value.
+         * @param mvccVer Mvcc version.
+         */
+        RowPairMvcc(Value v1, Value v2, MvccCoordinatorVersion mvccVer) {
+            super(v1, v2);
+
+            assert mvccVer != null;
+
+            this.mvccVer = mvccVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long mvccCoordinatorVersion() {
+            return mvccVer.coordinatorVersion();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long mvccCounter() {
+            return mvccVer.counter();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RowPairMvcc.class, this);
+        }
+    }
+
+    /**
      * Simple array based row.
      */
-    private static final class RowSimple extends GridH2SearchRowAdapter {
+    private static class RowSimple extends GridH2SearchRowAdapter {
         /** */
         @GridToStringInclude
         private Value[] vals;
@@ -195,12 +274,12 @@ public class GridH2PlainRowFactory extends RowFactory {
 
         /** {@inheritDoc} */
         @Override public long mvccCoordinatorVersion() {
-            return 0; // TODO IGNITE-3478
+            return 0;
         }
 
         /** {@inheritDoc} */
         @Override public long mvccCounter() {
-            return 0; // TODO IGNITE-3478
+            return 0;
         }
 
         /** {@inheritDoc} */
@@ -208,4 +287,39 @@ public class GridH2PlainRowFactory extends RowFactory {
             return S.toString(RowSimple.class, this);
         }
     }
+
+    /**
+     * Simple array based row carrying an MVCC version.
+     */
+    private static class RowSimpleMvcc extends RowSimple {
+        /** */
+        private final MvccCoordinatorVersion mvccVer;
+
+        /**
+         * @param vals Values.
+         * @param mvccVer Mvcc version.
+         */
+        RowSimpleMvcc(Value[] vals, MvccCoordinatorVersion mvccVer) {
+            super(vals);
+
+            assert mvccVer != null;
+
+            this.mvccVer = mvccVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long mvccCoordinatorVersion() {
+            return mvccVer.coordinatorVersion();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long mvccCounter() {
+            return mvccVer.counter();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RowSimpleMvcc.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 91f0aef..1b4e433 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -83,6 +84,9 @@ public class GridH2QueryContext {
     /** */
     private GridH2CollocationModel qryCollocationMdl;
 
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
@@ -102,13 +106,34 @@ public class GridH2QueryContext {
      * @param segmentId Index segment ID.
      * @param type Query type.
      */
-    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int 
segmentId, GridH2QueryType type) {
+    public GridH2QueryContext(UUID locNodeId,
+        UUID nodeId,
+        long qryId,
+        int segmentId,
+        GridH2QueryType type) {
         assert segmentId == 0 || type == MAP;
 
         key = new Key(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
+     * @return Mvcc version.
+     */
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /**
+     * @param mvccVer Mvcc version.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext mvccVersion(MvccCoordinatorVersion mvccVer) {
+        this.mvccVer = mvccVer;
+
+        return this;
+    }
+
+    /**
      * @return Type.
      */
     public GridH2QueryType type() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 77b928f..fcc3296 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -482,7 +483,8 @@ public class GridMapQueryExecutor {
                     false, // Replicated is always false here (see condition 
above).
                     req.timeout(),
                     params,
-                    true); // Lazy = true.
+                    true, // Lazy = true.
+                    req.mvccVersion());
             }
             else {
                 ctx.closure().callLocal(
@@ -504,7 +506,8 @@ public class GridMapQueryExecutor {
                                 false,
                                 req.timeout(),
                                 params,
-                                false); // Lazy = false.
+                                false, // Lazy = false.
+                                req.mvccVersion());
 
                             return null;
                         }
@@ -528,7 +531,8 @@ public class GridMapQueryExecutor {
             replicated,
             req.timeout(),
             params,
-            lazy);
+            lazy,
+            req.mvccVersion());
     }
 
     /**
@@ -561,7 +565,8 @@ public class GridMapQueryExecutor {
         final boolean replicated,
         final int timeout,
         final Object[] params,
-        boolean lazy
+        boolean lazy,
+        @Nullable final MvccCoordinatorVersion mvccVer
     ) {
         if (lazy && MapQueryLazyWorker.currentWorker() == null) {
             // Lazy queries must be re-submitted to dedicated workers.
@@ -570,8 +575,24 @@ public class GridMapQueryExecutor {
 
             worker.submit(new Runnable() {
                 @Override public void run() {
-                    onQueryRequest0(node, reqId, segmentId, schemaName, qrys, 
cacheIds, topVer, partsMap, parts,
-                        pageSize, distributedJoinMode, enforceJoinOrder, 
replicated, timeout, params, true);
+                    onQueryRequest0(
+                        node,
+                        reqId,
+                        segmentId,
+                        schemaName,
+                        qrys,
+                        cacheIds,
+                        topVer,
+                        partsMap,
+                        parts,
+                        pageSize,
+                        distributedJoinMode,
+                        enforceJoinOrder,
+                        replicated,
+                        timeout,
+                        params,
+                        true,
+                        mvccVer);
                 }
             });
 
@@ -637,7 +658,8 @@ public class GridMapQueryExecutor {
                 .distributedJoinMode(distributedJoinMode)
                 .pageSize(pageSize)
                 .topologyVersion(topVer)
-                .reservations(reserved);
+                .reservations(reserved)
+                .mvccVersion(mvccVer);
 
             Connection conn = h2.connectionForSchema(schemaName);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index 0dc8354..4eeacf6 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -368,7 +368,7 @@ public final class GridMergeIndexSorted extends 
GridMergeIndex {
             if (!iter.hasNext())
                 return false;
 
-            cur = GridH2PlainRowFactory.create(iter.next());
+            cur = GridH2PlainRowFactory.create(null, iter.next());
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 487d386..6e1ad1f 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -139,7 +139,7 @@ public final class GridMergeIndexUnsorted extends 
GridMergeIndex {
             }
 
             @Override public Row next() {
-                return GridH2PlainRowFactory.create(iter.next());
+                return GridH2PlainRowFactory.create(null, iter.next());
             }
 
             @Override public void remove() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f85cd94..debba5e 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,6 +59,8 @@ import 
org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -83,11 +85,13 @@ import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq
 import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
@@ -562,6 +566,33 @@ public class GridReduceQueryExecutor {
 
             List<Integer> cacheIds = qry.cacheIds();
 
+            MvccCoordinatorVersion mvccVer = null;
+
+            // TODO IGNITE-3478.
+            if (qry.mvccEnabled()) {
+                assert !cacheIds.isEmpty();
+
+                final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+                MvccQueryTracker mvccTracker = new 
MvccQueryTracker(cacheContext(cacheIds.get(0)), true,
+                    new IgniteBiInClosure<AffinityTopologyVersion, 
IgniteCheckedException>() {
+                    @Override public void apply(AffinityTopologyVersion 
topVer, IgniteCheckedException e) {
+                        fut.onDone(null, e);
+                    }
+                });
+
+                mvccTracker.requestVersion(topVer);
+
+                try {
+                    fut.get();
+
+                    mvccVer = mvccTracker.mvccVersion();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new CacheException(e);
+                }
+            }
+
             Collection<ClusterNode> nodes = null;
 
             // Explicit partition mapping for unstable topology.
@@ -728,7 +759,8 @@ public class GridReduceQueryExecutor {
                     .parameters(params)
                     .flags(flags)
                     .timeout(timeoutMillis)
-                    .schemaName(schemaName);
+                    .schemaName(schemaName)
+                    .mvccVersion(mvccVer);
 
                 if (send(nodes, req, parts == null ? null : new 
ExplicitPartitionsSpecializer(qryMap), false)) {
                     awaitAllReplies(r, nodes, cancel);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 4e1fadb..347b88c 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
@@ -42,6 +43,7 @@ import 
org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 
@@ -133,6 +135,9 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
     /** Schema name. */
     private String schemaName;
 
+    /** Mvcc version carried by this request, may be {@code null}. */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -157,6 +162,24 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
         params = req.params;
         paramsBytes = req.paramsBytes;
         schemaName = req.schemaName;
+        mvccVer = req.mvccVer;
+    }
+
+    /**
+     * @return Mvcc version stored in this request, or {@code null} if none was set.
+     */
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /**
+     * @param mvccVer Mvcc version to store in this request (written as field "mvccVer" on marshalling).
+     * @return {@code this} for chaining.
+     */
+    public GridH2QueryRequest mvccVersion(MvccCoordinatorVersion mvccVer) {
+        this.mvccVer = mvccVer;
+
+        return this;
     }
 
     /**
@@ -435,65 +458,71 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeInt("pageSize", pageSize))
+                if (!writer.writeMessage("mvccVer", mvccVer))
                     return false;
 
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                if (!writer.writeInt("pageSize", pageSize))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("parts", parts, 
MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+                if (!writer.writeByteArray("paramsBytes", paramsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("qrys", qrys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeMap("parts", parts, 
MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("reqId", reqId))
+                if (!writer.writeIntArray("qryParts", qryParts))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("tbls", tbls, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("qrys", qrys, 
MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeInt("timeout", timeout))
+                if (!writer.writeLong("reqId", reqId))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeString("schemaName", schemaName))
                     return false;
 
                 writer.incrementState();
 
-
             case 10:
-                if (!writer.writeIntArray("qryParts", qryParts))
+                if (!writer.writeCollection("tbls", tbls, 
MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeString("schemaName", schemaName))
+                if (!writer.writeInt("timeout", timeout))
                     return false;
 
                 writer.incrementState();
+
+            case 12:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -524,7 +553,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 2:
-                pageSize = reader.readInt("pageSize");
+                mvccVer = reader.readMessage("mvccVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -532,7 +561,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 3:
-                paramsBytes = reader.readByteArray("paramsBytes");
+                pageSize = reader.readInt("pageSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -540,7 +569,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 4:
-                parts = reader.readMap("parts", 
MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+                paramsBytes = reader.readByteArray("paramsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -548,7 +577,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 5:
-                qrys = reader.readCollection("qrys", 
MessageCollectionItemType.MSG);
+                parts = reader.readMap("parts", 
MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -556,7 +585,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 6:
-                reqId = reader.readLong("reqId");
+                qryParts = reader.readIntArray("qryParts");
 
                 if (!reader.isLastRead())
                     return false;
@@ -564,7 +593,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 7:
-                tbls = reader.readCollection("tbls", 
MessageCollectionItemType.MSG);
+                qrys = reader.readCollection("qrys", 
MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -572,7 +601,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 8:
-                timeout = reader.readInt("timeout");
+                reqId = reader.readLong("reqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -580,16 +609,15 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 9:
-                topVer = reader.readMessage("topVer");
+                schemaName = reader.readString("schemaName");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
 
-
             case 10:
-                qryParts = reader.readIntArray("qryParts");
+                tbls = reader.readCollection("tbls", 
MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -597,12 +625,21 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 11:
-                schemaName = reader.readString("schemaName");
+                timeout = reader.readInt("timeout");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
+
+            case 12:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -615,7 +652,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 13; // Was 12; "mvccVer" is the newly added serialized field.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d69982f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
index e5e7f73..68b1e27 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
@@ -58,10 +58,14 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
             setSqlIndexMaxInlineSize(0));
 
         cache.put(1, new MvccTestSqlIndexValue(1));
-        //cache.put(1, new MvccTestSqlIndexValue(2));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        cache.put(2, new MvccTestSqlIndexValue(1));
+        cache.put(3, new MvccTestSqlIndexValue(1));
+        cache.put(4, new MvccTestSqlIndexValue(1));
 
         SqlQuery<Integer, MvccTestSqlIndexValue> qry =
-            new SqlQuery<>(MvccTestSqlIndexValue.class, "_key >= 0");
+            new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = 1");
 
         List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = 
cache.query(qry).getAll();
 

Reply via email to