http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index b220291..4889069 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -144,11 +144,6 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridQueryNextPageResponse.class, this);
-    }
-
-    /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
     }
@@ -304,4 +299,11 @@ public class GridQueryNextPageResponse implements Message {
     public void retry(AffinityTopologyVersion retry) {
         this.retry = retry;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryNextPageResponse.class, this,
+            "valsSize", vals != null ? vals.size() : 0,
+            "rowsSize", plainRows != null ? plainRows.size() : 0);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 60d348b..f7de86c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -20,13 +20,16 @@ package 
org.apache.ignite.internal.processors.query.h2.twostep.messages;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -35,8 +38,9 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Query request.
  */
+@Deprecated
 @IgniteCodeGeneratingFail
-public class GridQueryRequest implements Message {
+public class GridQueryRequest implements Message, GridCacheQueryMarshallable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -165,11 +169,29 @@ public class GridQueryRequest implements Message {
     /**
      * @return Queries.
      */
-    public Collection<GridCacheSqlQuery> queries() throws 
IgniteCheckedException {
+    public Collection<GridCacheSqlQuery> queries() {
         return qrys;
     }
 
     /** {@inheritDoc} */
+    @Override public void marshall(Marshaller m) {
+        if (F.isEmpty(qrys))
+            return;
+
+        for (GridCacheSqlQuery qry : qrys)
+            qry.marshall(m);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+        if (F.isEmpty(qrys))
+            return;
+
+        for (GridCacheSqlQuery qry : qrys)
+            qry.unmarshall(m, ctx);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridQueryRequest.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 269795b..7a81cf8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -6703,6 +6703,19 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Swaps two objects in array.
+     *
+     * @param arr Array.
+     * @param a Index of the first object.
+     * @param b Index of the second object.
+     */
+    public static void swap(Object[] arr, int a, int b) {
+        Object tmp = arr[a];
+        arr[a] = arr[b];
+        arr[b] = tmp;
+    }
+
+    /**
      * Returns array which is the union of two arrays
      * (array of elements contained in any of provided arrays).
      * <p/>

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ab31625..5eb27d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2313,6 +2313,26 @@ public class GridFunc {
     }
 
     /**
+     * Converts collection of numbers to primitive {@code int[]} array.
+     *
+     * @param col Collection of numbers.
+     * @return Array of integers.
+     */
+    public static int[] toIntArray(Collection<? extends Number> col) {
+        if (col == null)
+            return null;
+
+        int[] res = new int[col.size()];
+
+        Iterator<? extends Number> iter = col.iterator();
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = iter.next().intValue();
+
+        return res;
+    }
+
+    /**
      * Utility map getter. This method analogous to {@link #addIfAbsent(Map, 
Object, Callable)}
      * method but this one doesn't put the default value into the map when key 
is not found.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java
index 04ea56c..7bde651 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java
@@ -35,7 +35,6 @@
 
 package org.apache.ignite.internal.util.offheap.unsafe;
 
-import java.io.Closeable;
 import java.util.AbstractMap;
 import java.util.AbstractSet;
 import java.util.ArrayList;
@@ -51,10 +50,11 @@ import java.util.SortedSet;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -107,7 +107,7 @@ import org.jsr166.ConcurrentHashMap8;
  */
 @SuppressWarnings("ALL")
 public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V 
extends GridOffHeapSmartPointer>
-    extends AbstractMap<K,V> implements ConcurrentNavigableMap<K, V>, 
Cloneable, Closeable {
+    extends AbstractMap<K,V> implements ConcurrentNavigableMap<K, V>, 
Cloneable, AutoCloseable, GridReservable {
     /** This is a special value that indicates that an optimistic read failed. 
*/
     private static final GridOffHeapSmartPointer SpecialRetry = new 
GridOffHeapSmartPointer() {
         @Override public long pointer() {
@@ -937,17 +937,17 @@ public class GridOffHeapSnapTreeMap<K extends 
GridOffHeapSmartPointer,V extends
         protected final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock();
 
         /** */
-        private final AtomicBoolean stopped = new AtomicBoolean(false);
+        private boolean stopped;
 
         /** {@inheritDoc} */
         @Override public boolean add(long node) {
-            ReentrantReadWriteLock.ReadLock l =  lock.readLock();
+            Lock l =  lock.readLock();
 
             if (!l.tryLock())
                 return false;
 
             try {
-                return super.add(node);
+                return !stopped && super.add(node);
             }
             finally {
                 l.unlock();
@@ -956,13 +956,13 @@ public class GridOffHeapSnapTreeMap<K extends 
GridOffHeapSmartPointer,V extends
 
         /** {@inheritDoc} */
         @Override public boolean add(RecycleQueue que) {
-            ReentrantReadWriteLock.ReadLock l =  lock.readLock();
+            Lock l =  lock.readLock();
 
             if (!l.tryLock())
                 return false;
 
             try {
-                return super.add(que);
+                return !stopped && super.add(que);
             }
             finally {
                 l.unlock();
@@ -980,20 +980,21 @@ public class GridOffHeapSnapTreeMap<K extends 
GridOffHeapSmartPointer,V extends
          * @return {@code true} If we stopped this queue.
          */
         public boolean stop() {
-            if (stopped.compareAndSet(false, true)) {
-                lock.writeLock().lock();
+            Lock l = lock.writeLock();
 
-                return true;
-            }
+            l.lock();
 
-            return false;
-        }
+            try {
+                if (stopped)
+                    return false;
 
-        /**
-         * @return {@code true} If this queue is stopped..
-         */
-        public boolean isStopped() {
-            return stopped.get();
+                stopped = true;
+
+                return true;
+            }
+            finally {
+                l.unlock();
+            }
         }
     }
 
@@ -1046,6 +1047,12 @@ public class GridOffHeapSnapTreeMap<K extends 
GridOffHeapSmartPointer,V extends
     /** Recycle queue for this snapshot. */
     private volatile StoppableRecycleQueue recycleBin;
 
+    /** */
+    private AtomicInteger reservations;
+
+    /** */
+    private volatile boolean closing;
+
     /**
      * @param keyFactory Key factory.
      * @param valFactory Value factory.
@@ -1079,10 +1086,53 @@ public class GridOffHeapSnapTreeMap<K extends 
GridOffHeapSmartPointer,V extends
         this.holderRef = rootHolder();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean reserve() {
+        for (;;) {
+            int r = reservations.get();
+
+            if (r == -1)
+                return false;
+
+            assert r >= 0 : r;
+
+            if (reservations.compareAndSet(r, r + 1))
+                return true;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release() {
+        for (;;) {
+            int r = reservations.get();
+
+            assert r > 0 : r;
+
+            int z = r == 1 && closing ? -1 : r - 1;
+
+            if (reservations.compareAndSet(r, z)) {
+                if (z == -1)
+                    doClose();
+
+                return;
+            }
+        }
+    }
+
     /**
      * Closes tree map and reclaims memory.
      */
-    public void close() {
+    @Override public void close() {
+        closing = true;
+
+        if (reservations == null || reservations.compareAndSet(0, -1))
+            doClose();
+    }
+
+    /**
+     *
+     */
+    private void doClose() {
         RecycleQueue q;
 
         if (snapshotId.longValue() == 0) {
@@ -1142,6 +1192,7 @@ public class GridOffHeapSnapTreeMap<K extends 
GridOffHeapSmartPointer,V extends
         copy.holderRef = rootHolder(holderRef);
         markShared(root());
 
+        copy.reservations = new AtomicInteger();
         copy.size = new AtomicInteger(size());
         copy.recycleBin = new StoppableRecycleQueue();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
index 9392cc1..37352e1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
@@ -51,7 +51,7 @@ public class TcpDiscoveryJdbcIpFinderSelfTest extends
         dataSrc.setDriverClass("org.h2.Driver");
 
         if (initSchema)
-            dataSrc.setJdbcUrl("jdbc:h2:mem");
+            dataSrc.setJdbcUrl("jdbc:h2:mem:./test");
         else {
             
dataSrc.setJdbcUrl("jdbc:h2:mem:jdbc_ipfinder_not_initialized_schema");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 87509a4..e475754 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.CacheException;
@@ -47,6 +48,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -529,7 +531,7 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
                                         ", locNode=" + g.cluster().localNode() 
+ ']');
                                 }
 
-                                Thread.sleep(200); // Busy wait.
+                                Thread.sleep(20); // Busy wait.
 
                                 continue;
                             }
@@ -1177,4 +1179,39 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
             }
         }
     }
+
+    /**
+     * @param aff Affinity.
+     * @param key Counter.
+     * @param node Target node.
+     * @return Key.
+     */
+    protected final Integer keyForNode(Affinity<Object> aff, AtomicInteger 
key, ClusterNode node) {
+        for (int i = 0; i < 100_000; i++) {
+            Integer next = key.getAndIncrement();
+
+            if (aff.mapKeyToNode(next).equals(node))
+                return next;
+        }
+
+        fail("Failed to find key for node: " + node);
+
+        return null;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param qry Query.
+     * @return Query plan.
+     */
+    protected final String queryPlan(IgniteCache<?, ?> cache, SqlFieldsQuery 
qry) {
+        return (String)cache.query(new SqlFieldsQuery("explain " + 
qry.getSql())
+            .setArgs(qry.getArgs())
+            .setLocal(qry.isLocal())
+            .setCollocated(qry.isCollocated())
+            .setPageSize(qry.getPageSize())
+            .setDistributedJoins(qry.isDistributedJoins())
+            .setEnforceJoinOrder(qry.isEnforceJoinOrder()))
+            .getAll().get(0).get(0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
 
b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 29b7dd4..3062d13 100644
--- 
a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ 
b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -28,29 +28,31 @@ import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.h2.engine.Constants;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
-import org.h2.index.IndexCondition;
 import org.h2.index.IndexType;
 import org.h2.index.SingleRowCursor;
 import org.h2.index.SpatialIndex;
+import org.h2.index.SpatialTreeIndex;
 import org.h2.message.DbException;
 import org.h2.mvstore.MVStore;
 import org.h2.mvstore.rtree.MVRTreeMap;
 import org.h2.mvstore.rtree.SpatialKey;
 import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
-import org.h2.table.Column;
 import org.h2.table.IndexColumn;
 import org.h2.table.Table;
 import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.h2.value.ValueGeometry;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 
 /**
  * Spatial index.
  */
+@SuppressWarnings("unused"/*reflection*/)
 public class GridH2SpatialIndex extends GridH2IndexBase implements 
SpatialIndex {
     /** */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -80,12 +82,8 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
      * @param tbl Table.
      * @param idxName Index name.
      * @param cols Columns.
-     * @param keyCol Key column.
-     * @param valCol Value column.
      */
-    public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn[] cols, 
int keyCol, int valCol) {
-        super(keyCol, valCol);
-
+    public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn... cols) {
         if (cols.length > 1)
             throw DbException.getUnsupportedException("can only do one 
column");
 
@@ -121,7 +119,14 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override protected Object doTakeSnapshot() {
+        return null; // TODO We do not support snapshots, but probably this is 
possible.
+    }
+
+    /** {@inheritDoc} */
     @Override public GridH2Row put(GridH2Row row) {
+        assert row instanceof GridH2AbstractKeyValueRow : "requires key to be 
at 0";
+
         Lock l = lock.writeLock();
 
         l.lock();
@@ -129,7 +134,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
         try {
             checkClosed();
 
-            Value key = row.getValue(keyCol);
+            Value key = row.getValue(KEY_COL);
 
             assert key != null;
 
@@ -183,7 +188,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
         try {
             checkClosed();
 
-            Value key = row.getValue(keyCol);
+            Value key = row.getValue(KEY_COL);
 
             assert key != null;
 
@@ -208,7 +213,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
     }
 
     /** {@inheritDoc} */
-    @Override public void close(Session ses) {
+    @Override public void destroy() {
         Lock l = lock.writeLock();
 
         l.lock();
@@ -221,37 +226,31 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
         finally {
             l.unlock();
         }
+
+        super.destroy();
     }
 
     /** {@inheritDoc} */
-    @Override protected long getCostRangeIndex(int[] masks, long rowCnt, 
TableFilter filter, SortOrder sortOrder) {
-        rowCnt += Constants.COST_ROW_OFFSET;
-        long cost = rowCnt;
-        long rows = rowCnt;
-
-        if (masks == null)
-            return cost;
-
-        for (Column column : columns) {
-            int idx = column.getColumnId();
-            int mask = masks[idx];
-            if ((mask & IndexCondition.SPATIAL_INTERSECTS) != 0) {
-                cost = 3 + rows / 4;
-
-                break;
-            }
-        }
-
-        return cost;
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] 
filters, int filter, SortOrder sortOrder) {
+        return SpatialTreeIndex.getCostRangeIndex(masks,
+            table.getRowCountApproximation(), columns) / 10;
     }
 
     /** {@inheritDoc} */
-    @Override public double getCost(Session ses, int[] masks, TableFilter 
filter, SortOrder sortOrder) {
-        return getCostRangeIndex(masks, rowCnt, filter, sortOrder);
+    @Override public Cursor find(TableFilter filter, SearchRow first, 
SearchRow last) {
+        return find0(filter);
     }
 
     /** {@inheritDoc} */
     @Override public Cursor find(Session ses, SearchRow first, SearchRow last) 
{
+        return find0(null);
+    }
+
+    /**
+     * @param filter Table filter.
+     * @return Cursor.
+     */
+    private Cursor find0(TableFilter filter) {
         Lock l = lock.readLock();
 
         l.lock();
@@ -259,7 +258,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
         try {
             checkClosed();
 
-            return new GridH2Cursor(rowIterator(treeMap.keySet().iterator()));
+            return new GridH2Cursor(rowIterator(treeMap.keySet().iterator(), 
filter));
         }
         finally {
             l.unlock();
@@ -273,9 +272,10 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
 
     /**
      * @param i Spatial key iterator.
+     * @param filter Table filter.
      * @return Iterator over rows.
      */
-    private Iterator<GridH2Row> rowIterator(Iterator<SpatialKey> i) {
+    private Iterator<GridH2Row> rowIterator(Iterator<SpatialKey> i, 
TableFilter filter) {
         if (!i.hasNext())
             return Collections.emptyIterator();
 
@@ -290,7 +290,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
         }
         while (i.hasNext());
 
-        return filter(rows.iterator());
+        return filter(rows.iterator(), threadLocalFilter());
     }
 
     /** {@inheritDoc} */
@@ -305,7 +305,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
             if (!first)
                 throw DbException.throwInternalError("Spatial Index can only 
be fetch by ascending order");
 
-            Iterator<GridH2Row> iter = 
rowIterator(treeMap.keySet().iterator());
+            Iterator<GridH2Row> iter = 
rowIterator(treeMap.keySet().iterator(), null);
 
             return new SingleRowCursor(iter.hasNext() ? iter.next() : null);
         }
@@ -334,7 +334,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
             if (intersection == null)
                 return find(filter.getSession(), null, null);
 
-            return new 
GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 
0))));
+            return new 
GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 
0)), filter));
         }
         finally {
             l.unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0b0fe85..535881e 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -64,6 +64,7 @@ import 
org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -74,6 +75,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -82,14 +84,15 @@ import 
org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
 import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2TreeIndex;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Utils;
 import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
 import org.apache.ignite.internal.processors.query.h2.opt.GridLuceneIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
@@ -100,37 +103,44 @@ import 
org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.command.CommandInterface;
-import org.h2.constant.ErrorCode;
-import org.h2.constant.SysProperties;
+import org.h2.engine.Session;
+import org.h2.engine.SysProperties;
 import org.h2.index.Index;
 import org.h2.index.SpatialIndex;
+import org.h2.jdbc.JdbcConnection;
 import org.h2.jdbc.JdbcPreparedStatement;
 import org.h2.message.DbException;
 import org.h2.mvstore.cache.CacheLongKeyLIRS;
+import org.h2.result.SortOrder;
 import org.h2.server.web.WebServer;
 import org.h2.table.Column;
 import org.h2.table.IndexColumn;
 import org.h2.table.Table;
 import org.h2.tools.Server;
-import org.h2.util.Utils;
+import org.h2.util.JdbcUtils;
 import org.h2.value.DataType;
 import org.h2.value.Value;
 import org.h2.value.ValueArray;
@@ -149,6 +159,7 @@ import org.h2.value.ValueNull;
 import org.h2.value.ValueShort;
 import org.h2.value.ValueString;
 import org.h2.value.ValueTime;
+import org.h2.value.ValueTimestamp;
 import org.h2.value.ValueUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -161,9 +172,8 @@ import static 
org.apache.ignite.internal.processors.query.GridQueryIndexType.FUL
 import static 
org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
 import static 
org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
-import static org.h2.result.SortOrder.ASCENDING;
-import static org.h2.result.SortOrder.DESCENDING;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 
 /**
  * Indexing implementation based on H2 database engine. In this implementation 
main query language is SQL,
@@ -181,8 +191,10 @@ import static org.h2.result.SortOrder.DESCENDING;
 public class IgniteH2Indexing implements GridQueryIndexing {
     /** Default DB options. */
     private static final String DB_OPTIONS = 
";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
-        
";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0;"
 +
-        "RECOMPILE_ALWAYS=1;MAX_OPERATION_MEMORY=0";
+        
";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0"
 +
+        
";RECOMPILE_ALWAYS=1;MAX_OPERATION_MEMORY=0;NESTED_JOINS=0;BATCH_JOINS=1" +
+        ";ROW_FACTORY=\"" + GridH2RowFactory.class.getName() + "\"" +
+        ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
 
     /** */
     private static final int PREPARED_STMT_CACHE_SIZE = 256;
@@ -219,9 +231,12 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
      * Command in H2 prepared statement.
      */
     static {
-        try {
-            System.setProperty("h2.objectCache", "false");
+        // Initialize system properties for H2.
+        System.setProperty("h2.objectCache", "false");
+        System.setProperty("h2.serializeJavaObject", "false");
+        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid 
ValueJavaObject caching.
 
+        try {
             COMMAND_FIELD = 
JdbcPreparedStatement.class.getDeclaredField("command");
 
             COMMAND_FIELD.setAccessible(true);
@@ -260,6 +275,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final Map<String, String> space2schema = new 
ConcurrentHashMap8<>();
 
     /** */
+    private GridSpinBusyLock busyLock;
+
+    /** */
     private final ThreadLocal<ConnectionWrapper> connCache = new 
ThreadLocal<ConnectionWrapper>() {
         @Nullable @Override public ConnectionWrapper get() {
             ConnectionWrapper c = super.get();
@@ -304,13 +322,35 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     /** */
     private volatile GridKernalContext ctx;
 
+    /** */
+    private final ConcurrentMap<String, GridH2Table> dataTables = new 
ConcurrentHashMap8<>();
+
     /** Statement cache. */
     private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new 
ConcurrentHashMap<>();
 
     /** */
-    private final GridBoundedConcurrentLinkedHashMap<T3<String, String, 
Boolean>, TwoStepCachedQuery> twoStepCache =
+    private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, 
TwoStepCachedQuery> twoStepCache =
         new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
 
+    /** */
+    private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = 
new IgniteInClosure<IgniteInternalFuture<?>>() {
+        @Override public void apply(IgniteInternalFuture<?> fut) {
+            try {
+                fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, e.getMessage(), e);
+            }
+        }
+    };
+
+    /**
+     * @return Kernal context.
+     */
+    public GridKernalContext kernalContext() {
+        return ctx;
+    }
+
     /**
      * @param space Space.
      * @return Connection.
@@ -695,8 +735,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             U.close(stmt, log);
         }
 
-        tbl.tbl.close();
-
         if (tbl.luceneIdx != null)
             U.closeQuiet(tbl.luceneIdx);
 
@@ -705,7 +743,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> 
queryText(
+    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> 
queryLocalText(
         @Nullable String spaceName, String qry, GridQueryTypeDescriptor type,
         IndexingQueryFilter filters) throws IgniteCheckedException {
         TableDescriptor tbl = tableDescriptor(spaceName, type);
@@ -727,14 +765,14 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public GridQueryFieldsResult queryFields(@Nullable final String 
spaceName, final String qry,
-        @Nullable final Collection<Object> params, final IndexingQueryFilter 
filters)
+    @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final 
String spaceName, final String qry,
+        @Nullable final Collection<Object> params, final IndexingQueryFilter 
filters, boolean enforceJoinOrder)
         throws IgniteCheckedException {
-        setFilters(filters);
+        Connection conn = connectionForSpace(spaceName);
 
-        try {
-            Connection conn = connectionForThread(schema(spaceName));
+        initLocalQueryContext(conn, enforceJoinOrder, filters);
 
+        try {
             ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, qry, 
params, true);
 
             List<GridQueryFieldMetadata> meta = null;
@@ -751,7 +789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return new GridQueryFieldsResultAdapter(meta, new 
FieldsIterator(rs));
         }
         finally {
-            setFilters(null);
+            GridH2QueryContext.clearThreadLocal();
         }
     }
 
@@ -897,25 +935,6 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
     }
 
     /**
-     * Executes query.
-     *
-     * @param space Space.
-     * @param qry Query.
-     * @param params Query parameters.
-     * @param tbl Target table of query to generate select.
-     * @return Result set.
-     * @throws IgniteCheckedException If failed.
-     */
-    private ResultSet executeQuery(String space, String qry, @Nullable 
Collection<Object> params, TableDescriptor tbl)
-            throws IgniteCheckedException {
-        Connection conn = connectionForThread(tbl.schemaName());
-
-        String sql = generateQuery(qry, tbl);
-
-        return executeSqlQueryWithTimer(space, conn, sql, params, true);
-    }
-
-    /**
      * Binds parameters to prepared statement.
      *
      * @param stmt Prepared statement.
@@ -932,43 +951,66 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
-     * Executes regular query.
-     *
-     * @param spaceName Space name.
-     * @param qry Query.
-     * @param params Query parameters.
-     * @param type Query return type.
-     * @param filters Space name and key filters.
-     * @return Queried rows.
-     * @throws IgniteCheckedException If failed.
+     * @param conn Connection.
+     * @param enforceJoinOrder Enforce join order of tables.
+     * @param filter Filter.
      */
+    private void initLocalQueryContext(Connection conn, boolean 
enforceJoinOrder, IndexingQueryFilter filter) {
+        setupConnection(conn, false, enforceJoinOrder);
+
+        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, 
LOCAL).filter(filter).distributedJoins(false));
+    }
+
+    /**
+     * @param conn Connection to use.
+     * @param distributedJoins If distributed joins are enabled.
+     * @param enforceJoinOrder Enforce join order of tables.
+     */
+    public void setupConnection(Connection conn, boolean distributedJoins, 
boolean enforceJoinOrder) {
+        Session s = session(conn);
+
+        s.setForceJoinOrder(enforceJoinOrder);
+        s.setJoinBatchEnabled(distributedJoins);
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> 
query(@Nullable String spaceName,
+    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> 
queryLocalSql(@Nullable String spaceName,
         final String qry, @Nullable final Collection<Object> params, 
GridQueryTypeDescriptor type,
-        final IndexingQueryFilter filters) throws IgniteCheckedException {
+        final IndexingQueryFilter filter) throws IgniteCheckedException {
         final TableDescriptor tbl = tableDescriptor(spaceName, type);
 
         if (tbl == null)
             throw new CacheException("Failed to find SQL table for type: " + 
type.name());
 
-        setFilters(filters);
+        String sql = generateQuery(qry, tbl);
+
+        Connection conn = connectionForThread(tbl.schemaName());
+
+        initLocalQueryContext(conn, false, filter);
 
         try {
-            ResultSet rs = executeQuery(spaceName, qry, params, tbl);
+            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, 
params, true);
 
             return new KeyValIterator(rs);
         }
         finally {
-            setFilters(null);
+            GridH2QueryContext.clearThreadLocal();
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public Iterable<List<?>> queryTwoStep(final 
GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry,
-        final boolean keepCacheObj) {
+    /**
+     * @param cctx Cache context.
+     * @param qry Query.
+     * @param keepCacheObj Flag to keep cache object.
+     * @param enforceJoinOrder Enforce join order of tables.
+     * @return Iterable result.
+     */
+    private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> 
cctx, final GridCacheTwoStepQuery qry,
+        final boolean keepCacheObj, final boolean enforceJoinOrder) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
-                return rdcQryExec.query(cctx, qry, keepCacheObj);
+                return rdcQryExec.query(cctx, qry, keepCacheObj, 
enforceJoinOrder);
             }
         };
     }
@@ -997,6 +1039,7 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
 
         fqry.setArgs(qry.getArgs());
         fqry.setPageSize(qry.getPageSize());
+        fqry.setDistributedJoins(qry.isDistributedJoins());
 
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
 
@@ -1030,6 +1073,14 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         };
     }
 
+    /**
+     * @param c Connection.
+     * @return Session.
+     */
+    public static Session session(Connection c) {
+        return (Session)((JdbcConnection)c).getSession();
+    }
+
     /** {@inheritDoc} */
     @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> 
cctx, SqlFieldsQuery qry) {
         final String space = cctx.name();
@@ -1037,10 +1088,15 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         Connection c = connectionForSpace(space);
 
+        final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
+        final boolean distributedJoins = qry.isDistributedJoins() && 
cctx.isPartitioned();
+        final boolean grpByCollocated = qry.isCollocated();
+
         GridCacheTwoStepQuery twoStepQry;
         List<GridQueryFieldMetadata> meta;
 
-        final T3<String, String, Boolean> cachedQryKey = new T3<>(space, 
sqlQry, qry.isCollocated());
+        final TwoStepCachedQueryKey cachedQryKey = new 
TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
+            distributedJoins, enforceJoinOrder);
         TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
         if (cachedQry != null) {
@@ -1048,51 +1104,95 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             meta = cachedQry.meta;
         }
         else {
+            final UUID locNodeId = ctx.localNodeId();
+
+            setupConnection(c, distributedJoins, enforceJoinOrder);
+
+            GridH2QueryContext.set(new GridH2QueryContext(locNodeId, 
locNodeId, 0, PREPARE)
+                .distributedJoins(distributedJoins));
+
             PreparedStatement stmt;
 
             boolean cachesCreated = false;
 
-            while (true) {
-                try {
-                    // Do not cache this statement because the whole two step 
query object will be cached later on.
-                    stmt = prepareStatement(c, sqlQry, false);
+            try {
+                while (true) {
+                    try {
+                        // Do not cache this statement because the whole two 
step query object will be cached later on.
+                        stmt = prepareStatement(c, sqlQry, false);
 
-                    break;
-                }
-                catch (SQLException e) {
-                    if (!cachesCreated && e.getErrorCode() == 
ErrorCode.SCHEMA_NOT_FOUND_1) {
-                        try {
-                            ctx.cache().createMissingCaches();
-                        }
-                        catch (IgniteCheckedException e1) {
-                            throw new CacheException("Failed to create missing 
caches.", e);
-                        }
+                        break;
+                    }
+                    catch (SQLException e) {
+                        if (!cachesCreated && e.getErrorCode() == 
ErrorCode.SCHEMA_NOT_FOUND_1) {
+                            try {
+                                ctx.cache().createMissingCaches();
+                            }
+                            catch (IgniteCheckedException e1) {
+                                throw new CacheException("Failed to create 
missing caches.", e);
+                            }
 
-                        cachesCreated = true;
+                            cachesCreated = true;
+                        }
+                        else
+                            throw new CacheException("Failed to parse query: " 
+ sqlQry, e);
                     }
-                    else
-                        throw new CacheException("Failed to parse query: " + 
sqlQry, e);
                 }
             }
+            finally {
+                GridH2QueryContext.clearThreadLocal();
+            }
 
             try {
-                try {
-                    bindParameters(stmt, F.asList(qry.getArgs()));
-                }
-                catch (IgniteCheckedException e) {
-                    throw new CacheException("Failed to bind parameters: 
[qry=" + sqlQry + ", params=" +
-                        Arrays.deepToString(qry.getArgs()) + "]", e);
-                }
+                bindParameters(stmt, F.asList(qry.getArgs()));
 
-                try {
-                    twoStepQry = GridSqlQuerySplitter.split(
-                        (JdbcPreparedStatement)stmt, qry.getArgs(), 
qry.isCollocated(), this);
+                twoStepQry = 
GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), 
grpByCollocated,
+                    distributedJoins);
 
-                    meta = meta(stmt.getMetaData());
+                List<Integer> caches;
+                List<Integer> extraCaches = null;
+
+                // Setup spaces from schemas.
+                if (!twoStepQry.schemas().isEmpty()) {
+                    Collection<String> spaces = new 
ArrayList<>(twoStepQry.schemas().size());
+                    caches = new ArrayList<>(twoStepQry.schemas().size() + 1);
+                    caches.add(cctx.cacheId());
+
+                    for (String schema : twoStepQry.schemas()) {
+                        String space0 = space(schema);
+
+                        spaces.add(space0);
+
+                        if (!F.eq(space0, space)) {
+                            int cacheId = CU.cacheId(space0);
+
+                            caches.add(cacheId);
+
+                            if (extraCaches == null)
+                                extraCaches = new ArrayList<>();
+
+                            extraCaches.add(cacheId);
+                        }
+                    }
+
+                    twoStepQry.spaces(spaces);
                 }
-                catch (SQLException e) {
-                    throw new CacheException(e);
+                else {
+                    caches = Collections.singletonList(cctx.cacheId());
+                    extraCaches = null;
                 }
+
+                twoStepQry.caches(caches);
+                twoStepQry.extraCaches(extraCaches);
+
+                meta = meta(stmt.getMetaData());
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException("Failed to bind parameters: [qry=" + 
sqlQry + ", params=" +
+                    Arrays.deepToString(qry.getArgs()) + "]", e);
+            }
+            catch (SQLException e) {
+                throw new CacheException(e);
             }
             finally {
                 U.close(stmt, log);
@@ -1104,7 +1204,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         twoStepQry.pageSize(qry.getPageSize());
 
-        QueryCursorImpl<List<?>> cursor = new 
QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry, cctx.keepBinary()));
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
+            runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), 
enforceJoinOrder));
 
         cursor.fieldsMeta(meta);
 
@@ -1117,17 +1218,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
-     * Sets filters for current thread. Must be set to not null value
-     * before executeQuery and reset to null after in finally block since it 
signals
-     * to table that it should return content without expired values.
-     *
-     * @param filters Filters.
-     */
-    public void setFilters(@Nullable IndexingQueryFilter filters) {
-        GridH2IndexBase.setFiltersForThread(filters);
-    }
-
-    /**
      * Prepares statement for query.
      *
      * @param qry Query string.
@@ -1331,7 +1421,18 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         GridH2RowDescriptor desc = new RowDescriptor(tbl.type(), schema);
 
-        GridH2Table.Engine.createTable(conn, sql.toString(), desc, tbl, 
tbl.schema.spaceName);
+        GridH2Table res = GridH2Table.Engine.createTable(conn, sql.toString(), 
desc, tbl, tbl.schema.spaceName);
+
+        if (dataTables.putIfAbsent(res.identifier(), res) != null)
+            throw new IllegalStateException("Table already exists: " + 
res.identifier());
+    }
+
+    /**
+     * @param identifier Table identifier.
+     * @return Data table.
+     */
+    public GridH2Table dataTable(String identifier) {
+        return dataTables.get(identifier);
     }
 
     /**
@@ -1447,18 +1548,43 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         tbl.tbl.rebuildIndexes();
     }
 
-    /** {@inheritDoc} */
-    @Override public long size(@Nullable String spaceName, 
GridQueryTypeDescriptor type,
-        IndexingQueryFilter filters) throws IgniteCheckedException {
+    /**
+     * Gets size (for tests only).
+     *
+     * @param spaceName Space name.
+     * @param type Type descriptor.
+     * @return Size.
+     * @throws IgniteCheckedException If failed or {@code -1} if the type is 
unknown.
+     */
+    long size(@Nullable String spaceName, GridQueryTypeDescriptor type) throws 
IgniteCheckedException {
         TableDescriptor tbl = tableDescriptor(spaceName, type);
 
         if (tbl == null)
             return -1;
 
-        IgniteSpiCloseableIterator<List<?>> iter = queryFields(spaceName,
-            "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, 
null).iterator();
+        Connection conn = connectionForSpace(spaceName);
+
+        setupConnection(conn, false, false);
+
+        ResultSet rs = executeSqlQuery(conn,
+            "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, false);
+
+        try {
+            if (!rs.next())
+                throw new IllegalStateException();
 
-        return ((Number)iter.next().get(0)).longValue();
+            return rs.getLong(1);
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * @return Busy lock.
+     */
+    public GridSpinBusyLock busyLock() {
+        return busyLock;
     }
 
     /**
@@ -1481,8 +1607,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Starting cache query index...");
 
-        System.setProperty("h2.serializeJavaObject", "false");
-        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid 
ValueJavaObject caching.
+        this.busyLock = busyLock;
 
         if (SysProperties.serializeJavaObject) {
             U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1490,10 +1615,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             SysProperties.serializeJavaObject = false;
         }
 
-        if (Utils.serializer != null)
+        if (JdbcUtils.serializer != null)
             U.warn(log, "Custom H2 serialization is already configured, will 
override.");
 
-        Utils.serializer = h2Serializer();
+        JdbcUtils.serializer = h2Serializer();
 
         String dbName = (ctx != null ? ctx.localNodeId() : 
UUID.randomUUID()).toString();
 
@@ -1522,8 +1647,11 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             throw new IgniteCheckedException(e);
         }
 
-        if (ctx == null) // This is allowed in some tests.
+        if (ctx == null) {
+            // This is allowed in some tests.
+            nodeId = UUID.randomUUID();
             marshaller = new JdkMarshaller();
+        }
         else {
             this.ctx = ctx;
 
@@ -1548,9 +1676,92 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * @param topic Topic.
+     * @param topicOrd Topic ordinal for {@link GridTopic}.
+     * @param nodes Nodes.
+     * @param msg Message.
+     * @param specialize Optional closure to specialize message for each node.
+     * @param locNodeHnd Handler for local node.
+     * @param plc Policy identifying the executor service which will process 
message.
+     * @param runLocParallel Run local handler in parallel thread.
+     * @return {@code true} If all messages sent successfully.
+     */
+    public boolean send(
+        Object topic,
+        int topicOrd,
+        Collection<ClusterNode> nodes,
+        Message msg,
+        @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
+        @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd,
+        byte plc,
+        boolean runLocParallel
+    ) {
+        boolean ok = true;
+
+        if (specialize == null && msg instanceof GridCacheQueryMarshallable)
+            ((GridCacheQueryMarshallable)msg).marshall(marshaller);
+
+        ClusterNode locNode = null;
+
+        for (ClusterNode node : nodes) {
+            if (node.isLocal()) {
+                locNode = node;
+
+                continue;
+            }
+
+            try {
+                if (specialize != null) {
+                    msg = specialize.apply(node, msg);
+
+                    if (msg instanceof GridCacheQueryMarshallable)
+                        ((GridCacheQueryMarshallable)msg).marshall(marshaller);
+                }
+
+                ctx.io().send(node, topic, topicOrd, msg, plc);
+            }
+            catch (IgniteCheckedException e) {
+                ok = false;
+
+                U.warn(log, "Failed to send message [node=" + node + ", msg=" 
+ msg +
+                    ", errMsg=" + e.getMessage() + "]");
+            }
+        }
+
+        // Local node goes the last to allow parallel execution.
+        if (locNode != null) {
+            if (specialize != null)
+                msg = specialize.apply(locNode, msg);
+
+            if (runLocParallel) {
+                final ClusterNode finalLocNode = locNode;
+                final Message finalMsg = msg;
+
+                try {
+                    // We prefer runLocal to runLocalSafe, because the latter 
can produce deadlock here.
+                    ctx.closure().runLocal(new GridPlainRunnable() {
+                        @Override public void run() {
+                            locNodeHnd.apply(finalLocNode, finalMsg);
+                        }
+                    }, plc).listen(logger);
+                }
+                catch (IgniteCheckedException e) {
+                    ok = false;
+
+                    U.error(log, "Failed to execute query locally.", e);
+                }
+            }
+            else
+                locNodeHnd.apply(locNode, msg);
+        }
+
+        return ok;
+    }
+
+    /**
      * @return Serializer.
      */
-    protected JavaObjectSerializer h2Serializer() {
+    private JavaObjectSerializer h2Serializer() {
         return new JavaObjectSerializer() {
                 @Override public byte[] serialize(Object obj) throws Exception 
{
                     return marshaller.marshal(obj);
@@ -1607,8 +1818,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
         for (Schema schema : schemas.values()) {
             for (TableDescriptor desc : schema.tbls.values()) {
-                desc.tbl.close();
-
                 if (desc.luceneIdx != null)
                     U.closeQuiet(desc.luceneIdx);
             }
@@ -1632,18 +1841,19 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         if (stmtCacheCleanupTask != null)
             stmtCacheCleanupTask.close();
 
+        GridH2QueryContext.clearLocalNodeStop(nodeId);
+
         if (log.isDebugEnabled())
             log.debug("Cache query index stopped.");
     }
 
     /** {@inheritDoc} */
-    @Override public void registerCache(CacheConfiguration<?,?> ccfg) throws 
IgniteCheckedException {
+    @Override public void registerCache(GridCacheContext<?, ?> cctx, 
CacheConfiguration<?,?> ccfg)
+        throws IgniteCheckedException {
         String schema = schemaNameFromCacheConf(ccfg);
 
-        if (schemas.putIfAbsent(schema, new Schema(ccfg.getName(), schema,
-            ccfg.getOffHeapMaxMemory() >= 0 || ccfg.getMemoryMode() == 
CacheMemoryMode.OFFHEAP_TIERED ?
-            new GridUnsafeMemory(0) : null, ccfg)) != null)
-            throw new IgniteCheckedException("Schema for cache already 
registered: " + U.maskName(ccfg.getName()));
+        if (schemas.putIfAbsent(schema, new Schema(ccfg.getName(), schema, 
cctx, ccfg)) != null)
+            throw new IgniteCheckedException("Cache already registered: " + 
U.maskName(ccfg.getName()));
 
         space2schema.put(emptyIfNull(ccfg.getName()), schema);
 
@@ -1661,6 +1871,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             space2schema.remove(emptyIfNull(rmv.spaceName));
             mapQryExec.onCacheStop(ccfg.getName());
 
+            rmv.onDrop();
+
             try {
                 dropSchema(schema);
             }
@@ -1668,11 +1880,11 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 U.error(log, "Failed to drop schema on cache stop (will 
ignore): " + U.maskName(ccfg.getName()), e);
             }
 
-            for (Iterator<Map.Entry<T3<String, String, Boolean>, 
TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
+            for (Iterator<Map.Entry<TwoStepCachedQueryKey, 
TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
                  it.hasNext();) {
-                Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery> e = 
it.next();
+                Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery> e = 
it.next();
 
-                if (F.eq(e.getKey().get1(), ccfg.getName()))
+                if (F.eq(e.getKey().space, ccfg.getName()))
                     it.remove();
             }
         }
@@ -1680,7 +1892,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public IndexingQueryFilter backupFilter(
-        @Nullable final List<String> caches,
         @Nullable final AffinityTopologyVersion topVer,
         @Nullable final int[] parts
     ) {
@@ -1735,6 +1946,10 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             @Override public boolean isValueRequired() {
                 return false;
             }
+
+            @Override public String toString() {
+                return "IndexingQueryFilter [ver=" + topVer + ']';
+            }
         };
     }
 
@@ -1762,6 +1977,81 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * Key for cached two-step query.
+     */
+    private static final class TwoStepCachedQueryKey {
+        /** */
+        private final String space;
+
+        /** */
+        private final String sql;
+
+        /** */
+        private final boolean grpByCollocated;
+
+        /** */
+        private final boolean distributedJoins;
+
+        /** */
+        private final boolean enforceJoinOrder;
+
+        /**
+         * @param space Space.
+         * @param sql Sql.
+         * @param grpByCollocated Collocated GROUP BY.
+         * @param distributedJoins Distributed joins enabled.
+         * @param enforceJoinOrder Enforce join order of tables.
+         */
+        private TwoStepCachedQueryKey(String space,
+            String sql,
+            boolean grpByCollocated,
+            boolean distributedJoins,
+            boolean enforceJoinOrder) {
+            this.space = space;
+            this.sql = sql;
+            this.grpByCollocated = grpByCollocated;
+            this.distributedJoins = distributedJoins;
+            this.enforceJoinOrder = enforceJoinOrder;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o;
+
+            if (grpByCollocated != that.grpByCollocated)
+                return false;
+
+            if (distributedJoins != that.distributedJoins)
+                return false;
+
+            if (enforceJoinOrder != that.enforceJoinOrder)
+                return false;
+
+            if (space != null ? !space.equals(that.space) : that.space != null)
+                return false;
+
+            return sql.equals(that.sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = space != null ? space.hashCode() : 0;
+            res = 31 * res + sql.hashCode();
+            res = 31 * res + (grpByCollocated ? 1 : 0);
+            res = 31 * res + (distributedJoins ? 1 : 0);
+            res = 31 * res + (enforceJoinOrder ? 1 : 0);
+
+            return res;
+        }
+    }
+
+    /**
      * Cached two-step query.
      */
     private static final class TwoStepCachedQuery {
@@ -1787,6 +2077,47 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * @param c1 First column.
+     * @param c2 Second column.
+     * @return {@code true} If they are the same.
+     */
+    private static boolean equal(IndexColumn c1, IndexColumn c2) {
+        return c1.column.getColumnId() == c2.column.getColumnId();
+    }
+
+    /**
+     * @param cols Columns list.
+     * @param col Column to find.
+     * @return {@code true} If found.
+     */
+    private static boolean containsColumn(List<IndexColumn> cols, IndexColumn 
col) {
+        for (int i = cols.size() - 1; i >= 0; i--) {
+            if (equal(cols.get(i), col))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cols Columns list.
+     * @param keyCol Primary key column.
+     * @param affCol Affinity key column.
+     * @return The same list back.
+     */
+    private static List<IndexColumn> treeIndexColumns(List<IndexColumn> cols, 
IndexColumn keyCol, IndexColumn affCol) {
+        assert keyCol != null;
+
+        if (!containsColumn(cols, keyCol))
+            cols.add(keyCol);
+
+        if (affCol != null && !containsColumn(cols, affCol))
+            cols.add(affCol);
+
+        return cols;
+    }
+
+    /**
      * Wrapper to store connection and flag is schema set or not.
      */
     private static class ConnectionWrapper {
@@ -2033,7 +2364,15 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
             ArrayList<Index> idxs = new ArrayList<>();
 
-            idxs.add(new GridH2TreeIndex("_key_PK", tbl, true, KEY_COL, 
VAL_COL, tbl.indexColumn(0, ASCENDING)));
+            IndexColumn keyCol = tbl.indexColumn(KEY_COL, SortOrder.ASCENDING);
+            IndexColumn affCol = tbl.getAffinityKeyColumn();
+
+            if (affCol != null && equal(affCol, keyCol))
+                affCol = null;
+
+            // Add primary key index.
+            idxs.add(new GridH2TreeIndex("_key_PK", tbl, true,
+                treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, 
affCol)));
 
             if (type().valueClass() == String.class) {
                 try {
@@ -2044,6 +2383,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 }
             }
 
+            boolean affIdxFound = false;
+
             for (Map.Entry<String, GridQueryIndexDescriptor> e : 
type.indexes().entrySet()) {
                 String name = e.getKey();
                 GridQueryIndexDescriptor idx = e.getValue();
@@ -2057,30 +2398,40 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     }
                 }
                 else {
-                    IndexColumn[] cols = new IndexColumn[idx.fields().size()];
-
-                    int i = 0;
+                    List<IndexColumn> cols = new 
ArrayList<>(idx.fields().size() + 2);
 
                     boolean escapeAll = schema.escapeAll();
 
                     for (String field : idx.fields()) {
-                        // H2 reserved keywords used as column name is case 
sensitive.
                         String fieldName = escapeAll ? field : 
escapeName(field, false).toUpperCase();
 
                         Column col = tbl.getColumn(fieldName);
 
-                        cols[i++] = tbl.indexColumn(col.getColumnId(), 
idx.descending(field) ? DESCENDING : ASCENDING);
+                        cols.add(tbl.indexColumn(col.getColumnId(),
+                            idx.descending(field) ? SortOrder.DESCENDING : 
SortOrder.ASCENDING));
                     }
 
-                    if (idx.type() == SORTED)
-                        idxs.add(new GridH2TreeIndex(name, tbl, false, 
KEY_COL, VAL_COL, cols));
+                    if (idx.type() == SORTED) {
+                        // We don't care about number of fields in affinity 
index, just affinity key must be the first.
+                        affIdxFound |= affCol != null && equal(cols.get(0), 
affCol);
+
+                        cols = treeIndexColumns(cols, keyCol, affCol);
+
+                        idxs.add(new GridH2TreeIndex(name, tbl, false, cols));
+                    }
                     else if (idx.type() == GEO_SPATIAL)
-                        idxs.add(createH2SpatialIndex(tbl, name, cols, 
KEY_COL, VAL_COL));
+                        idxs.add(createH2SpatialIndex(tbl, name, 
cols.toArray(new IndexColumn[cols.size()])));
                     else
-                        throw new IllegalStateException();
+                        throw new IllegalStateException("Index type: " + 
idx.type());
                 }
             }
 
+            // Add explicit affinity key index if nothing alike was found.
+            if (affCol != null && !affIdxFound) {
+                idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false,
+                    treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, 
keyCol)));
+            }
+
             return idxs;
         }
 
@@ -2088,15 +2439,11 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
          * @param tbl Table.
          * @param idxName Index name.
          * @param cols Columns.
-         * @param keyCol Key column.
-         * @param valCol Value column.
          */
         private SpatialIndex createH2SpatialIndex(
             Table tbl,
             String idxName,
-            IndexColumn[] cols,
-            int keyCol,
-            int valCol
+            IndexColumn[] cols
         ) {
             String className = 
"org.apache.ignite.internal.processors.query.h2.opt.GridH2SpatialIndex";
 
@@ -2106,14 +2453,12 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 Constructor<?> ctor = cls.getConstructor(
                     Table.class,
                     String.class,
-                    IndexColumn[].class,
-                    int.class,
-                    int.class);
+                    IndexColumn[].class);
 
                 if (!ctor.isAccessible())
                     ctor.setAccessible(true);
 
-                return (SpatialIndex)ctor.newInstance(tbl, idxName, cols, 
keyCol, valCol);
+                return (SpatialIndex)ctor.newInstance(tbl, idxName, cols);
             }
             catch (Exception e) {
                 throw new IgniteException("Failed to instantiate: " + 
className, e);
@@ -2162,6 +2507,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
         @Override protected IgniteBiTuple<K, V> createRow() {
             K key = (K)row[0];
             V val = (V)row[1];
@@ -2256,10 +2602,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     /**
      * Database schema object.
      */
-    private static class Schema {
-        /** */
-        private static final long serialVersionUID = 0L;
-
+    private class Schema {
         /** */
         private final String spaceName;
 
@@ -2276,23 +2619,33 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         private final CacheLongKeyLIRS<GridH2KeyValueRowOffheap> rowCache;
 
         /** */
+        private final GridCacheContext<?,?> cctx;
+
+        /** */
         private final CacheConfiguration<?,?> ccfg;
 
         /**
          * @param spaceName Space name.
          * @param schemaName Schema name.
-         * @param offheap Offheap memory.
+         * @param cctx Cache context.
          * @param ccfg Cache configuration.
          */
-        private Schema(@Nullable String spaceName, String schemaName, 
GridUnsafeMemory offheap, CacheConfiguration<?,?> ccfg) {
+        private Schema(String spaceName, String schemaName, 
GridCacheContext<?,?> cctx, CacheConfiguration<?,?> ccfg) {
             this.spaceName = spaceName;
+            this.cctx = cctx;
             this.schemaName = schemaName;
-            this.offheap = offheap;
             this.ccfg = ccfg;
 
-            if (offheap != null)
-                rowCache = new 
CacheLongKeyLIRS<>(ccfg.getSqlOnheapRowCacheSize(), 1, 128, 256);
-            else
+            offheap = ccfg.getOffHeapMaxMemory() >= 0 || ccfg.getMemoryMode() 
== CacheMemoryMode.OFFHEAP_TIERED ?
+                new GridUnsafeMemory(0) : null;
+
+            if (offheap != null) {
+                CacheLongKeyLIRS.Config lirsCfg = new 
CacheLongKeyLIRS.Config();
+
+                lirsCfg.maxMemory = ccfg.getSqlOnheapRowCacheSize();
+
+                rowCache = new CacheLongKeyLIRS<>(lirsCfg);
+            } else
                 rowCache = null;
         }
 
@@ -2310,6 +2663,19 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         public boolean escapeAll() {
             return ccfg.isSqlEscapeAll();
         }
+
+        /**
+         * Called after the schema was dropped.
+         */
+        public void onDrop() {
+            for (TableDescriptor tblDesc : tbls.values()) {
+                GridH2Table tbl = tblDesc.tbl;
+
+                dataTables.remove(tbl.identifier(), tbl);
+
+                tbl.destroy();
+            }
+        }
     }
 
     /**
@@ -2390,6 +2756,26 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         }
 
         /** {@inheritDoc} */
+        @Override public IgniteH2Indexing indexing() {
+            return IgniteH2Indexing.this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridQueryTypeDescriptor type() {
+            return type;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContext<?,?> context() {
+            return schema.cctx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheConfiguration configuration() {
+            return schema.ccfg;
+        }
+
+        /** {@inheritDoc} */
         @Override public GridUnsafeGuard guard() {
             return guard;
         }
@@ -2414,11 +2800,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteH2Indexing owner() {
-            return IgniteH2Indexing.this;
-        }
-
-        /** {@inheritDoc} */
         @Override public Value wrap(Object obj, int type) throws 
IgniteCheckedException {
             assert obj != null;
 
@@ -2457,7 +2838,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     if (obj instanceof java.util.Date && !(obj instanceof 
Timestamp))
                         obj = new Timestamp(((java.util.Date) obj).getTime());
 
-                    return GridH2Utils.toValueTimestamp((Timestamp)obj);
+                    return ValueTimestamp.get((Timestamp)obj);
                 case Value.DECIMAL:
                     return ValueDecimal.get((BigDecimal)obj);
                 case Value.STRING:
@@ -2491,7 +2872,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
             throws IgniteCheckedException {
             try {
                 if (val == null) // Only can happen for remove operation, can 
create simple search row.
-                    return new GridH2Row(wrap(key, keyType), null);
+                    return GridH2RowFactory.create(wrap(key, keyType));
 
                 return schema.offheap == null ?
                     new GridH2KeyValueRowOnheap(this, key, keyType, val, 
valType, expirationTime) :

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index fe6851d..284f8c5 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -55,6 +55,15 @@ public abstract class GridH2AbstractKeyValueRow extends 
GridH2Row {
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     protected long expirationTime;
 
+    /** */
+    private Value key;
+
+    /** */
+    private volatile Value val;
+
+    /** */
+    private Value[] valCache;
+
     /**
      * Constructor.
      *
@@ -68,8 +77,6 @@ public abstract class GridH2AbstractKeyValueRow extends 
GridH2Row {
      */
     protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, 
int keyType, @Nullable Object val,
         int valType, long expirationTime) throws IgniteCheckedException {
-        super(null, null);
-
         setValue(KEY_COL, desc.wrap(key, keyType));
 
         if (val != null) // We remove by key only, so value can be null here.
@@ -79,14 +86,17 @@ public abstract class GridH2AbstractKeyValueRow extends 
GridH2Row {
         this.expirationTime = expirationTime;
     }
 
+    /** {@inheritDoc} */
+    @Override public Value[] getValueList() {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Protected constructor for {@link GridH2KeyValueRowOffheap}
      *
      * @param desc Row descriptor.
      */
     protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc) {
-        super(new Value[DEFAULT_COLUMNS_COUNT]);
-
         this.desc = desc;
     }
 
@@ -184,11 +194,20 @@ public abstract class GridH2AbstractKeyValueRow extends 
GridH2Row {
      * @return Value if exists.
      */
     protected final Value peekValue(int col) {
-        return getValueList()[col];
+        return col == KEY_COL ? key : val;
     }
 
     /** {@inheritDoc} */
     @Override public Value getValue(int col) {
+        Value[] vCache = valCache;
+
+        if (vCache != null) {
+            Value v = vCache[col];
+
+            if (v != null)
+                return v;
+        }
+
         if (col < DEFAULT_COLUMNS_COUNT) {
             Value v;
 
@@ -288,15 +307,35 @@ public abstract class GridH2AbstractKeyValueRow extends 
GridH2Row {
 
         Object res = desc.columnValue(key.getObject(), val.getObject(), col);
 
-        if (res == null)
-            return ValueNull.INSTANCE;
+        Value v;
 
-        try {
-            return desc.wrap(res, desc.fieldType(col));
+        if (res == null)
+            v = ValueNull.INSTANCE;
+        else {
+            try {
+                v = desc.wrap(res, desc.fieldType(col));
+            }
+            catch (IgniteCheckedException e) {
+                throw DbException.convert(e);
+            }
         }
-        catch (IgniteCheckedException e) {
-            throw DbException.convert(e);
+
+        if (vCache != null)
+            vCache[col + DEFAULT_COLUMNS_COUNT] = v;
+
+        return v;
+    }
+
+    /**
+     * @param valCache Value cache.
+     */
+    public void valuesCache(Value[] valCache) {
+        if (valCache != null) {
+            valCache[KEY_COL] = key;
+            valCache[VAL_COL] = val;
         }
+
+        this.valCache = valCache;
     }
 
     /**
@@ -467,6 +506,17 @@ public abstract class GridH2AbstractKeyValueRow extends 
GridH2Row {
     }
 
     /** {@inheritDoc} */
+    @Override public void setValue(int idx, Value v) {
+        if (idx == VAL_COL)
+            val = v;
+        else {
+            assert idx == KEY_COL : idx + " " + v;
+
+            key = v;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public final int hashCode() {
         throw new IllegalStateException();
     }

Reply via email to