IGNITE-10812: SQL: split classes responsible for distributed joins. This closes 
#5742.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53543733
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53543733
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53543733

Branch: refs/heads/ignite-601
Commit: 535437336b1fd0e382c19f220d511fea52dbf889
Parents: 0431c28
Author: devozerov <voze...@gridgain.com>
Authored: Wed Dec 26 18:23:56 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Wed Dec 26 18:23:56 2018 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2SpatialIndex.java        |    5 +-
 .../internal/processors/query/h2/H2Utils.java   |   62 +
 .../processors/query/h2/IgniteH2Indexing.java   |    4 +-
 .../query/h2/database/H2PkHashIndex.java        |    2 +-
 .../query/h2/database/H2TreeClientIndex.java    |    2 +-
 .../query/h2/database/H2TreeIndex.java          |   23 +-
 .../query/h2/opt/DistributedJoinMode.java       |   51 -
 .../query/h2/opt/GridH2CollocationModel.java    |  838 --------------
 .../query/h2/opt/GridH2IndexBase.java           | 1085 +-----------------
 .../query/h2/opt/GridH2QueryContext.java        |  163 +--
 .../query/h2/opt/QueryContextKey.java           |  127 ++
 .../query/h2/opt/join/BroadcastCursor.java      |  155 +++
 .../query/h2/opt/join/CollocationModel.java     |  841 ++++++++++++++
 .../h2/opt/join/CursorIteratorWrapper.java      |   68 ++
 .../query/h2/opt/join/DistributedJoinMode.java  |   51 +
 .../h2/opt/join/DistributedLookupBatch.java     |  430 +++++++
 .../query/h2/opt/join/RangeSource.java          |  137 +++
 .../query/h2/opt/join/RangeStream.java          |  296 +++++
 .../query/h2/opt/join/SegmentKey.java           |   81 ++
 .../processors/query/h2/opt/join/SourceKey.java |   66 ++
 .../query/h2/opt/join/UnicastCursor.java        |   64 ++
 .../query/h2/sql/GridSqlQuerySplitter.java      |    2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |    6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |    2 +-
 .../h2/twostep/RetryCauseMessageSelfTest.java   |    4 +-
 25 files changed, 2465 insertions(+), 2100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
 
b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 4d1577b..35e9424 100644
--- 
a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ 
b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.util.GridCursorIteratorWrapper;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
@@ -154,7 +155,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         return segments.length;
     }
 
@@ -335,7 +336,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase 
implements SpatialIndex
     @SuppressWarnings("unchecked")
     private GridCursor<GridH2Row> rowIterator(Iterator<SpatialKey> i, 
TableFilter filter) {
         if (!i.hasNext())
-            return EMPTY_CURSOR;
+            return H2Utils.EMPTY_CURSOR;
 
         long time = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index c542758..2a5f33c 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -45,15 +45,22 @@ import 
org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
 import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.engine.Session;
 import org.h2.jdbc.JdbcConnection;
+import org.h2.result.Row;
 import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
 import org.h2.util.LocalDateTimeUtils;
@@ -78,6 +85,8 @@ import org.h2.value.ValueTime;
 import org.h2.value.ValueTimestamp;
 import org.h2.value.ValueUuid;
 
+import javax.cache.CacheException;
+
 import static 
org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
 import static 
org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
 import static 
org.apache.ignite.internal.processors.query.QueryUtils.VER_FIELD_NAME;
@@ -94,6 +103,19 @@ public class H2Utils {
     /** Quotation character. */
     private static final char ESC_CH = '\"';
 
+    /** Empty cursor. */
+    public static final GridCursor<GridH2Row> EMPTY_CURSOR = new 
GridCursor<GridH2Row>() {
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridH2Row get() {
+            return null;
+        }
+    };
+
     /**
      * @param c1 First column.
      * @param c2 Second column.
@@ -598,4 +620,44 @@ public class H2Utils {
 
         return qry;
     }
+
+    /**
+     * @param row Row.
+     * @return Row message.
+     */
+    public static GridH2RowMessage toRowMessage(Row row) {
+        if (row == null)
+            return null;
+
+        int cols = row.getColumnCount();
+
+        assert cols > 0 : cols;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(cols);
+
+        for (int i = 0; i < cols; i++) {
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i)));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     * Create retry exception for distributed join.
+     *
+     * @param msg Message.
+     * @return Exception.
+     */
+    public static GridH2RetryException retryException(String msg) {
+        return new GridH2RetryException(msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index d1b435d..5f692e9 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -177,8 +177,8 @@ import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
 import static 
org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index ef6d5ff..9a42362 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -82,7 +82,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         return 1;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
index a0bab43..df896ed 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
@@ -68,7 +68,7 @@ public class H2TreeClientIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         throw SHOULDNT_BE_INVOKED_EXCEPTION;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 40d0a9f..6b09b76 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -40,8 +40,6 @@ import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.stat.IoStatisticsHolder;
 import org.apache.ignite.internal.stat.IoStatisticsType;
-import org.apache.ignite.internal.util.IgniteTree;
-import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
@@ -273,7 +271,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         return segments.length;
     }
 
@@ -449,25 +447,6 @@ public class H2TreeIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected H2Cursor doFind0(
-        IgniteTree t,
-        @Nullable SearchRow first,
-        @Nullable SearchRow last,
-        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
-        try {
-            GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, 
filter, null);
-
-            if (range == null)
-                range = EMPTY_CURSOR;
-
-            return new H2Cursor(range);
-        }
-        catch (IgniteCheckedException e) {
-            throw DbException.convert(e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override protected BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> 
filter(GridH2QueryContext qctx) {
         if (qctx == null) {
             assert !cctx.mvccEnabled();

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
deleted file mode 100644
index cc06244..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt;
-
-/**
- * Defines set of distributed join modes.
- */
-public enum DistributedJoinMode {
-    /**
-     * Distributed joins is disabled. Local joins will be performed instead.
-     */
-    OFF,
-
-    /**
-     * Distributed joins is enabled within local node only.
-     *
-     * NOTE: This mode is used with segmented indices for local sql queries.
-     * As in this case we need to make distributed join across local index 
segments
-     * and prevent range-queries to other nodes.
-     */
-    LOCAL_ONLY,
-
-    /**
-     * Distributed joins is enabled.
-     */
-    ON;
-
-    /**
-     * @param isLocal Query local flag.
-     * @param distributedJoins Query distributed joins flag.
-     * @return DistributedJoinMode for the query.
-     */
-    public static DistributedJoinMode distributedJoinMode(boolean isLocal, 
boolean distributedJoins) {
-        return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
deleted file mode 100644
index 2a92511..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ /dev/null
@@ -1,838 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import javax.cache.CacheException;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.h2.command.dml.Query;
-import org.h2.command.dml.Select;
-import org.h2.command.dml.SelectUnion;
-import org.h2.expression.Comparison;
-import org.h2.expression.Expression;
-import org.h2.expression.ExpressionColumn;
-import org.h2.index.IndexCondition;
-import org.h2.index.ViewIndex;
-import org.h2.table.Column;
-import org.h2.table.IndexColumn;
-import org.h2.table.SubQueryInfo;
-import org.h2.table.Table;
-import org.h2.table.TableFilter;
-import org.h2.table.TableView;
-
-/**
- * Collocation model for a query.
- */
-public final class GridH2CollocationModel {
-    /** */
-    public static final int MULTIPLIER_COLLOCATED = 1;
-
-    /** */
-    private static final int MULTIPLIER_UNICAST = 50;
-
-    /** */
-    private static final int MULTIPLIER_BROADCAST = 200;
-
-    /** */
-    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
-
-    /** */
-    private final GridH2CollocationModel upper;
-
-    /** */
-    private final int filter;
-
-    /** */
-    private final boolean view;
-
-    /** */
-    private int multiplier;
-
-    /** */
-    private Type type;
-
-    /** */
-    private GridH2CollocationModel[] children;
-
-    /** */
-    private TableFilter[] childFilters;
-
-    /** */
-    private List<GridH2CollocationModel> unions;
-
-    /** */
-    private Select select;
-
-    /** */
-    private final boolean validate;
-
-    /**
-     * @param upper Upper.
-     * @param filter Filter.
-     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
-     * @param validate Query validation flag.
-     */
-    private GridH2CollocationModel(GridH2CollocationModel upper, int filter, 
boolean view, boolean validate) {
-        this.upper = upper;
-        this.filter = filter;
-        this.view = view;
-        this.validate = validate;
-    }
-
-    /**
-     * @return Table filter for this collocation model.
-     */
-    private TableFilter filter() {
-        return upper == null ? null : upper.childFilters[filter];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        calculate();
-
-        SB b = new SB();
-
-        for (int lvl = 0; lvl < 20; lvl++) {
-            if (!toString(b, lvl))
-                break;
-
-            b.a('\n');
-        }
-
-        return b.toString();
-    }
-
-    /**
-     * @param b String builder.
-     * @param lvl Depth level.
-     */
-    private boolean toString(SB b, int lvl) {
-        boolean res = false;
-
-        if (lvl == 0) {
-            TableFilter f = filter();
-            String tblAlias = f == null ? "^" : f.getTableAlias();
-
-            b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", 
mul=").a(multiplier).a("]");
-
-            res = true;
-        }
-        else if (childFilters != null) {
-            assert lvl > 0;
-
-            lvl--;
-
-            for (int i = 0; i < childFilters.length; i++) {
-                if (lvl == 0)
-                    b.a(" | ");
-
-                res |= child(i, true).toString(b, lvl);
-            }
-
-            if (lvl == 0)
-                b.a(" | ");
-        }
-
-        return res;
-    }
-
-    /**
-     * @param upper Upper.
-     * @param filter Filter.
-     * @param unions Unions.
-     * @param view This model will be a subquery (or top level query) and must 
contain child filters.
-     * @param validate Query validation flag.
-     * @return Created child collocation model.
-     */
-    private static GridH2CollocationModel 
createChildModel(GridH2CollocationModel upper,
-        int filter,
-        List<GridH2CollocationModel> unions,
-        boolean view,
-        boolean validate) {
-        GridH2CollocationModel child = new GridH2CollocationModel(upper, 
filter, view, validate);
-
-        if (unions != null) {
-            // Bind created child to unions.
-            assert upper == null || upper.child(filter, false) != null || 
unions.isEmpty();
-
-            if (upper != null && unions.isEmpty()) {
-                assert upper.child(filter, false) == null;
-
-                upper.children[filter] = child;
-            }
-
-            unions.add(child);
-
-            child.unions = unions;
-        }
-        else if (upper != null) {
-            // Bind created child to upper model.
-            assert upper.child(filter, false) == null;
-
-            upper.children[filter] = child;
-        }
-
-        return child;
-    }
-
-    /**
-     * @param childFilters New child filters.
-     * @return {@code true} If child filters were updated.
-     */
-    private boolean childFilters(TableFilter[] childFilters) {
-        assert childFilters != null;
-        assert view;
-
-        Select select = childFilters[0].getSelect();
-
-        assert this.select == null || this.select == select;
-
-        if (this.select == null) {
-            this.select = select;
-
-            assert this.childFilters == null;
-        }
-        else if (Arrays.equals(this.childFilters, childFilters))
-            return false;
-
-        if (this.childFilters == null) {
-            // We have to clone because H2 reuses array and reorders elements.
-            this.childFilters = childFilters.clone();
-
-            children = new GridH2CollocationModel[childFilters.length];
-        }
-        else {
-            assert this.childFilters.length == childFilters.length;
-
-            // We have to copy because H2 reuses array and reorders elements.
-            System.arraycopy(childFilters, 0, this.childFilters, 0, 
childFilters.length);
-
-            Arrays.fill(children, null);
-        }
-
-        // Reset results.
-        type = null;
-        multiplier = 0;
-
-        return true;
-    }
-
-    /**
-     * Do the needed calculations.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private void calculate() {
-        if (type != null)
-            return;
-
-        if (view) { // We are at (sub-)query model.
-            assert childFilters != null;
-
-            boolean collocated = true;
-            boolean partitioned = false;
-            int maxMultiplier = MULTIPLIER_COLLOCATED;
-
-            for (int i = 0; i < childFilters.length; i++) {
-                GridH2CollocationModel child = child(i, true);
-
-                Type t = child.type(true);
-
-                if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
-                    maxMultiplier = child.multiplier;
-
-                if (t.isPartitioned()) {
-                    partitioned = true;
-
-                    if (!t.isCollocated()) {
-                        collocated = false;
-
-                        int m = child.multiplier(true);
-
-                        if (m > maxMultiplier) {
-                            maxMultiplier = m;
-
-                            if (maxMultiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
-                                break;
-                        }
-                    }
-                }
-            }
-
-            type = Type.of(partitioned, collocated);
-            multiplier = maxMultiplier;
-        }
-        else {
-            assert upper != null;
-            assert childFilters == null;
-
-            // We are at table instance.
-            Table tbl = filter().getTable();
-
-            // Only partitioned tables will do distributed joins.
-            if (!(tbl instanceof GridH2Table) || 
!((GridH2Table)tbl).isPartitioned()) {
-                type = Type.REPLICATED;
-                multiplier = MULTIPLIER_COLLOCATED;
-
-                return;
-            }
-
-            // If we are the first partitioned table in a join, then we are 
"base" for all the rest partitioned tables
-            // which will need to get remote result (if there is no affinity 
condition). Since this query is broadcasted
-            // to all the affinity nodes the "base" does not need to get 
remote results.
-            if (!upper.findPartitionedTableBefore(filter)) {
-                type = Type.PARTITIONED_COLLOCATED;
-                multiplier = MULTIPLIER_COLLOCATED;
-            }
-            else {
-                // It is enough to make sure that our previous join by 
affinity key is collocated, then we are
-                // collocated. If we at least have affinity key condition, 
then we do unicast which is cheaper.
-                switch (upper.joinedWithCollocated(filter)) {
-                    case COLLOCATED_JOIN:
-                        type = Type.PARTITIONED_COLLOCATED;
-                        multiplier = MULTIPLIER_COLLOCATED;
-
-                        break;
-
-                    case HAS_AFFINITY_CONDITION:
-                        type = Type.PARTITIONED_NOT_COLLOCATED;
-                        multiplier = MULTIPLIER_UNICAST;
-
-                        break;
-
-                    case NONE:
-                        type = Type.PARTITIONED_NOT_COLLOCATED;
-                        multiplier = MULTIPLIER_BROADCAST;
-
-                        break;
-
-                    default:
-                        throw new IllegalStateException();
-                }
-            }
-
-            if (upper.previousReplicated(filter))
-                multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
-        }
-    }
-
-    /**
-     * @param f Current filter.
-     * @return {@code true} If partitioned table was found.
-     */
-    private boolean findPartitionedTableBefore(int f) {
-        for (int i = 0; i < f; i++) {
-            GridH2CollocationModel child = child(i, true);
-
-            // The c can be null if it is not a GridH2Table and not a 
sub-query,
-            // it is a some kind of function table or anything else that 
considered replicated.
-            if (child != null && child.type(true).isPartitioned())
-                return true;
-        }
-
-        // We have to search globally in upper queries as well.
-        return upper != null && upper.findPartitionedTableBefore(filter);
-    }
-
-    /**
-     * @param f Current filter.
-     * @return {@code true} If previous table is REPLICATED.
-     */
-    @SuppressWarnings("SimplifiableIfStatement")
-    private boolean previousReplicated(int f) {
-        if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
-            return true;
-
-        return upper != null && upper.previousReplicated(filter);
-    }
-
-    /**
-     * @param f Filter.
-     * @return Affinity join type.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Affinity joinedWithCollocated(int f) {
-        TableFilter tf = childFilters[f];
-
-        GridH2Table tbl = (GridH2Table)tf.getTable();
-
-        if (validate) {
-            if (tbl.isCustomAffinityMapper())
-                throw customAffinityError(tbl.cacheName());
-
-            if (F.isEmpty(tf.getIndexConditions())) {
-                throw new CacheException("Failed to prepare distributed join 
query: " +
-                    "join condition does not use index [joinedCache=" + 
tbl.cacheName() +
-                    ", plan=" + tf.getSelect().getPlanSQL() + ']');
-            }
-        }
-
-        IndexColumn affCol = tbl.getAffinityKeyColumn();
-
-        boolean affKeyCondFound = false;
-
-        if (affCol != null) {
-            ArrayList<IndexCondition> idxConditions = tf.getIndexConditions();
-
-            int affColId = affCol.column.getColumnId();
-
-            for (int i = 0; i < idxConditions.size(); i++) {
-                IndexCondition c = idxConditions.get(i);
-                int colId = c.getColumn().getColumnId();
-                int cmpType = c.getCompareType();
-
-                if ((cmpType == Comparison.EQUAL || cmpType == 
Comparison.EQUAL_NULL_SAFE) &&
-                    (colId == affColId || 
tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) {
-                    affKeyCondFound = true;
-
-                    Expression exp = c.getExpression();
-                    exp = exp.getNonAliasExpression();
-
-                    if (exp instanceof ExpressionColumn) {
-                        ExpressionColumn expCol = (ExpressionColumn)exp;
-
-                        // This is one of our previous joins.
-                        TableFilter prevJoin = expCol.getTableFilter();
-
-                        if (prevJoin != null) {
-                            GridH2CollocationModel cm = 
child(indexOf(prevJoin), true);
-
-                            // If the previous joined model is a subquery 
(view), we can not be sure that
-                            // the found affinity column is the needed one, 
since we can select multiple
-                            // different affinity columns from different 
tables.
-                            if (cm != null && !cm.view) {
-                                Type t = cm.type(true);
-
-                                if (t.isPartitioned() && t.isCollocated() && 
isAffinityColumn(prevJoin, expCol, validate))
-                                    return Affinity.COLLOCATED_JOIN;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : 
Affinity.NONE;
-    }
-
-    /**
-     * @param f Table filter.
-     * @return Index.
-     */
-    private int indexOf(TableFilter f) {
-        for (int i = 0; i < childFilters.length; i++) {
-            if (childFilters[i] == f)
-                return i;
-        }
-
-        throw new IllegalStateException();
-    }
-
-    /**
-     * @param f Table filter.
-     * @param expCol Expression column.
-     * @param validate Query validation flag.
-     * @return {@code true} It it is an affinity column.
-     */
-    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn 
expCol, boolean validate) {
-        Column col = expCol.getColumn();
-
-        if (col == null)
-            return false;
-
-        Table t = col.getTable();
-
-        if (t.isView()) {
-            Query qry;
-
-            if (f.getIndex() != null)
-                qry = getSubQuery(f);
-            else
-                qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t);
-
-            return isAffinityColumn(qry, expCol, validate);
-        }
-
-        if (t instanceof GridH2Table) {
-            GridH2Table t0 = (GridH2Table)t;
-
-            if (validate && t0.isCustomAffinityMapper())
-                throw customAffinityError((t0).cacheName());
-
-            IndexColumn affCol = t0.getAffinityKeyColumn();
-
-            return affCol != null && col.getColumnId() == 
affCol.column.getColumnId();
-        }
-
-        return false;
-    }
-
-    /**
-     * @param qry Query.
-     * @param expCol Expression column.
-     * @param validate Query validation flag.
-     * @return {@code true} It it is an affinity column.
-     */
-    private static boolean isAffinityColumn(Query qry, ExpressionColumn 
expCol, boolean validate) {
-        if (qry.isUnion()) {
-            SelectUnion union = (SelectUnion)qry;
-
-            return isAffinityColumn(union.getLeft(), expCol, validate) && 
isAffinityColumn(union.getRight(), expCol, validate);
-        }
-
-        Expression exp = 
qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
-
-        if (exp instanceof ExpressionColumn) {
-            expCol = (ExpressionColumn)exp;
-
-            return isAffinityColumn(expCol.getTableFilter(), expCol, validate);
-        }
-
-        return false;
-    }
-
-    /**
-     * @return Multiplier.
-     */
-    public int calculateMultiplier() {
-        // We don't need multiplier for union here because it will be 
summarized in H2.
-        return multiplier(false);
-    }
-
-    /**
-     * @param withUnion With respect to union.
-     * @return Multiplier.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    private int multiplier(boolean withUnion) {
-        calculate();
-
-        assert multiplier != 0;
-
-        if (withUnion && unions != null) {
-            int maxMultiplier = 0;
-
-            for (int i = 0; i < unions.size(); i++) {
-                int m = unions.get(i).multiplier(false);
-
-                if (m > maxMultiplier)
-                    maxMultiplier = m;
-            }
-
-            return maxMultiplier;
-        }
-
-        return multiplier;
-    }
-
-    /**
-     * @param withUnion With respect to union.
-     * @return Type.
-     */
-    private Type type(boolean withUnion) {
-        calculate();
-
-        assert type != null;
-
-        if (withUnion && unions != null) {
-            Type left = unions.get(0).type(false);
-
-            for (int i = 1; i < unions.size(); i++) {
-                Type right = unions.get(i).type(false);
-
-                if (!left.isCollocated() || !right.isCollocated()) {
-                    left = Type.PARTITIONED_NOT_COLLOCATED;
-
-                    break;
-                }
-                else if (!left.isPartitioned() && !right.isPartitioned())
-                    left = Type.REPLICATED;
-                else
-                    left = Type.PARTITIONED_COLLOCATED;
-            }
-
-            return left;
-        }
-
-        return type;
-    }
-
-    /**
-     * @param i Index.
-     * @param create Create child if needed.
-     * @return Child collocation.
-     */
-    private GridH2CollocationModel child(int i, boolean create) {
-        GridH2CollocationModel child = children[i];
-
-        if (child == null && create) {
-            TableFilter f = childFilters[i];
-
-            if (f.getTable().isView()) {
-                if (f.getIndex() == null) {
-                    // If we don't have view index yet, then we just creating 
empty model and it must be filled later.
-                    child = createChildModel(this, i, null, true, validate);
-                }
-                else
-                    child = buildCollocationModel(this, i, getSubQuery(f), 
null, validate);
-            }
-            else
-                child = createChildModel(this, i, null, false, validate);
-
-            assert child != null;
-            assert children[i] == child;
-        }
-
-        return child;
-    }
-
-    /**
-     * @param f Table filter.
-     * @return Sub-query.
-     */
-    private static Query getSubQuery(TableFilter f) {
-        return ((ViewIndex)f.getIndex()).getQuery();
-    }
-
-    /**
-     * @return Unions list.
-     */
-    private List<GridH2CollocationModel> getOrCreateUnions() {
-        if (unions == null) {
-            unions = new ArrayList<>(4);
-
-            unions.add(this);
-        }
-
-        return unions;
-    }
-
-    /**
-     * @param qctx Query context.
-     * @param info Sub-query info.
-     * @param filters Filters.
-     * @param filter Filter.
-     * @param validate Query validation flag.
-     * @return Collocation.
-     */
-    public static GridH2CollocationModel 
buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
-        TableFilter[] filters, int filter, boolean validate) {
-        GridH2CollocationModel cm;
-
-        if (info != null) {
-            // Go up until we reach the root query.
-            cm = buildCollocationModel(qctx, info.getUpper(), 
info.getFilters(), info.getFilter(), validate);
-        }
-        else {
-            // We are at the root query.
-            cm = qctx.queryCollocationModel();
-
-            if (cm == null) {
-                cm = createChildModel(null, -1, null, true, validate);
-
-                qctx.queryCollocationModel(cm);
-            }
-        }
-
-        if (filters == null)
-            return cm;
-
-        assert cm.view;
-
-        Select select = filters[0].getSelect();
-
-        // Handle union. We have to rely on fact that select will be the same 
on uppermost select.
-        // For sub-queries we will drop collocation models, so that they will 
be recalculated anyways.
-        if (cm.select != null && cm.select != select) {
-            List<GridH2CollocationModel> unions = cm.getOrCreateUnions();
-
-            // Try to find this select in existing unions.
-            // Start with 1 because at 0 it always will be c.
-            for (int i = 1; i < unions.size(); i++) {
-                GridH2CollocationModel u = unions.get(i);
-
-                if (u.select == select) {
-                    cm = u;
-
-                    break;
-                }
-            }
-
-            // Nothing was found, need to create new child in union.
-            if (cm.select != select)
-                cm = createChildModel(cm.upper, cm.filter, unions, true, 
validate);
-        }
-
-        cm.childFilters(filters);
-
-        return cm.child(filter, true);
-    }
-
-    /**
-     * @param qry Query.
-     * @return {@code true} If the query is collocated.
-     */
-    public static boolean isCollocated(Query qry) {
-        GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, 
null, true);
-
-        Type type = mdl.type(true);
-
-        if (!type.isCollocated() && mdl.multiplier == 
MULTIPLIER_REPLICATED_NOT_LAST)
-            throw new CacheException("Failed to execute query: for distributed 
join " +
-                "all REPLICATED caches must be at the end of the joined tables 
list.");
-
-        return type.isCollocated();
-    }
-
-    /**
-     * @param upper Upper.
-     * @param filter Filter.
-     * @param qry Query.
-     * @param unions Unions.
-     * @param validate Query validation flag.
-     * @return Built model.
-     */
-    private static GridH2CollocationModel 
buildCollocationModel(GridH2CollocationModel upper,
-        int filter,
-        Query qry,
-        List<GridH2CollocationModel> unions,
-        boolean validate) {
-        if (qry.isUnion()) {
-            if (unions == null)
-                unions = new ArrayList<>();
-
-            SelectUnion union = (SelectUnion)qry;
-
-            GridH2CollocationModel left = buildCollocationModel(upper, filter, 
union.getLeft(), unions, validate);
-            GridH2CollocationModel right = buildCollocationModel(upper, 
filter, union.getRight(), unions, validate);
-
-            assert left != null;
-            assert right != null;
-
-            return upper != null ? upper : left;
-        }
-
-        Select select = (Select)qry;
-
-        List<TableFilter> list = new ArrayList<>();
-
-        for (TableFilter f = select.getTopTableFilter(); f != null; f = 
f.getJoin())
-            list.add(f);
-
-        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
-
-        GridH2CollocationModel cm = createChildModel(upper, filter, unions, 
true, validate);
-
-        cm.childFilters(filters);
-
-        for (int i = 0; i < filters.length; i++) {
-            TableFilter f = filters[i];
-
-            if (f.getTable().isView())
-                buildCollocationModel(cm, i, getSubQuery(f), null, validate);
-            else if (f.getTable() instanceof GridH2Table)
-                createChildModel(cm, i, null, false, validate);
-        }
-
-        return upper != null ? upper : cm;
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @return Error.
-     */
-    private static CacheException customAffinityError(String cacheName) {
-        return new CacheException("Failed to prepare distributed join query: 
can not use distributed joins for cache " +
-            "with custom AffinityKeyMapper configured. " +
-            "Please use AffinityKeyMapped annotation instead [cache=" + 
cacheName + ']');
-    }
-
-    /**
-     * Collocation type.
-     */
-    private enum Type {
-        /** */
-        PARTITIONED_COLLOCATED(true, true),
-
-        /** */
-        PARTITIONED_NOT_COLLOCATED(true, false),
-
-        /** */
-        REPLICATED(false, true);
-
-        /** */
-        private final boolean partitioned;
-
-        /** */
-        private final boolean collocated;
-
-        /**
-         * @param partitioned Partitioned.
-         * @param collocated Collocated.
-         */
-        Type(boolean partitioned, boolean collocated) {
-            this.partitioned = partitioned;
-            this.collocated = collocated;
-        }
-
-        /**
-         * @return {@code true} If partitioned.
-         */
-        public boolean isPartitioned() {
-            return partitioned;
-        }
-
-        /**
-         * @return {@code true} If collocated.
-         */
-        public boolean isCollocated() {
-            return collocated;
-        }
-
-        /**
-         * @param partitioned Partitioned.
-         * @param collocated Collocated.
-         * @return Type.
-         */
-        static Type of(boolean partitioned, boolean collocated) {
-            if (collocated)
-                return partitioned ? Type.PARTITIONED_COLLOCATED : 
Type.REPLICATED;
-
-            assert partitioned;
-
-            return Type.PARTITIONED_NOT_COLLOCATED;
-        }
-    }
-
-    /**
-     * Affinity of a table relative to previous joined tables.
-     */
-    private enum Affinity {
-        /** */
-        NONE,
-
-        /** */
-        HAS_AFFINITY_CONDITION,
-
-        /** */
-        COLLOCATED_JOIN
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 6c45c29..9547b5f 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -18,18 +18,23 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import 
org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
+import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -41,15 +46,12 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.CIX2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
-import org.h2.index.Cursor;
 import org.h2.index.IndexCondition;
 import org.h2.index.IndexLookupBatch;
 import org.h2.index.ViewIndex;
@@ -58,40 +60,26 @@ import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
-import org.h2.util.DoneFuture;
 import org.h2.value.Value;
-import org.h2.value.ValueNull;
-import org.jetbrains.annotations.Nullable;
 
 import javax.cache.CacheException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import static java.util.Collections.emptyIterator;
 import static java.util.Collections.singletonList;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel.buildCollocationModel;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
-import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
 import static org.h2.result.Row.MEMORY_CALCULATE;
 
 /**
@@ -99,7 +87,7 @@ import static org.h2.result.Row.MEMORY_CALCULATE;
  */
 public abstract class GridH2IndexBase extends BaseIndex {
     /** */
-    private static final Object EXPLICIT_NULL = new Object();
+    public static final Object EXPLICIT_NULL = new Object();
 
     /** */
     private Object msgTopic;
@@ -112,7 +100,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
     /** */
     private final CIX2<ClusterNode,Message> locNodeHnd = new 
CIX2<ClusterNode,Message>() {
-        @Override public void applyx(ClusterNode clusterNode, Message msg) 
throws IgniteCheckedException {
+        @Override public void applyx(ClusterNode clusterNode, Message msg) {
             onMessage0(clusterNode.id(), msg);
         }
     };
@@ -122,6 +110,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param tbl Table.
      */
+    @SuppressWarnings("MapReplaceableByEnumMap")
     protected final void initDistributedJoinMessaging(GridH2Table tbl) {
         final GridH2RowDescriptor desc = tbl.rowDescriptor();
 
@@ -248,7 +237,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
         // Query expressions can not be distributed as well.
         if (qctx == null || qctx.type() != PREPARE || 
qctx.distributedJoinMode() == OFF ||
             !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
-            return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
+            return CollocationModel.MULTIPLIER_COLLOCATED;
 
         // We have to clear this cache because normally sub-query plan cost 
does not depend on anything
         // other than index condition masks and sort order, but in our case it 
can depend on order
@@ -257,7 +246,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         assert filters != null;
 
-        GridH2CollocationModel c = buildCollocationModel(qctx, 
ses.getSubQueryInfo(), filters, filter, false);
+        CollocationModel c = buildCollocationModel(qctx, 
ses.getSubQueryInfo(), filters, filter, false);
 
         return c.calculateMultiplier();
     }
@@ -327,7 +316,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
 
-        return new DistributedLookupBatch(cctx, ucast, affColId);
+        return new DistributedLookupBatch(this, cctx, ucast, affColId);
     }
 
     /** {@inheritDoc} */
@@ -344,7 +333,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param nodes Nodes.
      * @param msg Message.
      */
-    private void send(Collection<ClusterNode> nodes, Message msg) {
+    public void send(Collection<ClusterNode> nodes, Message msg) {
         if (!getTable().rowDescriptor().indexing().send(msgTopic,
             -1,
             nodes,
@@ -353,7 +342,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             locNodeHnd,
             GridIoPolicy.IDX_POOL,
             false))
-            throw retryException("Failed to send message to nodes: " + nodes);
+            throw H2Utils.retryException("Failed to send message to nodes: " + 
nodes);
     }
 
     /**
@@ -413,7 +402,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     // This is the first request containing all the search 
rows.
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), msg.segment(), 
filter(qctx));
+                    src = new RangeSource(this, msg.bounds(), msg.segment(), 
filter(qctx));
                 }
                 else {
                     // This is request to fetch next portion of data.
@@ -498,197 +487,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param v1 First value.
-     * @param v2 Second value.
-     * @return {@code true} If they equal.
-     */
-    private boolean equal(Value v1, Value v2) {
-        return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, 
getDatabase().getCompareMode()) == 0);
-    }
-
-    /**
-     * @param qctx Query context.
-     * @param batchLookupId Batch lookup ID.
-     * @param segmentId Segment ID.
-     * @return Index range request.
-     */
-    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext 
qctx, int batchLookupId, int segmentId) {
-        GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
-
-        req.originNodeId(qctx.originNodeId());
-        req.queryId(qctx.queryId());
-        req.originSegmentId(qctx.segment());
-        req.segment(segmentId);
-        req.batchLookupId(batchLookupId);
-
-        return req;
-    }
-
-
-    /**
-     * @param qctx Query context.
-     * @param cctx Cache context.
-     * @param isLocalQry Local query flag.
-     * @return Collection of nodes for broadcasting.
-     */
-    private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, 
GridCacheContext<?, ?> cctx, boolean isLocalQry) {
-        Map<UUID, int[]> partMap = qctx.partitionsMap();
-
-        List<ClusterNode> nodes;
-
-        if (isLocalQry) {
-            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
-                return Collections.emptyList(); // Prevent remote index call 
for local queries.
-
-            nodes = Collections.singletonList(cctx.localNode());
-        }
-        else {
-            if (partMap == null)
-                nodes = new ArrayList<>(CU.affinityNodes(cctx, 
qctx.topologyVersion()));
-            else {
-                nodes = new ArrayList<>(partMap.size());
-
-                GridKernalContext ctx = kernalContext();
-
-                for (UUID nodeId : partMap.keySet()) {
-                    ClusterNode node = ctx.discovery().node(nodeId);
-
-                    if (node == null)
-                        throw retryException("Failed to get node by ID during 
broadcast [nodeId=" + nodeId + ']');
-
-                    nodes.add(node);
-                }
-            }
-
-            if (F.isEmpty(nodes))
-                throw retryException("Failed to collect affinity nodes during 
broadcast [" +
-                    "cacheName=" + cctx.name() + ']');
-        }
-
-        int segmentsCount = segmentsCount();
-
-        List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
-
-        for (ClusterNode node : nodes) {
-            for (int seg = 0; seg < segmentsCount; seg++)
-                res.add(new SegmentKey(node, seg));
-        }
-
-        return res;
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param qctx Query context.
-     * @param affKeyObj Affinity key.
-     * @param isLocalQry Local query flag.
-     * @return Segment key for Affinity key.
-     */
-    private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, 
GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) {
-        assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
-
-        ClusterNode node;
-
-        int partition = cctx.affinity().partition(affKeyObj);
-
-        if (isLocalQry) {
-            if (qctx.partitionsMap() != null) {
-                // If we have explicit partitions map, we have to use it to 
calculate affinity node.
-                UUID nodeId = qctx.nodeForPartition(partition, cctx);
-
-                if(!cctx.localNodeId().equals(nodeId))
-                    return null; // Prevent remote index call for local 
queries.
-            }
-
-            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, 
qctx.topologyVersion()))
-                return null;
-
-            node = cctx.localNode();
-        }
-        else{
-            if (qctx.partitionsMap() != null) {
-                // If we have explicit partitions map, we have to use it to 
calculate affinity node.
-                UUID nodeId = qctx.nodeForPartition(partition, cctx);
-
-            node = cctx.discovery().node(nodeId);
-        }
-        else // Get primary node for current topology version.
-            node = cctx.affinity().primaryByKey(affKeyObj, 
qctx.topologyVersion());
-
-            if (node == null) // Node was not found, probably topology changed 
and we need to retry the whole query.
-                throw retryException("Failed to get primary node by key for 
range segment.");
-        }
-
-        return new SegmentKey(node, segmentForPartition(partition));
-    }
-
-    /** */
-    protected class SegmentKey {
-        /** */
-        final ClusterNode node;
-
-        /** */
-        final int segmentId;
-
-        SegmentKey(ClusterNode node, int segmentId) {
-            assert node != null;
-
-            this.node = node;
-            this.segmentId = segmentId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            SegmentKey key = (SegmentKey)o;
-
-            return segmentId == key.segmentId && 
node.id().equals(key.node.id());
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int result = node.hashCode();
-            result = 31 * result + segmentId;
-            return result;
-        }
-    }
-
-    /**
-     * @param row Row.
-     * @return Row message.
-     */
-    private GridH2RowMessage toRowMessage(Row row) {
-        if (row == null)
-            return null;
-
-        int cols = row.getColumnCount();
-
-        assert cols > 0 : cols;
-
-        List<GridH2ValueMessage> vals = new ArrayList<>(cols);
-
-        for (int i = 0; i < cols; i++) {
-            try {
-                vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i)));
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
-        }
-
-        GridH2RowMessage res = new GridH2RowMessage();
-
-        res.values(vals);
-
-        return res;
-    }
-
-    /**
      * @param msg Row message.
      * @return Search row.
      */
@@ -723,7 +521,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param row Search row.
      * @return Row message.
      */
-    private GridH2RowMessage toSearchRowMessage(SearchRow row) {
+    public GridH2RowMessage toSearchRowMessage(SearchRow row) {
         if (row == null)
             return null;
 
@@ -766,41 +564,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param msg Message.
-     * @return Row.
+     * @return Index segments count.
      */
-    private Row toRow(GridH2RowMessage msg) {
-        if (msg == null)
-            return null;
-
-        GridKernalContext ctx = kernalContext();
-
-        List<GridH2ValueMessage> vals = msg.values();
-
-        assert !F.isEmpty(vals) : vals;
-
-        Value[] vals0 = new Value[vals.size()];
-
-        for (int i = 0; i < vals0.length; i++) {
-            try {
-                vals0[i] = vals.get(i).value(ctx);
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
-        }
-
-        return database.createRow(vals0, MEMORY_CALCULATE);
-    }
-
-    /** @return Index segments count. */
-    protected abstract int segmentsCount();
+    public abstract int segmentsCount();
 
     /**
      * @param partition Partition idx.
      * @return Segment ID for given key
      */
-    protected int segmentForPartition(int partition){
+    public int segmentForPartition(int partition){
         return segmentsCount() == 1 ? 0 : (partition % segmentsCount());
     }
 
@@ -808,6 +580,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param row Table row.
      * @return Segment ID for given row.
      */
+    @SuppressWarnings("IfMayBeConditional")
     protected int segmentForRow(SearchRow row) {
         assert row != null;
 
@@ -831,722 +604,33 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * Simple cursor from a single node.
-     */
-    private static class UnicastCursor implements Cursor {
-        /** */
-        final int rangeId;
-
-        /** */
-        RangeStream stream;
-
-        /**
-         * @param rangeId Range ID.
-         * @param keys Remote index segment keys.
-         * @param rangeStreams Range streams.
-         */
-        UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, 
RangeStream> rangeStreams) {
-            assert keys.size() == 1;
-
-            this.rangeId = rangeId;
-            this.stream = rangeStreams.get(F.first(keys));
-
-            assert stream != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            return stream.next(rangeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row get() {
-            return stream.get(rangeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public SearchRow getSearchRow() {
-            return get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean previous() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Merge cursor from multiple nodes.
-     */
-    private class BroadcastCursor implements Cursor, Comparator<RangeStream> {
-        /** */
-        final int rangeId;
-
-        /** */
-        final RangeStream[] streams;
-
-        /** */
-        boolean first = true;
-
-        /** */
-        int off;
-
-        /**
-         * @param rangeId Range ID.
-         * @param segmentKeys Remote nodes.
-         * @param rangeStreams Range streams.
-         */
-        BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, 
Map<SegmentKey, RangeStream> rangeStreams) {
-
-            this.rangeId = rangeId;
-
-            streams = new RangeStream[segmentKeys.size()];
-
-            int i = 0;
-
-            for (SegmentKey segmentKey : segmentKeys) {
-                RangeStream stream = rangeStreams.get(segmentKey);
-
-                assert stream != null;
-
-                streams[i++] = stream;
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compare(RangeStream o1, RangeStream o2) {
-            if (o1 == o2)
-                return 0;
-
-            // Nulls are at the beginning of array.
-            if (o1 == null)
-                return -1;
-
-            if (o2 == null)
-                return 1;
-
-            return compareRows(o1.get(rangeId), o2.get(rangeId));
-        }
-
-        /**
-         * Try to fetch the first row.
-         *
-         * @return {@code true} If we were able to find at least one row.
-         */
-        private boolean goFirst() {
-            // Fetch first row from all the streams and sort them.
-            for (int i = 0; i < streams.length; i++) {
-                if (!streams[i].next(rangeId)) {
-                    streams[i] = null;
-                    off++; // After sorting this offset will cut off all null 
elements at the beginning of array.
-                }
-            }
-
-            if (off == streams.length)
-                return false;
-
-            Arrays.sort(streams, this);
-
-            return true;
-        }
-
-        /**
-         * Fetch next row.
-         *
-         * @return {@code true} If we were able to find at least one row.
-         */
-        private boolean goNext() {
-            assert off != streams.length;
-
-            if (!streams[off].next(rangeId)) {
-                // Next row from current min stream was not found -> nullify 
that stream and bump offset forward.
-                streams[off] = null;
-
-                return ++off != streams.length;
-            }
-
-            // Bubble up current min stream with respect to fetched row to 
achieve correct sort order of streams.
-            bubbleUp(streams, off, this);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (first) {
-                first = false;
-
-                return goFirst();
-            }
-
-            return goNext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row get() {
-            return streams[off].get(rangeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public SearchRow getSearchRow() {
-            return get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean previous() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Index lookup batch.
-     */
-    private class DistributedLookupBatch implements IndexLookupBatch {
-        /** */
-        final GridCacheContext<?,?> cctx;
-
-        /** */
-        final boolean ucast;
-
-        /** */
-        final int affColId;
-
-        /** */
-        GridH2QueryContext qctx;
-
-        /** */
-        int batchLookupId;
-
-        /** */
-        Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
-
-        /** */
-        List<SegmentKey> broadcastSegments;
-
-        /** */
-        List<Future<Cursor>> res = Collections.emptyList();
-
-        /** */
-        boolean batchFull;
-
-        /** */
-        boolean findCalled;
-
-        /**
-         * @param cctx Cache Cache context.
-         * @param ucast Unicast or broadcast query.
-         * @param affColId Affinity column ID.
-         */
-        DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int 
affColId) {
-            this.cctx = cctx;
-            this.ucast = ucast;
-            this.affColId = affColId;
-        }
-
-        /**
-         * @param firstRow First row.
-         * @param lastRow Last row.
-         * @return Affinity key or {@code null}.
-         */
-        private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
-            if (affColId == COL_NOT_EXISTS)
-                return null;
-
-            if (firstRow == null || lastRow == null)
-                return null;
-
-            Value affKeyFirst = firstRow.getValue(affColId);
-            Value affKeyLast = lastRow.getValue(affColId);
-
-            if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
-                return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : 
affKeyFirst.getObject();
-
-            if (getTable().rowDescriptor().isKeyColumn(affColId))
-                return null;
-
-            // Try to extract affinity key from primary key.
-            Value pkFirst = firstRow.getValue(KEY_COL);
-            Value pkLast = lastRow.getValue(KEY_COL);
-
-            if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
-                return EXPLICIT_NULL;
-
-            if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
-                return null;
-
-            Object pkAffKeyFirst = 
cctx.affinity().affinityKey(pkFirst.getObject());
-            Object pkAffKeyLast = 
cctx.affinity().affinityKey(pkLast.getObject());
-
-            if (pkAffKeyFirst == null || pkAffKeyLast == null)
-                throw new CacheException("Cache key without affinity key.");
-
-            if (pkAffKeyFirst.equals(pkAffKeyLast))
-                return pkAffKeyFirst;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public boolean addSearchRows(SearchRow firstRow, SearchRow 
lastRow) {
-            if (qctx == null || findCalled) {
-                if (qctx == null) {
-                    // It is the first call after query begin (may be after 
reuse),
-                    // reinitialize query context and result.
-                    qctx = GridH2QueryContext.get();
-                    res = new ArrayList<>();
-
-                    assert qctx != null;
-                    assert !findCalled;
-                }
-                else {
-                    // Cleanup after the previous lookup phase.
-                    assert batchLookupId != 0;
-
-                    findCalled = false;
-                    qctx.putStreams(batchLookupId, null);
-                    res.clear();
-                }
-
-                // Reinitialize for the next lookup phase.
-                batchLookupId = qctx.nextBatchLookupId();
-                rangeStreams = new HashMap<>();
-            }
-
-            Object affKey = getAffinityKey(firstRow, lastRow);
-
-            boolean locQry = localQuery();
-
-            List<SegmentKey> segmentKeys;
-
-            if (affKey != null) {
-                // Affinity key is provided.
-                if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, 
we will not find anything.
-                    return false;
-
-                segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, 
locQry));
-            }
-            else {
-                // Affinity key is not provided or is not the same in upper 
and lower bounds, we have to broadcast.
-                if (broadcastSegments == null)
-                    broadcastSegments = broadcastSegments(qctx, cctx, locQry);
-
-                segmentKeys = broadcastSegments;
-            }
-
-            if (locQry && segmentKeys.isEmpty())
-                return false; // Nothing to do
-
-            assert !F.isEmpty(segmentKeys) : segmentKeys;
-
-            final int rangeId = res.size();
-
-            // Create messages.
-            GridH2RowMessage first = toSearchRowMessage(firstRow);
-            GridH2RowMessage last = toSearchRowMessage(lastRow);
-
-            // Range containing upper and lower bounds.
-            GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, 
last);
-
-            // Add range to every message of every participating node.
-            for (int i = 0; i < segmentKeys.size(); i++) {
-                SegmentKey segmentKey = segmentKeys.get(i);
-                assert segmentKey != null;
-
-                RangeStream stream = rangeStreams.get(segmentKey);
-
-                List<GridH2RowRangeBounds> bounds;
-
-                if (stream == null) {
-                    stream = new RangeStream(qctx, segmentKey.node);
-
-                    stream.req = createRequest(qctx, batchLookupId, 
segmentKey.segmentId);
-                    stream.req.bounds(bounds = new ArrayList<>());
-
-                    rangeStreams.put(segmentKey, stream);
-                }
-                else
-                    bounds = stream.req.bounds();
-
-                bounds.add(rangeBounds);
-
-                // If at least one node will have a full batch then we are ok.
-                if (bounds.size() >= qctx.pageSize())
-                    batchFull = true;
-            }
-
-            Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ?
-                new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
-                new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
-
-            res.add(fut);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isBatchFull() {
-            return batchFull;
-        }
-
-        /**
-         * @return {@code True} if local query execution is enforced.
-         */
-        private boolean localQuery() {
-            assert qctx != null : "Missing query context: " + this;
-
-            return qctx.distributedJoinMode() == LOCAL_ONLY;
-        }
-
-        /**
-         *
-         */
-        private void startStreams() {
-            if (rangeStreams.isEmpty()) {
-                assert res.isEmpty();
-
-                return;
-            }
-
-            qctx.putStreams(batchLookupId, rangeStreams);
-
-            // Start streaming.
-            for (RangeStream stream : rangeStreams.values())
-                stream.start();
-        }
-
-        /** {@inheritDoc} */
-        @Override public List<Future<Cursor>> find() {
-            batchFull = false;
-            findCalled = true;
-
-            startStreams();
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void reset(boolean beforeQry) {
-            if (beforeQry || qctx == null) // Query context can be null if 
addSearchRows was never called.
-                return;
-
-            assert batchLookupId != 0;
-
-            // Do cleanup after the query run.
-            qctx.putStreams(batchLookupId, null);
-            qctx = null; // The same query can be reused multiple times for 
different query contexts.
-            batchLookupId = 0;
-
-            rangeStreams = Collections.emptyMap();
-            broadcastSegments = null;
-            batchFull = false;
-            findCalled = false;
-            res = Collections.emptyList();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getPlanSQL() {
-            return ucast ? "unicast" : "broadcast";
-        }
-    }
-
-    /**
-     * Per node range stream.
+     * Find rows for the segments (distributed joins).
+     *
+     * @param bounds Bounds.
+     * @param segment Segment.
+     * @param filter Filter.
+     * @return Iterator.
      */
-    private class RangeStream {
-        /** */
-        final GridH2QueryContext qctx;
-
-        /** */
-        final ClusterNode node;
-
-        /** */
-        GridH2IndexRangeRequest req;
-
-        /** */
-        int remainingRanges;
-
-        /** */
-        final BlockingQueue<GridH2IndexRangeResponse> respQueue = new 
LinkedBlockingQueue<>();
-
-        /** */
-        Iterator<GridH2RowRange> ranges = emptyIterator();
-
-        /** */
-        Cursor cursor = GridH2Cursor.EMPTY;
-
-        /** */
-        int cursorRangeId = -1;
-
-        /**
-         * @param qctx Query context.
-         * @param node Node.
-         */
-        RangeStream(GridH2QueryContext qctx, ClusterNode node) {
-            this.node = node;
-            this.qctx = qctx;
-        }
-
-        /**
-         * Start streaming.
-         */
-        private void start() {
-            assert ctx != null;
-            assert log != null: getName();
-
-            remainingRanges = req.bounds().size();
-
-            assert remainingRanges > 0;
-
-            if (log.isDebugEnabled())
-                log.debug("Starting stream: [node=" + node + ", req=" + req + 
"]");
-
-            send(singletonList(node), req);
-        }
-
-        /**
-         * @param msg Response.
-         */
-        public void onResponse(GridH2IndexRangeResponse msg) {
-            respQueue.add(msg);
-        }
-
-        /**
-         * @return Response.
-         */
-        private GridH2IndexRangeResponse awaitForResponse() {
-            assert remainingRanges > 0;
-
-            final long start = U.currentTimeMillis();
-
-            for (int attempt = 0;; attempt++) {
-                if (qctx.isCleared())
-                    throw retryException("Query is cancelled.");
-
-                if (kernalContext().isStopping())
-                    throw retryException("Local node is stopping.");
-
-                GridH2IndexRangeResponse res;
-
-                try {
-                    res = respQueue.poll(500, TimeUnit.MILLISECONDS);
-                }
-                catch (InterruptedException ignored) {
-                    throw retryException("Interrupted while waiting for 
reply.");
-                }
-
-                if (res != null) {
-                    switch (res.status()) {
-                        case STATUS_OK:
-                            List<GridH2RowRange> ranges0 = res.ranges();
-
-                            remainingRanges -= ranges0.size();
-
-                            if (ranges0.get(ranges0.size() - 1).isPartial())
-                                remainingRanges++;
-
-                            if (remainingRanges > 0) {
-                                if (req.bounds() != null)
-                                    req = createRequest(qctx, 
req.batchLookupId(), req.segment());
-
-                                // Prefetch next page.
-                                send(singletonList(node), req);
-                            }
-                            else
-                                req = null;
-
-                            return res;
-
-                        case STATUS_NOT_FOUND:
-                            if (req == null || req.bounds() == null) // We 
have already received the first response.
-                                throw retryException("Failure on remote 
node.");
-
-                            if (U.currentTimeMillis() - start > 30_000)
-                                throw retryException("Timeout reached.");
-
-                            try {
-                                U.sleep(20 * attempt);
-                            }
-                            catch (IgniteInterruptedCheckedException e) {
-                                throw new 
IgniteInterruptedException(e.getMessage());
-                            }
-
-                            // Retry to send the request once more after some 
time.
-                            send(singletonList(node), req);
-
-                            break;
-
-                        case STATUS_ERROR:
-                            throw new CacheException(res.error());
-
-                        default:
-                            throw new IllegalStateException();
-                    }
-                }
-
-                if (!kernalContext().discovery().alive(node))
-                    throw retryException("Node has left topology: " + 
node.id());
-            }
-        }
-
-        /**
-         * @param rangeId Requested range ID.
-         * @return {@code true} If next row for the requested range was found.
-         */
-        private boolean next(final int rangeId) {
-            for (;;) {
-                if (rangeId == cursorRangeId) {
-                    if (cursor.next())
-                        return true;
-                }
-                else if (rangeId < cursorRangeId)
-                    return false;
-
-                cursor = GridH2Cursor.EMPTY;
-
-                while (!ranges.hasNext()) {
-                    if (remainingRanges == 0) {
-                        ranges = emptyIterator();
-
-                        return false;
-                    }
-
-                    ranges = awaitForResponse().ranges().iterator();
-                }
-
-                GridH2RowRange range = ranges.next();
-
-                cursorRangeId = range.rangeId();
-
-                if (!F.isEmpty(range.rows())) {
-                    final Iterator<GridH2RowMessage> it = 
range.rows().iterator();
-
-                    if (it.hasNext()) {
-                        cursor = new GridH2Cursor(new Iterator<Row>() {
-                            @Override public boolean hasNext() {
-                                return it.hasNext();
-                            }
-
-                            @Override public Row next() {
-                                // Lazily convert messages into real rows.
-                                return toRow(it.next());
-                            }
+    @SuppressWarnings("unchecked")
+    public Iterator<GridH2Row> findForSegment(GridH2RowRangeBounds bounds, int 
segment,
+        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
+        SearchRow first = toSearchRow(bounds.first());
+        SearchRow last = toSearchRow(bounds.last());
 
-                            @Override public void remove() {
-                                throw new UnsupportedOperationException();
-                            }
-                        });
-                    }
-                }
-            }
-        }
+        IgniteTree t = treeForRead(segment);
 
-        /**
-         * @param rangeId Requested range ID.
-         * @return Current row.
-         */
-        private Row get(int rangeId) {
-            assert rangeId == cursorRangeId;
+        try {
+            GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, 
filter, null);
 
-            return cursor.get();
-        }
-    }
+            if (range == null)
+                range = H2Utils.EMPTY_CURSOR;
 
-    /**
-     * Bounds iterator.
-     */
-    private class RangeSource {
-        /** */
-        Iterator<GridH2RowRangeBounds> boundsIter;
-
-        /** */
-        int curRangeId = -1;
-
-        /** */
-        private final int segment;
-
-        /** */
-        private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> 
filter;
-
-        /** Iterator. */
-        Iterator<GridH2Row> iter = emptyIterator();
-
-        /**
-         * @param bounds Bounds.
-         * @param segment Segment.
-         * @param filter Filter.
-         */
-        RangeSource(Iterable<GridH2RowRangeBounds> bounds, int segment, 
BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
-            this.segment = segment;
-            this.filter = filter;
-            boundsIter = bounds.iterator();
-        }
+            H2Cursor cur = new H2Cursor(range);
 
-        /**
-         * @return {@code true} If there are more rows in this source.
-         */
-        public boolean hasMoreRows() throws IgniteCheckedException {
-            return boundsIter.hasNext() || iter.hasNext();
+            return new CursorIteratorWrapper(cur);
         }
-
-        /**
-         * @param maxRows Max allowed rows.
-         * @return Range.
-         */
-        public GridH2RowRange next(int maxRows) {
-            assert maxRows > 0 : maxRows;
-
-            for (; ; ) {
-                if (iter.hasNext()) {
-                    // Here we are getting last rows from previously partially 
fetched range.
-                    List<GridH2RowMessage> rows = new ArrayList<>();
-
-                    GridH2RowRange nextRange = new GridH2RowRange();
-
-                    nextRange.rangeId(curRangeId);
-                    nextRange.rows(rows);
-
-                    do {
-                        rows.add(toRowMessage(iter.next()));
-                    }
-                    while (rows.size() < maxRows && iter.hasNext());
-
-                    if (iter.hasNext())
-                        nextRange.setPartial();
-                    else
-                        iter = emptyIterator();
-
-                    return nextRange;
-                }
-
-                iter = emptyIterator();
-
-                if (!boundsIter.hasNext()) {
-                    boundsIter = emptyIterator();
-
-                    return null;
-                }
-
-                GridH2RowRangeBounds bounds = boundsIter.next();
-
-                curRangeId = bounds.rangeId();
-
-                SearchRow first = toSearchRow(bounds.first());
-                SearchRow last = toSearchRow(bounds.last());
-
-                IgniteTree t = treeForRead(segment);
-
-                iter = new CursorIteratorWrapper(doFind0(t, first, last, 
filter));
-
-                if (!iter.hasNext()) {
-                    // We have to return empty range here.
-                    GridH2RowRange emptyRange = new GridH2RowRange();
-
-                    emptyRange.rangeId(curRangeId);
-
-                    return emptyRange;
-                }
-            }
+        catch (IgniteCheckedException e) {
+            throw DbException.convert(e);
         }
     }
 
@@ -1559,21 +643,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param t Tree.
-     * @param first Lower bound.
-     * @param last Upper bound always inclusive.
-     * @param filter Filter.
-     * @return Iterator over rows in given range.
-     */
-    protected H2Cursor doFind0(
-        IgniteTree t,
-        @Nullable SearchRow first,
-        @Nullable SearchRow last,
-        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
      * Re-assign column ids after removal of column(s).
      */
     public void refreshColumnIds() {
@@ -1582,72 +651,4 @@ public abstract class GridH2IndexBase extends BaseIndex {
         for (int pos = 0; pos < columnIds.length; ++pos)
             columnIds[pos] = columns[pos].getColumnId();
     }
-
-    /**
-     * Create retry exception for distributed join.
-     *
-     * @param msg Message.
-     * @return Exception.
-     */
-    private GridH2RetryException retryException(String msg) {
-        return new GridH2RetryException(msg);
-    }
-
-    /**
-     *
-     */
-    private static final class CursorIteratorWrapper implements 
Iterator<GridH2Row> {
-        /** */
-        private final H2Cursor cursor;
-
-        /** Next element. */
-        private GridH2Row next;
-
-        /**
-         * @param cursor Cursor.
-         */
-        private CursorIteratorWrapper(H2Cursor cursor) {
-            assert cursor != null;
-
-            this.cursor = cursor;
-
-            if (cursor.next())
-                next = (GridH2Row)cursor.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return next != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridH2Row next() {
-            GridH2Row res = next;
-
-            if (cursor.next())
-                next = (GridH2Row)cursor.get();
-            else
-                next = null;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException("operation is not 
supported");
-        }
-    }
-
-    /** Empty cursor. */
-    protected static final GridCursor<GridH2Row> EMPTY_CURSOR = new 
GridCursor<GridH2Row>() {
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridH2Row get() {
-            return null;
-        }
-    };
 }

Reply via email to