IGNITE-10812: SQL: split classes responsible for distributed joins. This closes #5742.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53543733 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53543733 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53543733 Branch: refs/heads/ignite-601 Commit: 535437336b1fd0e382c19f220d511fea52dbf889 Parents: 0431c28 Author: devozerov <voze...@gridgain.com> Authored: Wed Dec 26 18:23:56 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Dec 26 18:23:56 2018 +0300 ---------------------------------------------------------------------- .../query/h2/opt/GridH2SpatialIndex.java | 5 +- .../internal/processors/query/h2/H2Utils.java | 62 + .../processors/query/h2/IgniteH2Indexing.java | 4 +- .../query/h2/database/H2PkHashIndex.java | 2 +- .../query/h2/database/H2TreeClientIndex.java | 2 +- .../query/h2/database/H2TreeIndex.java | 23 +- .../query/h2/opt/DistributedJoinMode.java | 51 - .../query/h2/opt/GridH2CollocationModel.java | 838 -------------- .../query/h2/opt/GridH2IndexBase.java | 1085 +----------------- .../query/h2/opt/GridH2QueryContext.java | 163 +-- .../query/h2/opt/QueryContextKey.java | 127 ++ .../query/h2/opt/join/BroadcastCursor.java | 155 +++ .../query/h2/opt/join/CollocationModel.java | 841 ++++++++++++++ .../h2/opt/join/CursorIteratorWrapper.java | 68 ++ .../query/h2/opt/join/DistributedJoinMode.java | 51 + .../h2/opt/join/DistributedLookupBatch.java | 430 +++++++ .../query/h2/opt/join/RangeSource.java | 137 +++ .../query/h2/opt/join/RangeStream.java | 296 +++++ .../query/h2/opt/join/SegmentKey.java | 81 ++ .../processors/query/h2/opt/join/SourceKey.java | 66 ++ .../query/h2/opt/join/UnicastCursor.java | 64 ++ .../query/h2/sql/GridSqlQuerySplitter.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 6 +- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- .../h2/twostep/RetryCauseMessageSelfTest.java | 4 +- 25 files changed, 2465 insertions(+), 2100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java ---------------------------------------------------------------------- diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java index 4d1577b..35e9424 100644 --- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java +++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.h2.H2Cursor; +import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.util.GridCursorIteratorWrapper; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; @@ -154,7 +155,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex } /** {@inheritDoc} */ - @Override protected int segmentsCount() { + @Override public int segmentsCount() { return segments.length; } @@ -335,7 +336,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex @SuppressWarnings("unchecked") private GridCursor<GridH2Row> rowIterator(Iterator<SpatialKey> i, TableFilter filter) { if (!i.hasNext()) - return EMPTY_CURSOR; + return H2Utils.EMPTY_CURSOR; long time = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index c542758..2a5f33c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -45,15 +45,22 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory; import org.apache.ignite.internal.util.GridStringBuilder; +import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.engine.Session; import org.h2.jdbc.JdbcConnection; +import org.h2.result.Row; import org.h2.result.SortOrder; import org.h2.table.IndexColumn; import org.h2.util.LocalDateTimeUtils; @@ -78,6 +85,8 @@ import org.h2.value.ValueTime; import org.h2.value.ValueTimestamp; import org.h2.value.ValueUuid; +import javax.cache.CacheException; + import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME; import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME; import static org.apache.ignite.internal.processors.query.QueryUtils.VER_FIELD_NAME; @@ -94,6 +103,19 @@ public class H2Utils { /** Quotation character. */ private static final char ESC_CH = '\"'; + /** Empty cursor. */ + public static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() { + /** {@inheritDoc} */ + @Override public boolean next() { + return false; + } + + /** {@inheritDoc} */ + @Override public GridH2Row get() { + return null; + } + }; + /** * @param c1 First column. * @param c2 Second column. @@ -598,4 +620,44 @@ public class H2Utils { return qry; } + + /** + * @param row Row. + * @return Row message. + */ + public static GridH2RowMessage toRowMessage(Row row) { + if (row == null) + return null; + + int cols = row.getColumnCount(); + + assert cols > 0 : cols; + + List<GridH2ValueMessage> vals = new ArrayList<>(cols); + + for (int i = 0; i < cols; i++) { + try { + vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i))); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + GridH2RowMessage res = new GridH2RowMessage(); + + res.values(vals); + + return res; + } + + /** + * Create retry exception for distributed join. + * + * @param msg Message. + * @return Exception. + */ + public static GridH2RetryException retryException(String msg) { + return new GridH2RetryException(msg); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index d1b435d..5f692e9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -177,8 +177,8 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID; import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index ef6d5ff..9a42362 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -82,7 +82,7 @@ public class H2PkHashIndex extends GridH2IndexBase { } /** {@inheritDoc} */ - @Override protected int segmentsCount() { + @Override public int segmentsCount() { return 1; } http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java index a0bab43..df896ed 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java @@ -68,7 +68,7 @@ public class H2TreeClientIndex extends H2TreeIndexBase { } /** {@inheritDoc} */ - @Override protected int segmentsCount() { + @Override public int segmentsCount() { throw SHOULDNT_BE_INVOKED_EXCEPTION; } http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index 40d0a9f..6b09b76 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -40,8 +40,6 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.stat.IoStatisticsHolder; import org.apache.ignite.internal.stat.IoStatisticsType; -import org.apache.ignite.internal.util.IgniteTree; -import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; @@ -273,7 +271,7 @@ public class H2TreeIndex extends H2TreeIndexBase { } /** {@inheritDoc} */ - @Override protected int segmentsCount() { + @Override public int segmentsCount() { return segments.length; } @@ -449,25 +447,6 @@ public class H2TreeIndex extends H2TreeIndexBase { } /** {@inheritDoc} */ - @Override protected H2Cursor doFind0( - IgniteTree t, - @Nullable SearchRow first, - @Nullable SearchRow last, - BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { - try { - GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null); - - if (range == null) - range = EMPTY_CURSOR; - - return new H2Cursor(range); - } - catch (IgniteCheckedException e) { - throw DbException.convert(e); - } - } - - /** {@inheritDoc} */ @Override protected BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter(GridH2QueryContext qctx) { if (qctx == null) { assert !cctx.mvccEnabled(); http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java deleted file mode 100644 index cc06244..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.opt; - -/** - * Defines set of distributed join modes. - */ -public enum DistributedJoinMode { - /** - * Distributed joins is disabled. Local joins will be performed instead. - */ - OFF, - - /** - * Distributed joins is enabled within local node only. - * - * NOTE: This mode is used with segmented indices for local sql queries. - * As in this case we need to make distributed join across local index segments - * and prevent range-queries to other nodes. - */ - LOCAL_ONLY, - - /** - * Distributed joins is enabled. - */ - ON; - - /** - * @param isLocal Query local flag. - * @param distributedJoins Query distributed joins flag. - * @return DistributedJoinMode for the query. - */ - public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) { - return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java deleted file mode 100644 index 2a92511..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java +++ /dev/null @@ -1,838 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.opt; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import javax.cache.CacheException; -import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.h2.command.dml.Query; -import org.h2.command.dml.Select; -import org.h2.command.dml.SelectUnion; -import org.h2.expression.Comparison; -import org.h2.expression.Expression; -import org.h2.expression.ExpressionColumn; -import org.h2.index.IndexCondition; -import org.h2.index.ViewIndex; -import org.h2.table.Column; -import org.h2.table.IndexColumn; -import org.h2.table.SubQueryInfo; -import org.h2.table.Table; -import org.h2.table.TableFilter; -import org.h2.table.TableView; - -/** - * Collocation model for a query. - */ -public final class GridH2CollocationModel { - /** */ - public static final int MULTIPLIER_COLLOCATED = 1; - - /** */ - private static final int MULTIPLIER_UNICAST = 50; - - /** */ - private static final int MULTIPLIER_BROADCAST = 200; - - /** */ - private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000; - - /** */ - private final GridH2CollocationModel upper; - - /** */ - private final int filter; - - /** */ - private final boolean view; - - /** */ - private int multiplier; - - /** */ - private Type type; - - /** */ - private GridH2CollocationModel[] children; - - /** */ - private TableFilter[] childFilters; - - /** */ - private List<GridH2CollocationModel> unions; - - /** */ - private Select select; - - /** */ - private final boolean validate; - - /** - * @param upper Upper. - * @param filter Filter. - * @param view This model will be a subquery (or top level query) and must contain child filters. - * @param validate Query validation flag. - */ - private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) { - this.upper = upper; - this.filter = filter; - this.view = view; - this.validate = validate; - } - - /** - * @return Table filter for this collocation model. - */ - private TableFilter filter() { - return upper == null ? null : upper.childFilters[filter]; - } - - /** {@inheritDoc} */ - @Override public String toString() { - calculate(); - - SB b = new SB(); - - for (int lvl = 0; lvl < 20; lvl++) { - if (!toString(b, lvl)) - break; - - b.a('\n'); - } - - return b.toString(); - } - - /** - * @param b String builder. - * @param lvl Depth level. - */ - private boolean toString(SB b, int lvl) { - boolean res = false; - - if (lvl == 0) { - TableFilter f = filter(); - String tblAlias = f == null ? "^" : f.getTableAlias(); - - b.a("[tbl=").a(tblAlias).a(", type=").a(type).a(", mul=").a(multiplier).a("]"); - - res = true; - } - else if (childFilters != null) { - assert lvl > 0; - - lvl--; - - for (int i = 0; i < childFilters.length; i++) { - if (lvl == 0) - b.a(" | "); - - res |= child(i, true).toString(b, lvl); - } - - if (lvl == 0) - b.a(" | "); - } - - return res; - } - - /** - * @param upper Upper. - * @param filter Filter. - * @param unions Unions. - * @param view This model will be a subquery (or top level query) and must contain child filters. - * @param validate Query validation flag. - * @return Created child collocation model. - */ - private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper, - int filter, - List<GridH2CollocationModel> unions, - boolean view, - boolean validate) { - GridH2CollocationModel child = new GridH2CollocationModel(upper, filter, view, validate); - - if (unions != null) { - // Bind created child to unions. - assert upper == null || upper.child(filter, false) != null || unions.isEmpty(); - - if (upper != null && unions.isEmpty()) { - assert upper.child(filter, false) == null; - - upper.children[filter] = child; - } - - unions.add(child); - - child.unions = unions; - } - else if (upper != null) { - // Bind created child to upper model. - assert upper.child(filter, false) == null; - - upper.children[filter] = child; - } - - return child; - } - - /** - * @param childFilters New child filters. - * @return {@code true} If child filters were updated. - */ - private boolean childFilters(TableFilter[] childFilters) { - assert childFilters != null; - assert view; - - Select select = childFilters[0].getSelect(); - - assert this.select == null || this.select == select; - - if (this.select == null) { - this.select = select; - - assert this.childFilters == null; - } - else if (Arrays.equals(this.childFilters, childFilters)) - return false; - - if (this.childFilters == null) { - // We have to clone because H2 reuses array and reorders elements. - this.childFilters = childFilters.clone(); - - children = new GridH2CollocationModel[childFilters.length]; - } - else { - assert this.childFilters.length == childFilters.length; - - // We have to copy because H2 reuses array and reorders elements. - System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length); - - Arrays.fill(children, null); - } - - // Reset results. - type = null; - multiplier = 0; - - return true; - } - - /** - * Do the needed calculations. - */ - @SuppressWarnings("ConstantConditions") - private void calculate() { - if (type != null) - return; - - if (view) { // We are at (sub-)query model. - assert childFilters != null; - - boolean collocated = true; - boolean partitioned = false; - int maxMultiplier = MULTIPLIER_COLLOCATED; - - for (int i = 0; i < childFilters.length; i++) { - GridH2CollocationModel child = child(i, true); - - Type t = child.type(true); - - if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) - maxMultiplier = child.multiplier; - - if (t.isPartitioned()) { - partitioned = true; - - if (!t.isCollocated()) { - collocated = false; - - int m = child.multiplier(true); - - if (m > maxMultiplier) { - maxMultiplier = m; - - if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST) - break; - } - } - } - } - - type = Type.of(partitioned, collocated); - multiplier = maxMultiplier; - } - else { - assert upper != null; - assert childFilters == null; - - // We are at table instance. - Table tbl = filter().getTable(); - - // Only partitioned tables will do distributed joins. - if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) { - type = Type.REPLICATED; - multiplier = MULTIPLIER_COLLOCATED; - - return; - } - - // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables - // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted - // to all the affinity nodes the "base" does not need to get remote results. - if (!upper.findPartitionedTableBefore(filter)) { - type = Type.PARTITIONED_COLLOCATED; - multiplier = MULTIPLIER_COLLOCATED; - } - else { - // It is enough to make sure that our previous join by affinity key is collocated, then we are - // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. - switch (upper.joinedWithCollocated(filter)) { - case COLLOCATED_JOIN: - type = Type.PARTITIONED_COLLOCATED; - multiplier = MULTIPLIER_COLLOCATED; - - break; - - case HAS_AFFINITY_CONDITION: - type = Type.PARTITIONED_NOT_COLLOCATED; - multiplier = MULTIPLIER_UNICAST; - - break; - - case NONE: - type = Type.PARTITIONED_NOT_COLLOCATED; - multiplier = MULTIPLIER_BROADCAST; - - break; - - default: - throw new IllegalStateException(); - } - } - - if (upper.previousReplicated(filter)) - multiplier = MULTIPLIER_REPLICATED_NOT_LAST; - } - } - - /** - * @param f Current filter. - * @return {@code true} If partitioned table was found. - */ - private boolean findPartitionedTableBefore(int f) { - for (int i = 0; i < f; i++) { - GridH2CollocationModel child = child(i, true); - - // The c can be null if it is not a GridH2Table and not a sub-query, - // it is a some kind of function table or anything else that considered replicated. - if (child != null && child.type(true).isPartitioned()) - return true; - } - - // We have to search globally in upper queries as well. - return upper != null && upper.findPartitionedTableBefore(filter); - } - - /** - * @param f Current filter. - * @return {@code true} If previous table is REPLICATED. - */ - @SuppressWarnings("SimplifiableIfStatement") - private boolean previousReplicated(int f) { - if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED) - return true; - - return upper != null && upper.previousReplicated(filter); - } - - /** - * @param f Filter. - * @return Affinity join type. - */ - @SuppressWarnings("ForLoopReplaceableByForEach") - private Affinity joinedWithCollocated(int f) { - TableFilter tf = childFilters[f]; - - GridH2Table tbl = (GridH2Table)tf.getTable(); - - if (validate) { - if (tbl.isCustomAffinityMapper()) - throw customAffinityError(tbl.cacheName()); - - if (F.isEmpty(tf.getIndexConditions())) { - throw new CacheException("Failed to prepare distributed join query: " + - "join condition does not use index [joinedCache=" + tbl.cacheName() + - ", plan=" + tf.getSelect().getPlanSQL() + ']'); - } - } - - IndexColumn affCol = tbl.getAffinityKeyColumn(); - - boolean affKeyCondFound = false; - - if (affCol != null) { - ArrayList<IndexCondition> idxConditions = tf.getIndexConditions(); - - int affColId = affCol.column.getColumnId(); - - for (int i = 0; i < idxConditions.size(); i++) { - IndexCondition c = idxConditions.get(i); - int colId = c.getColumn().getColumnId(); - int cmpType = c.getCompareType(); - - if ((cmpType == Comparison.EQUAL || cmpType == Comparison.EQUAL_NULL_SAFE) && - (colId == affColId || tbl.rowDescriptor().isKeyColumn(colId)) && c.isEvaluatable()) { - affKeyCondFound = true; - - Expression exp = c.getExpression(); - exp = exp.getNonAliasExpression(); - - if (exp instanceof ExpressionColumn) { - ExpressionColumn expCol = (ExpressionColumn)exp; - - // This is one of our previous joins. - TableFilter prevJoin = expCol.getTableFilter(); - - if (prevJoin != null) { - GridH2CollocationModel cm = child(indexOf(prevJoin), true); - - // If the previous joined model is a subquery (view), we can not be sure that - // the found affinity column is the needed one, since we can select multiple - // different affinity columns from different tables. - if (cm != null && !cm.view) { - Type t = cm.type(true); - - if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate)) - return Affinity.COLLOCATED_JOIN; - } - } - } - } - } - } - - return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE; - } - - /** - * @param f Table filter. - * @return Index. - */ - private int indexOf(TableFilter f) { - for (int i = 0; i < childFilters.length; i++) { - if (childFilters[i] == f) - return i; - } - - throw new IllegalStateException(); - } - - /** - * @param f Table filter. - * @param expCol Expression column. - * @param validate Query validation flag. - * @return {@code true} It it is an affinity column. - */ - private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) { - Column col = expCol.getColumn(); - - if (col == null) - return false; - - Table t = col.getTable(); - - if (t.isView()) { - Query qry; - - if (f.getIndex() != null) - qry = getSubQuery(f); - else - qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t); - - return isAffinityColumn(qry, expCol, validate); - } - - if (t instanceof GridH2Table) { - GridH2Table t0 = (GridH2Table)t; - - if (validate && t0.isCustomAffinityMapper()) - throw customAffinityError((t0).cacheName()); - - IndexColumn affCol = t0.getAffinityKeyColumn(); - - return affCol != null && col.getColumnId() == affCol.column.getColumnId(); - } - - return false; - } - - /** - * @param qry Query. - * @param expCol Expression column. - * @param validate Query validation flag. - * @return {@code true} It it is an affinity column. - */ - private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) { - if (qry.isUnion()) { - SelectUnion union = (SelectUnion)qry; - - return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate); - } - - Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression(); - - if (exp instanceof ExpressionColumn) { - expCol = (ExpressionColumn)exp; - - return isAffinityColumn(expCol.getTableFilter(), expCol, validate); - } - - return false; - } - - /** - * @return Multiplier. - */ - public int calculateMultiplier() { - // We don't need multiplier for union here because it will be summarized in H2. - return multiplier(false); - } - - /** - * @param withUnion With respect to union. - * @return Multiplier. - */ - @SuppressWarnings("ForLoopReplaceableByForEach") - private int multiplier(boolean withUnion) { - calculate(); - - assert multiplier != 0; - - if (withUnion && unions != null) { - int maxMultiplier = 0; - - for (int i = 0; i < unions.size(); i++) { - int m = unions.get(i).multiplier(false); - - if (m > maxMultiplier) - maxMultiplier = m; - } - - return maxMultiplier; - } - - return multiplier; - } - - /** - * @param withUnion With respect to union. - * @return Type. - */ - private Type type(boolean withUnion) { - calculate(); - - assert type != null; - - if (withUnion && unions != null) { - Type left = unions.get(0).type(false); - - for (int i = 1; i < unions.size(); i++) { - Type right = unions.get(i).type(false); - - if (!left.isCollocated() || !right.isCollocated()) { - left = Type.PARTITIONED_NOT_COLLOCATED; - - break; - } - else if (!left.isPartitioned() && !right.isPartitioned()) - left = Type.REPLICATED; - else - left = Type.PARTITIONED_COLLOCATED; - } - - return left; - } - - return type; - } - - /** - * @param i Index. - * @param create Create child if needed. - * @return Child collocation. - */ - private GridH2CollocationModel child(int i, boolean create) { - GridH2CollocationModel child = children[i]; - - if (child == null && create) { - TableFilter f = childFilters[i]; - - if (f.getTable().isView()) { - if (f.getIndex() == null) { - // If we don't have view index yet, then we just creating empty model and it must be filled later. - child = createChildModel(this, i, null, true, validate); - } - else - child = buildCollocationModel(this, i, getSubQuery(f), null, validate); - } - else - child = createChildModel(this, i, null, false, validate); - - assert child != null; - assert children[i] == child; - } - - return child; - } - - /** - * @param f Table filter. - * @return Sub-query. - */ - private static Query getSubQuery(TableFilter f) { - return ((ViewIndex)f.getIndex()).getQuery(); - } - - /** - * @return Unions list. - */ - private List<GridH2CollocationModel> getOrCreateUnions() { - if (unions == null) { - unions = new ArrayList<>(4); - - unions.add(this); - } - - return unions; - } - - /** - * @param qctx Query context. - * @param info Sub-query info. - * @param filters Filters. - * @param filter Filter. - * @param validate Query validation flag. - * @return Collocation. - */ - public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info, - TableFilter[] filters, int filter, boolean validate) { - GridH2CollocationModel cm; - - if (info != null) { - // Go up until we reach the root query. - cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate); - } - else { - // We are at the root query. - cm = qctx.queryCollocationModel(); - - if (cm == null) { - cm = createChildModel(null, -1, null, true, validate); - - qctx.queryCollocationModel(cm); - } - } - - if (filters == null) - return cm; - - assert cm.view; - - Select select = filters[0].getSelect(); - - // Handle union. We have to rely on fact that select will be the same on uppermost select. - // For sub-queries we will drop collocation models, so that they will be recalculated anyways. - if (cm.select != null && cm.select != select) { - List<GridH2CollocationModel> unions = cm.getOrCreateUnions(); - - // Try to find this select in existing unions. - // Start with 1 because at 0 it always will be c. - for (int i = 1; i < unions.size(); i++) { - GridH2CollocationModel u = unions.get(i); - - if (u.select == select) { - cm = u; - - break; - } - } - - // Nothing was found, need to create new child in union. - if (cm.select != select) - cm = createChildModel(cm.upper, cm.filter, unions, true, validate); - } - - cm.childFilters(filters); - - return cm.child(filter, true); - } - - /** - * @param qry Query. - * @return {@code true} If the query is collocated. - */ - public static boolean isCollocated(Query qry) { - GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true); - - Type type = mdl.type(true); - - if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) - throw new CacheException("Failed to execute query: for distributed join " + - "all REPLICATED caches must be at the end of the joined tables list."); - - return type.isCollocated(); - } - - /** - * @param upper Upper. - * @param filter Filter. - * @param qry Query. - * @param unions Unions. - * @param validate Query validation flag. - * @return Built model. - */ - private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper, - int filter, - Query qry, - List<GridH2CollocationModel> unions, - boolean validate) { - if (qry.isUnion()) { - if (unions == null) - unions = new ArrayList<>(); - - SelectUnion union = (SelectUnion)qry; - - GridH2CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate); - GridH2CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate); - - assert left != null; - assert right != null; - - return upper != null ? upper : left; - } - - Select select = (Select)qry; - - List<TableFilter> list = new ArrayList<>(); - - for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) - list.add(f); - - TableFilter[] filters = list.toArray(new TableFilter[list.size()]); - - GridH2CollocationModel cm = createChildModel(upper, filter, unions, true, validate); - - cm.childFilters(filters); - - for (int i = 0; i < filters.length; i++) { - TableFilter f = filters[i]; - - if (f.getTable().isView()) - buildCollocationModel(cm, i, getSubQuery(f), null, validate); - else if (f.getTable() instanceof GridH2Table) - createChildModel(cm, i, null, false, validate); - } - - return upper != null ? upper : cm; - } - - /** - * @param cacheName Cache name. - * @return Error. - */ - private static CacheException customAffinityError(String cacheName) { - return new CacheException("Failed to prepare distributed join query: can not use distributed joins for cache " + - "with custom AffinityKeyMapper configured. " + - "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']'); - } - - /** - * Collocation type. - */ - private enum Type { - /** */ - PARTITIONED_COLLOCATED(true, true), - - /** */ - PARTITIONED_NOT_COLLOCATED(true, false), - - /** */ - REPLICATED(false, true); - - /** */ - private final boolean partitioned; - - /** */ - private final boolean collocated; - - /** - * @param partitioned Partitioned. - * @param collocated Collocated. - */ - Type(boolean partitioned, boolean collocated) { - this.partitioned = partitioned; - this.collocated = collocated; - } - - /** - * @return {@code true} If partitioned. - */ - public boolean isPartitioned() { - return partitioned; - } - - /** - * @return {@code true} If collocated. - */ - public boolean isCollocated() { - return collocated; - } - - /** - * @param partitioned Partitioned. - * @param collocated Collocated. - * @return Type. - */ - static Type of(boolean partitioned, boolean collocated) { - if (collocated) - return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED; - - assert partitioned; - - return Type.PARTITIONED_NOT_COLLOCATED; - } - } - - /** - * Affinity of a table relative to previous joined tables. - */ - private enum Affinity { - /** */ - NONE, - - /** */ - HAS_AFFINITY_CONDITION, - - /** */ - COLLOCATED_JOIN - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/53543733/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 6c45c29..9547b5f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -18,18 +18,23 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.query.h2.H2Cursor; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper; +import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch; +import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel; +import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource; +import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream; +import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; @@ -41,15 +46,12 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.CIX2; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.engine.Session; import org.h2.index.BaseIndex; -import org.h2.index.Cursor; import org.h2.index.IndexCondition; import org.h2.index.IndexLookupBatch; import org.h2.index.ViewIndex; @@ -58,40 +60,26 @@ import org.h2.result.Row; import org.h2.result.SearchRow; import org.h2.table.IndexColumn; import org.h2.table.TableFilter; -import org.h2.util.DoneFuture; import org.h2.value.Value; -import org.h2.value.ValueNull; -import org.jetbrains.annotations.Nullable; import javax.cache.CacheException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import static java.util.Collections.emptyIterator; import static java.util.Collections.singletonList; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel; +import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel.buildCollocationModel; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK; -import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds; import static org.h2.result.Row.MEMORY_CALCULATE; /** @@ -99,7 +87,7 @@ import static org.h2.result.Row.MEMORY_CALCULATE; */ public abstract class GridH2IndexBase extends BaseIndex { /** */ - private static final Object EXPLICIT_NULL = new Object(); + public static final Object EXPLICIT_NULL = new Object(); /** */ private Object msgTopic; @@ -112,7 +100,7 @@ public abstract class GridH2IndexBase extends BaseIndex { /** */ private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() { - @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException { + @Override public void applyx(ClusterNode clusterNode, Message msg) { onMessage0(clusterNode.id(), msg); } }; @@ -122,6 +110,7 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param tbl Table. */ + @SuppressWarnings("MapReplaceableByEnumMap") protected final void initDistributedJoinMessaging(GridH2Table tbl) { final GridH2RowDescriptor desc = tbl.rowDescriptor(); @@ -248,7 +237,7 @@ public abstract class GridH2IndexBase extends BaseIndex { // Query expressions can not be distributed as well. if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression()) - return GridH2CollocationModel.MULTIPLIER_COLLOCATED; + return CollocationModel.MULTIPLIER_COLLOCATED; // We have to clear this cache because normally sub-query plan cost does not depend on anything // other than index condition masks and sort order, but in our case it can depend on order @@ -257,7 +246,7 @@ public abstract class GridH2IndexBase extends BaseIndex { assert filters != null; - GridH2CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false); + CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false); return c.calculateMultiplier(); } @@ -327,7 +316,7 @@ public abstract class GridH2IndexBase extends BaseIndex { GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context(); - return new DistributedLookupBatch(cctx, ucast, affColId); + return new DistributedLookupBatch(this, cctx, ucast, affColId); } /** {@inheritDoc} */ @@ -344,7 +333,7 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param nodes Nodes. * @param msg Message. */ - private void send(Collection<ClusterNode> nodes, Message msg) { + public void send(Collection<ClusterNode> nodes, Message msg) { if (!getTable().rowDescriptor().indexing().send(msgTopic, -1, nodes, @@ -353,7 +342,7 @@ public abstract class GridH2IndexBase extends BaseIndex { locNodeHnd, GridIoPolicy.IDX_POOL, false)) - throw retryException("Failed to send message to nodes: " + nodes); + throw H2Utils.retryException("Failed to send message to nodes: " + nodes); } /** @@ -413,7 +402,7 @@ public abstract class GridH2IndexBase extends BaseIndex { // This is the first request containing all the search rows. assert !msg.bounds().isEmpty() : "empty bounds"; - src = new RangeSource(msg.bounds(), msg.segment(), filter(qctx)); + src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx)); } else { // This is request to fetch next portion of data. @@ -498,197 +487,6 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @param v1 First value. - * @param v2 Second value. - * @return {@code true} If they equal. - */ - private boolean equal(Value v1, Value v2) { - return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, getDatabase().getCompareMode()) == 0); - } - - /** - * @param qctx Query context. - * @param batchLookupId Batch lookup ID. - * @param segmentId Segment ID. - * @return Index range request. - */ - private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) { - GridH2IndexRangeRequest req = new GridH2IndexRangeRequest(); - - req.originNodeId(qctx.originNodeId()); - req.queryId(qctx.queryId()); - req.originSegmentId(qctx.segment()); - req.segment(segmentId); - req.batchLookupId(batchLookupId); - - return req; - } - - - /** - * @param qctx Query context. - * @param cctx Cache context. - * @param isLocalQry Local query flag. - * @return Collection of nodes for broadcasting. - */ - private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) { - Map<UUID, int[]> partMap = qctx.partitionsMap(); - - List<ClusterNode> nodes; - - if (isLocalQry) { - if (partMap != null && !partMap.containsKey(cctx.localNodeId())) - return Collections.emptyList(); // Prevent remote index call for local queries. - - nodes = Collections.singletonList(cctx.localNode()); - } - else { - if (partMap == null) - nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion())); - else { - nodes = new ArrayList<>(partMap.size()); - - GridKernalContext ctx = kernalContext(); - - for (UUID nodeId : partMap.keySet()) { - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) - throw retryException("Failed to get node by ID during broadcast [nodeId=" + nodeId + ']'); - - nodes.add(node); - } - } - - if (F.isEmpty(nodes)) - throw retryException("Failed to collect affinity nodes during broadcast [" + - "cacheName=" + cctx.name() + ']'); - } - - int segmentsCount = segmentsCount(); - - List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount); - - for (ClusterNode node : nodes) { - for (int seg = 0; seg < segmentsCount; seg++) - res.add(new SegmentKey(node, seg)); - } - - return res; - } - - /** - * @param cctx Cache context. - * @param qctx Query context. - * @param affKeyObj Affinity key. - * @param isLocalQry Local query flag. - * @return Segment key for Affinity key. - */ - private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) { - assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj; - - ClusterNode node; - - int partition = cctx.affinity().partition(affKeyObj); - - if (isLocalQry) { - if (qctx.partitionsMap() != null) { - // If we have explicit partitions map, we have to use it to calculate affinity node. - UUID nodeId = qctx.nodeForPartition(partition, cctx); - - if(!cctx.localNodeId().equals(nodeId)) - return null; // Prevent remote index call for local queries. - } - - if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion())) - return null; - - node = cctx.localNode(); - } - else{ - if (qctx.partitionsMap() != null) { - // If we have explicit partitions map, we have to use it to calculate affinity node. - UUID nodeId = qctx.nodeForPartition(partition, cctx); - - node = cctx.discovery().node(nodeId); - } - else // Get primary node for current topology version. - node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion()); - - if (node == null) // Node was not found, probably topology changed and we need to retry the whole query. - throw retryException("Failed to get primary node by key for range segment."); - } - - return new SegmentKey(node, segmentForPartition(partition)); - } - - /** */ - protected class SegmentKey { - /** */ - final ClusterNode node; - - /** */ - final int segmentId; - - SegmentKey(ClusterNode node, int segmentId) { - assert node != null; - - this.node = node; - this.segmentId = segmentId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - SegmentKey key = (SegmentKey)o; - - return segmentId == key.segmentId && node.id().equals(key.node.id()); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = node.hashCode(); - result = 31 * result + segmentId; - return result; - } - } - - /** - * @param row Row. - * @return Row message. - */ - private GridH2RowMessage toRowMessage(Row row) { - if (row == null) - return null; - - int cols = row.getColumnCount(); - - assert cols > 0 : cols; - - List<GridH2ValueMessage> vals = new ArrayList<>(cols); - - for (int i = 0; i < cols; i++) { - try { - vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i))); - } - catch (IgniteCheckedException e) { - throw new CacheException(e); - } - } - - GridH2RowMessage res = new GridH2RowMessage(); - - res.values(vals); - - return res; - } - - /** * @param msg Row message. * @return Search row. */ @@ -723,7 +521,7 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param row Search row. * @return Row message. */ - private GridH2RowMessage toSearchRowMessage(SearchRow row) { + public GridH2RowMessage toSearchRowMessage(SearchRow row) { if (row == null) return null; @@ -766,41 +564,15 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @param msg Message. - * @return Row. + * @return Index segments count. */ - private Row toRow(GridH2RowMessage msg) { - if (msg == null) - return null; - - GridKernalContext ctx = kernalContext(); - - List<GridH2ValueMessage> vals = msg.values(); - - assert !F.isEmpty(vals) : vals; - - Value[] vals0 = new Value[vals.size()]; - - for (int i = 0; i < vals0.length; i++) { - try { - vals0[i] = vals.get(i).value(ctx); - } - catch (IgniteCheckedException e) { - throw new CacheException(e); - } - } - - return database.createRow(vals0, MEMORY_CALCULATE); - } - - /** @return Index segments count. */ - protected abstract int segmentsCount(); + public abstract int segmentsCount(); /** * @param partition Partition idx. * @return Segment ID for given key */ - protected int segmentForPartition(int partition){ + public int segmentForPartition(int partition){ return segmentsCount() == 1 ? 0 : (partition % segmentsCount()); } @@ -808,6 +580,7 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param row Table row. * @return Segment ID for given row. */ + @SuppressWarnings("IfMayBeConditional") protected int segmentForRow(SearchRow row) { assert row != null; @@ -831,722 +604,33 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * Simple cursor from a single node. - */ - private static class UnicastCursor implements Cursor { - /** */ - final int rangeId; - - /** */ - RangeStream stream; - - /** - * @param rangeId Range ID. - * @param keys Remote index segment keys. - * @param rangeStreams Range streams. - */ - UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) { - assert keys.size() == 1; - - this.rangeId = rangeId; - this.stream = rangeStreams.get(F.first(keys)); - - assert stream != null; - } - - /** {@inheritDoc} */ - @Override public boolean next() { - return stream.next(rangeId); - } - - /** {@inheritDoc} */ - @Override public Row get() { - return stream.get(rangeId); - } - - /** {@inheritDoc} */ - @Override public SearchRow getSearchRow() { - return get(); - } - - /** {@inheritDoc} */ - @Override public boolean previous() { - throw new UnsupportedOperationException(); - } - } - - /** - * Merge cursor from multiple nodes. - */ - private class BroadcastCursor implements Cursor, Comparator<RangeStream> { - /** */ - final int rangeId; - - /** */ - final RangeStream[] streams; - - /** */ - boolean first = true; - - /** */ - int off; - - /** - * @param rangeId Range ID. - * @param segmentKeys Remote nodes. - * @param rangeStreams Range streams. - */ - BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) { - - this.rangeId = rangeId; - - streams = new RangeStream[segmentKeys.size()]; - - int i = 0; - - for (SegmentKey segmentKey : segmentKeys) { - RangeStream stream = rangeStreams.get(segmentKey); - - assert stream != null; - - streams[i++] = stream; - } - } - - /** {@inheritDoc} */ - @Override public int compare(RangeStream o1, RangeStream o2) { - if (o1 == o2) - return 0; - - // Nulls are at the beginning of array. - if (o1 == null) - return -1; - - if (o2 == null) - return 1; - - return compareRows(o1.get(rangeId), o2.get(rangeId)); - } - - /** - * Try to fetch the first row. - * - * @return {@code true} If we were able to find at least one row. - */ - private boolean goFirst() { - // Fetch first row from all the streams and sort them. - for (int i = 0; i < streams.length; i++) { - if (!streams[i].next(rangeId)) { - streams[i] = null; - off++; // After sorting this offset will cut off all null elements at the beginning of array. - } - } - - if (off == streams.length) - return false; - - Arrays.sort(streams, this); - - return true; - } - - /** - * Fetch next row. - * - * @return {@code true} If we were able to find at least one row. - */ - private boolean goNext() { - assert off != streams.length; - - if (!streams[off].next(rangeId)) { - // Next row from current min stream was not found -> nullify that stream and bump offset forward. - streams[off] = null; - - return ++off != streams.length; - } - - // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams. - bubbleUp(streams, off, this); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean next() { - if (first) { - first = false; - - return goFirst(); - } - - return goNext(); - } - - /** {@inheritDoc} */ - @Override public Row get() { - return streams[off].get(rangeId); - } - - /** {@inheritDoc} */ - @Override public SearchRow getSearchRow() { - return get(); - } - - /** {@inheritDoc} */ - @Override public boolean previous() { - throw new UnsupportedOperationException(); - } - } - - /** - * Index lookup batch. - */ - private class DistributedLookupBatch implements IndexLookupBatch { - /** */ - final GridCacheContext<?,?> cctx; - - /** */ - final boolean ucast; - - /** */ - final int affColId; - - /** */ - GridH2QueryContext qctx; - - /** */ - int batchLookupId; - - /** */ - Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap(); - - /** */ - List<SegmentKey> broadcastSegments; - - /** */ - List<Future<Cursor>> res = Collections.emptyList(); - - /** */ - boolean batchFull; - - /** */ - boolean findCalled; - - /** - * @param cctx Cache Cache context. - * @param ucast Unicast or broadcast query. - * @param affColId Affinity column ID. - */ - DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId) { - this.cctx = cctx; - this.ucast = ucast; - this.affColId = affColId; - } - - /** - * @param firstRow First row. - * @param lastRow Last row. - * @return Affinity key or {@code null}. - */ - private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) { - if (affColId == COL_NOT_EXISTS) - return null; - - if (firstRow == null || lastRow == null) - return null; - - Value affKeyFirst = firstRow.getValue(affColId); - Value affKeyLast = lastRow.getValue(affColId); - - if (affKeyFirst != null && equal(affKeyFirst, affKeyLast)) - return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : affKeyFirst.getObject(); - - if (getTable().rowDescriptor().isKeyColumn(affColId)) - return null; - - // Try to extract affinity key from primary key. - Value pkFirst = firstRow.getValue(KEY_COL); - Value pkLast = lastRow.getValue(KEY_COL); - - if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE) - return EXPLICIT_NULL; - - if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast)) - return null; - - Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject()); - Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject()); - - if (pkAffKeyFirst == null || pkAffKeyLast == null) - throw new CacheException("Cache key without affinity key."); - - if (pkAffKeyFirst.equals(pkAffKeyLast)) - return pkAffKeyFirst; - - return null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) { - if (qctx == null || findCalled) { - if (qctx == null) { - // It is the first call after query begin (may be after reuse), - // reinitialize query context and result. - qctx = GridH2QueryContext.get(); - res = new ArrayList<>(); - - assert qctx != null; - assert !findCalled; - } - else { - // Cleanup after the previous lookup phase. - assert batchLookupId != 0; - - findCalled = false; - qctx.putStreams(batchLookupId, null); - res.clear(); - } - - // Reinitialize for the next lookup phase. - batchLookupId = qctx.nextBatchLookupId(); - rangeStreams = new HashMap<>(); - } - - Object affKey = getAffinityKey(firstRow, lastRow); - - boolean locQry = localQuery(); - - List<SegmentKey> segmentKeys; - - if (affKey != null) { - // Affinity key is provided. - if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything. - return false; - - segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, locQry)); - } - else { - // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast. - if (broadcastSegments == null) - broadcastSegments = broadcastSegments(qctx, cctx, locQry); - - segmentKeys = broadcastSegments; - } - - if (locQry && segmentKeys.isEmpty()) - return false; // Nothing to do - - assert !F.isEmpty(segmentKeys) : segmentKeys; - - final int rangeId = res.size(); - - // Create messages. - GridH2RowMessage first = toSearchRowMessage(firstRow); - GridH2RowMessage last = toSearchRowMessage(lastRow); - - // Range containing upper and lower bounds. - GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last); - - // Add range to every message of every participating node. - for (int i = 0; i < segmentKeys.size(); i++) { - SegmentKey segmentKey = segmentKeys.get(i); - assert segmentKey != null; - - RangeStream stream = rangeStreams.get(segmentKey); - - List<GridH2RowRangeBounds> bounds; - - if (stream == null) { - stream = new RangeStream(qctx, segmentKey.node); - - stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId); - stream.req.bounds(bounds = new ArrayList<>()); - - rangeStreams.put(segmentKey, stream); - } - else - bounds = stream.req.bounds(); - - bounds.add(rangeBounds); - - // If at least one node will have a full batch then we are ok. - if (bounds.size() >= qctx.pageSize()) - batchFull = true; - } - - Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ? - new UnicastCursor(rangeId, segmentKeys, rangeStreams) : - new BroadcastCursor(rangeId, segmentKeys, rangeStreams)); - - res.add(fut); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean isBatchFull() { - return batchFull; - } - - /** - * @return {@code True} if local query execution is enforced. - */ - private boolean localQuery() { - assert qctx != null : "Missing query context: " + this; - - return qctx.distributedJoinMode() == LOCAL_ONLY; - } - - /** - * - */ - private void startStreams() { - if (rangeStreams.isEmpty()) { - assert res.isEmpty(); - - return; - } - - qctx.putStreams(batchLookupId, rangeStreams); - - // Start streaming. - for (RangeStream stream : rangeStreams.values()) - stream.start(); - } - - /** {@inheritDoc} */ - @Override public List<Future<Cursor>> find() { - batchFull = false; - findCalled = true; - - startStreams(); - - return res; - } - - /** {@inheritDoc} */ - @Override public void reset(boolean beforeQry) { - if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called. - return; - - assert batchLookupId != 0; - - // Do cleanup after the query run. - qctx.putStreams(batchLookupId, null); - qctx = null; // The same query can be reused multiple times for different query contexts. - batchLookupId = 0; - - rangeStreams = Collections.emptyMap(); - broadcastSegments = null; - batchFull = false; - findCalled = false; - res = Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override public String getPlanSQL() { - return ucast ? "unicast" : "broadcast"; - } - } - - /** - * Per node range stream. + * Find rows for the segments (distributed joins). + * + * @param bounds Bounds. + * @param segment Segment. + * @param filter Filter. + * @return Iterator. */ - private class RangeStream { - /** */ - final GridH2QueryContext qctx; - - /** */ - final ClusterNode node; - - /** */ - GridH2IndexRangeRequest req; - - /** */ - int remainingRanges; - - /** */ - final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>(); - - /** */ - Iterator<GridH2RowRange> ranges = emptyIterator(); - - /** */ - Cursor cursor = GridH2Cursor.EMPTY; - - /** */ - int cursorRangeId = -1; - - /** - * @param qctx Query context. - * @param node Node. - */ - RangeStream(GridH2QueryContext qctx, ClusterNode node) { - this.node = node; - this.qctx = qctx; - } - - /** - * Start streaming. - */ - private void start() { - assert ctx != null; - assert log != null: getName(); - - remainingRanges = req.bounds().size(); - - assert remainingRanges > 0; - - if (log.isDebugEnabled()) - log.debug("Starting stream: [node=" + node + ", req=" + req + "]"); - - send(singletonList(node), req); - } - - /** - * @param msg Response. - */ - public void onResponse(GridH2IndexRangeResponse msg) { - respQueue.add(msg); - } - - /** - * @return Response. - */ - private GridH2IndexRangeResponse awaitForResponse() { - assert remainingRanges > 0; - - final long start = U.currentTimeMillis(); - - for (int attempt = 0;; attempt++) { - if (qctx.isCleared()) - throw retryException("Query is cancelled."); - - if (kernalContext().isStopping()) - throw retryException("Local node is stopping."); - - GridH2IndexRangeResponse res; - - try { - res = respQueue.poll(500, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ignored) { - throw retryException("Interrupted while waiting for reply."); - } - - if (res != null) { - switch (res.status()) { - case STATUS_OK: - List<GridH2RowRange> ranges0 = res.ranges(); - - remainingRanges -= ranges0.size(); - - if (ranges0.get(ranges0.size() - 1).isPartial()) - remainingRanges++; - - if (remainingRanges > 0) { - if (req.bounds() != null) - req = createRequest(qctx, req.batchLookupId(), req.segment()); - - // Prefetch next page. - send(singletonList(node), req); - } - else - req = null; - - return res; - - case STATUS_NOT_FOUND: - if (req == null || req.bounds() == null) // We have already received the first response. - throw retryException("Failure on remote node."); - - if (U.currentTimeMillis() - start > 30_000) - throw retryException("Timeout reached."); - - try { - U.sleep(20 * attempt); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteInterruptedException(e.getMessage()); - } - - // Retry to send the request once more after some time. - send(singletonList(node), req); - - break; - - case STATUS_ERROR: - throw new CacheException(res.error()); - - default: - throw new IllegalStateException(); - } - } - - if (!kernalContext().discovery().alive(node)) - throw retryException("Node has left topology: " + node.id()); - } - } - - /** - * @param rangeId Requested range ID. - * @return {@code true} If next row for the requested range was found. - */ - private boolean next(final int rangeId) { - for (;;) { - if (rangeId == cursorRangeId) { - if (cursor.next()) - return true; - } - else if (rangeId < cursorRangeId) - return false; - - cursor = GridH2Cursor.EMPTY; - - while (!ranges.hasNext()) { - if (remainingRanges == 0) { - ranges = emptyIterator(); - - return false; - } - - ranges = awaitForResponse().ranges().iterator(); - } - - GridH2RowRange range = ranges.next(); - - cursorRangeId = range.rangeId(); - - if (!F.isEmpty(range.rows())) { - final Iterator<GridH2RowMessage> it = range.rows().iterator(); - - if (it.hasNext()) { - cursor = new GridH2Cursor(new Iterator<Row>() { - @Override public boolean hasNext() { - return it.hasNext(); - } - - @Override public Row next() { - // Lazily convert messages into real rows. - return toRow(it.next()); - } + @SuppressWarnings("unchecked") + public Iterator<GridH2Row> findForSegment(GridH2RowRangeBounds bounds, int segment, + BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { + SearchRow first = toSearchRow(bounds.first()); + SearchRow last = toSearchRow(bounds.last()); - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }); - } - } - } - } + IgniteTree t = treeForRead(segment); - /** - * @param rangeId Requested range ID. - * @return Current row. - */ - private Row get(int rangeId) { - assert rangeId == cursorRangeId; + try { + GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null); - return cursor.get(); - } - } + if (range == null) + range = H2Utils.EMPTY_CURSOR; - /** - * Bounds iterator. - */ - private class RangeSource { - /** */ - Iterator<GridH2RowRangeBounds> boundsIter; - - /** */ - int curRangeId = -1; - - /** */ - private final int segment; - - /** */ - private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter; - - /** Iterator. */ - Iterator<GridH2Row> iter = emptyIterator(); - - /** - * @param bounds Bounds. - * @param segment Segment. - * @param filter Filter. - */ - RangeSource(Iterable<GridH2RowRangeBounds> bounds, int segment, BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { - this.segment = segment; - this.filter = filter; - boundsIter = bounds.iterator(); - } + H2Cursor cur = new H2Cursor(range); - /** - * @return {@code true} If there are more rows in this source. - */ - public boolean hasMoreRows() throws IgniteCheckedException { - return boundsIter.hasNext() || iter.hasNext(); + return new CursorIteratorWrapper(cur); } - - /** - * @param maxRows Max allowed rows. - * @return Range. - */ - public GridH2RowRange next(int maxRows) { - assert maxRows > 0 : maxRows; - - for (; ; ) { - if (iter.hasNext()) { - // Here we are getting last rows from previously partially fetched range. - List<GridH2RowMessage> rows = new ArrayList<>(); - - GridH2RowRange nextRange = new GridH2RowRange(); - - nextRange.rangeId(curRangeId); - nextRange.rows(rows); - - do { - rows.add(toRowMessage(iter.next())); - } - while (rows.size() < maxRows && iter.hasNext()); - - if (iter.hasNext()) - nextRange.setPartial(); - else - iter = emptyIterator(); - - return nextRange; - } - - iter = emptyIterator(); - - if (!boundsIter.hasNext()) { - boundsIter = emptyIterator(); - - return null; - } - - GridH2RowRangeBounds bounds = boundsIter.next(); - - curRangeId = bounds.rangeId(); - - SearchRow first = toSearchRow(bounds.first()); - SearchRow last = toSearchRow(bounds.last()); - - IgniteTree t = treeForRead(segment); - - iter = new CursorIteratorWrapper(doFind0(t, first, last, filter)); - - if (!iter.hasNext()) { - // We have to return empty range here. - GridH2RowRange emptyRange = new GridH2RowRange(); - - emptyRange.rangeId(curRangeId); - - return emptyRange; - } - } + catch (IgniteCheckedException e) { + throw DbException.convert(e); } } @@ -1559,21 +643,6 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @param t Tree. - * @param first Lower bound. - * @param last Upper bound always inclusive. - * @param filter Filter. - * @return Iterator over rows in given range. - */ - protected H2Cursor doFind0( - IgniteTree t, - @Nullable SearchRow first, - @Nullable SearchRow last, - BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) { - throw new UnsupportedOperationException(); - } - - /** * Re-assign column ids after removal of column(s). */ public void refreshColumnIds() { @@ -1582,72 +651,4 @@ public abstract class GridH2IndexBase extends BaseIndex { for (int pos = 0; pos < columnIds.length; ++pos) columnIds[pos] = columns[pos].getColumnId(); } - - /** - * Create retry exception for distributed join. - * - * @param msg Message. - * @return Exception. - */ - private GridH2RetryException retryException(String msg) { - return new GridH2RetryException(msg); - } - - /** - * - */ - private static final class CursorIteratorWrapper implements Iterator<GridH2Row> { - /** */ - private final H2Cursor cursor; - - /** Next element. */ - private GridH2Row next; - - /** - * @param cursor Cursor. - */ - private CursorIteratorWrapper(H2Cursor cursor) { - assert cursor != null; - - this.cursor = cursor; - - if (cursor.next()) - next = (GridH2Row)cursor.get(); - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return next != null; - } - - /** {@inheritDoc} */ - @Override public GridH2Row next() { - GridH2Row res = next; - - if (cursor.next()) - next = (GridH2Row)cursor.get(); - else - next = null; - - return res; - } - - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException("operation is not supported"); - } - } - - /** Empty cursor. */ - protected static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() { - /** {@inheritDoc} */ - @Override public boolean next() { - return false; - } - - /** {@inheritDoc} */ - @Override public GridH2Row get() { - return null; - } - }; }