http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java new file mode 100644 index 0000000..0f76316 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java @@ -0,0 +1,783 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.cache.CacheException; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; +import org.apache.ignite.internal.util.typedef.F; +import org.h2.command.dml.Query; +import org.h2.command.dml.Select; +import org.h2.command.dml.SelectUnion; +import org.h2.expression.Comparison; +import org.h2.expression.Expression; +import org.h2.expression.ExpressionColumn; +import org.h2.index.IndexCondition; +import org.h2.index.ViewIndex; +import org.h2.table.Column; +import org.h2.table.IndexColumn; +import org.h2.table.SubQueryInfo; +import org.h2.table.Table; +import org.h2.table.TableFilter; +import org.h2.table.TableView; + +/** + * Collocation model for a query. + */ +public final class GridH2CollocationModel { + /** */ + public static final int MULTIPLIER_COLLOCATED = 1; + + /** */ + private static final int MULTIPLIER_UNICAST = 50; + + /** */ + private static final int MULTIPLIER_BROADCAST = 200; + + /** */ + private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000; + + /** */ + private final GridH2CollocationModel upper; + + /** */ + private final int filter; + + /** */ + private final boolean view; + + /** */ + private int multiplier; + + /** */ + private Type type; + + /** */ + private GridH2CollocationModel[] children; + + /** */ + private TableFilter[] childFilters; + + /** */ + private List<GridH2CollocationModel> unions; + + /** */ + private Select select; + + /** */ + private final boolean validate; + + /** + * @param upper Upper. + * @param filter Filter. + * @param view This model will be a subquery (or top level query) and must contain child filters. + * @param validate Query validation flag. 
+ */ + private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) { + this.upper = upper; + this.filter = filter; + this.view = view; + this.validate = validate; + } + + /** + * @param upper Upper. + * @param filter Filter. + * @param unions Unions. + * @param view This model will be a subquery (or top level query) and must contain child filters. + * @param validate Query validation flag. + * @return Created child collocation model. + */ + private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper, + int filter, + List<GridH2CollocationModel> unions, + boolean view, + boolean validate) { + GridH2CollocationModel child = new GridH2CollocationModel(upper, filter, view, validate); + + if (unions != null) { + // Bind created child to unions. + assert upper == null || upper.child(filter, false) != null || unions.isEmpty(); + + if (upper != null && unions.isEmpty()) { + assert upper.child(filter, false) == null; + + upper.children[filter] = child; + } + + unions.add(child); + + child.unions = unions; + } + else if (upper != null) { + // Bind created child to upper model. + assert upper.child(filter, false) == null; + + upper.children[filter] = child; + } + + return child; + } + + /** + * @param childFilters New child filters. + * @return {@code true} If child filters were updated. + */ + private boolean childFilters(TableFilter[] childFilters) { + assert childFilters != null; + assert view; + + Select select = childFilters[0].getSelect(); + + assert this.select == null || this.select == select; + + if (this.select == null) { + this.select = select; + + assert this.childFilters == null; + } + else if (Arrays.equals(this.childFilters, childFilters)) + return false; + + if (this.childFilters == null) { + // We have to clone because H2 reuses array and reorders elements. 
+ this.childFilters = childFilters.clone(); + + children = new GridH2CollocationModel[childFilters.length]; + } + else { + assert this.childFilters.length == childFilters.length; + + // We have to copy because H2 reuses array and reorders elements. + System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length); + + Arrays.fill(children, null); + } + + // Reset results. + type = null; + multiplier = 0; + + return true; + } + + /** + * @param i Index. + * @param f Table filter. + * @return {@code true} If the child is not a table or view. + */ + private boolean isChildTableOrView(int i, TableFilter f) { + if (f == null) + f = childFilters[i]; + + Table t = f.getTable(); + + return t.isView() || t instanceof GridH2Table; + } + + /** + * Do the needed calculations. + */ + private void calculate() { + if (type != null) + return; + + if (view) { // We are at (sub-)query model. + assert childFilters != null; + + boolean collocated = true; + boolean partitioned = false; + int maxMultiplier = MULTIPLIER_COLLOCATED; + + for (int i = 0; i < childFilters.length; i++) { + GridH2CollocationModel child = child(i, true); + + Type t = child.type(true); + + if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) + maxMultiplier = child.multiplier; + + if (t.isPartitioned()) { + partitioned = true; + + if (!t.isCollocated()) { + collocated = false; + + int m = child.multiplier(true); + + if (m > maxMultiplier) { + maxMultiplier = m; + + if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST) + break; + } + } + } + } + + type = Type.of(partitioned, collocated); + multiplier = maxMultiplier; + } + else { + assert upper != null; + assert childFilters == null; + + // We are at table instance. + GridH2Table tbl = (GridH2Table)upper.childFilters[filter].getTable(); + + // Only partitioned tables will do distributed joins. 
+ if (!tbl.isPartitioned()) { + type = Type.REPLICATED; + multiplier = MULTIPLIER_COLLOCATED; + + return; + } + + // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables + // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted + // to all the affinity nodes the "base" does not need to get remote results. + if (!upper.findPartitionedTableBefore(filter)) { + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; + } + else { + // It is enough to make sure that our previous join by affinity key is collocated, then we are + // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. + switch (upper.joinedWithCollocated(filter)) { + case JOINED_WITH_COLLOCATED: + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; + + break; + + case HAS_AFFINITY_CONDITION: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_UNICAST; + + break; + + case NONE: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_BROADCAST; + + break; + + default: + throw new IllegalStateException(); + } + } + + if (upper.previousReplicated(filter)) + multiplier = MULTIPLIER_REPLICATED_NOT_LAST; + } + } + + /** + * @param f Current filter. + * @return {@code true} If partitioned table was found. + */ + private boolean findPartitionedTableBefore(int f) { + for (int i = 0; i < f; i++) { + GridH2CollocationModel child = child(i, true); + + // The c can be null if it is not a GridH2Table and not a sub-query, + // it is a some kind of function table or anything else that considered replicated. + if (child != null && child.type(true).isPartitioned()) + return true; + } + + // We have to search globally in upper queries as well. + return upper != null && upper.findPartitionedTableBefore(filter); + } + + /** + * @param f Current filter. + * @return {@code true} If previous table is REPLICATED. 
+ */ + private boolean previousReplicated(int f) { + if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED) + return true; + + return upper != null && upper.previousReplicated(filter); + } + + /** + * @param f Filter. + * @return Affinity join type. + */ + private Affinity joinedWithCollocated(int f) { + TableFilter tf = childFilters[f]; + + GridH2Table tbl = (GridH2Table)tf.getTable(); + + if (validate) { + if (tbl.rowDescriptor().context().customAffinityMapper()) + throw customAffinityError(tbl.spaceName()); + + if (F.isEmpty(tf.getIndexConditions())) { + throw new CacheException("Failed to prepare distributed join query: " + + "join condition does not use index [joinedCache=" + tbl.spaceName() + + ", plan=" + tf.getSelect().getPlanSQL() + ']'); + } + } + + IndexColumn affCol = tbl.getAffinityKeyColumn(); + + boolean affKeyCondFound = false; + + if (affCol != null) { + ArrayList<IndexCondition> idxConditions = tf.getIndexConditions(); + + int affColId = affCol.column.getColumnId(); + + for (int i = 0; i < idxConditions.size(); i++) { + IndexCondition c = idxConditions.get(i); + + int cmpType = c.getCompareType(); + + if ((cmpType == Comparison.EQUAL || cmpType == Comparison.EQUAL_NULL_SAFE) && + c.getColumn().getColumnId() == affColId && c.isEvaluatable()) { + affKeyCondFound = true; + + Expression exp = c.getExpression(); + + exp = exp.getNonAliasExpression(); + + if (exp instanceof ExpressionColumn) { + ExpressionColumn expCol = (ExpressionColumn)exp; + + // This is one of our previous joins. + TableFilter prevJoin = expCol.getTableFilter(); + + if (prevJoin != null) { + GridH2CollocationModel cm = child(indexOf(prevJoin), true); + + if (cm != null) { + Type t = cm.type(true); + + if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate)) + return Affinity.JOINED_WITH_COLLOCATED; + } + } + } + } + } + } + + return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE; + } + + /** + * @param f Table filter. 
+ * @return Index. + */ + private int indexOf(TableFilter f) { + for (int i = 0; i < childFilters.length; i++) { + if (childFilters[i] == f) + return i; + } + + throw new IllegalStateException(); + } + + /** + * @param f Table filter. + * @param expCol Expression column. + * @param validate Query validation flag. + * @return {@code true} It it is an affinity column. + */ + private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) { + Column col = expCol.getColumn(); + + if (col == null) + return false; + + Table t = col.getTable(); + + if (t.isView()) { + Query qry; + + if (f.getIndex() != null) + qry = getSubQuery(f); + else + qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t); + + return isAffinityColumn(qry, expCol, validate); + } + + if (t instanceof GridH2Table) { + if (validate && ((GridH2Table)t).rowDescriptor().context().customAffinityMapper()) + throw customAffinityError(((GridH2Table)t).spaceName()); + + IndexColumn affCol = ((GridH2Table)t).getAffinityKeyColumn(); + + return affCol != null && col.getColumnId() == affCol.column.getColumnId(); + } + + return false; + } + + /** + * @param qry Query. + * @param expCol Expression column. + * @param validate Query validation flag. + * @return {@code true} It it is an affinity column. + */ + private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) { + if (qry.isUnion()) { + SelectUnion union = (SelectUnion)qry; + + return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate); + } + + Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression(); + + if (exp instanceof ExpressionColumn) { + expCol = (ExpressionColumn)exp; + + return isAffinityColumn(expCol.getTableFilter(), expCol, validate); + } + + return false; + } + + /** + * @return Multiplier. 
+ */ + public int calculateMultiplier() { + // We don't need multiplier for union here because it will be summarized in H2. + return multiplier(false); + } + + /** + * @param withUnion With respect to union. + * @return Multiplier. + */ + private int multiplier(boolean withUnion) { + calculate(); + + assert multiplier != 0; + + if (withUnion && unions != null) { + int maxMultiplier = 0; + + for (int i = 0; i < unions.size(); i++) { + int m = unions.get(i).multiplier(false); + + if (m > maxMultiplier) + maxMultiplier = m; + } + + return maxMultiplier; + } + + return multiplier; + } + + /** + * @param withUnion With respect to union. + * @return Type. + */ + private Type type(boolean withUnion) { + calculate(); + + assert type != null; + + if (withUnion && unions != null) { + Type left = unions.get(0).type(false); + + for (int i = 1; i < unions.size(); i++) { + Type right = unions.get(i).type(false); + + if (!left.isCollocated() || !right.isCollocated()) { + left = Type.PARTITIONED_NOT_COLLOCATED; + + break; + } + else if (!left.isPartitioned() && !right.isPartitioned()) + left = Type.REPLICATED; + else + left = Type.PARTITIONED_COLLOCATED; + } + + return left; + } + + return type; + } + + /** + * @param i Index. + * @param create Create child if needed. + * @return Child collocation. + */ + private GridH2CollocationModel child(int i, boolean create) { + GridH2CollocationModel child = children[i]; + + if (child == null && create && isChildTableOrView(i, null)) { + TableFilter f = childFilters[i]; + + if (f.getTable().isView()) { + if (f.getIndex() == null) { + // If we don't have view index yet, then we just creating empty model and it must be filled later. 
+ child = createChildModel(this, i, null, true, validate); + } + else + child = buildCollocationModel(this, i, getSubQuery(f), null, validate); + } + else + child = createChildModel(this, i, null, false, validate); + + assert child != null; + assert children[i] == child; + } + + return child; + } + + /** + * @param f Table filter. + * @return Sub-query. + */ + private static Query getSubQuery(TableFilter f) { + return ((ViewIndex)f.getIndex()).getQuery(); + } + + /** + * @return Unions list. + */ + private List<GridH2CollocationModel> getOrCreateUnions() { + if (unions == null) { + unions = new ArrayList<>(4); + + unions.add(this); + } + + return unions; + } + + /** + * @param qctx Query context. + * @param info Sub-query info. + * @param filters Filters. + * @param filter Filter. + * @param validate Query validation flag. + * @return Collocation. + */ + public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info, + TableFilter[] filters, int filter, boolean validate) { + GridH2CollocationModel cm; + + if (info != null) { + // Go up until we reach the root query. + cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate); + } + else { + // We are at the root query. + cm = qctx.queryCollocationModel(); + + if (cm == null) { + cm = createChildModel(null, -1, null, true, validate); + + qctx.queryCollocationModel(cm); + } + } + + assert cm.view; + + Select select = filters[0].getSelect(); + + // Handle union. We have to rely on fact that select will be the same on uppermost select. + // For sub-queries we will drop collocation models, so that they will be recalculated anyways. + if (cm.select != null && cm.select != select) { + List<GridH2CollocationModel> unions = cm.getOrCreateUnions(); + + // Try to find this select in existing unions. + // Start with 1 because at 0 it always will be c. 
+ for (int i = 1; i < unions.size(); i++) { + GridH2CollocationModel u = unions.get(i); + + if (u.select == select) { + cm = u; + + break; + } + } + + // Nothing was found, need to create new child in union. + if (cm.select != select) + cm = createChildModel(cm.upper, cm.filter, unions, true, validate); + } + + cm.childFilters(filters); + + return cm.child(filter, true); + } + + /** + * @param qry Query. + * @return {@code true} If the query is collocated. + */ + public static boolean isCollocated(Query qry) { + GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true); + + Type type = mdl.type(true); + + if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST) + throw new CacheException("Failed to execute query: for distributed join " + + "all REPLICATED caches must be at the end of the joined tables list."); + + return type.isCollocated(); + } + + /** + * @param upper Upper. + * @param filter Filter. + * @param qry Query. + * @param unions Unions. + * @param validate Query validation flag. + * @return Built model. + */ + private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper, + int filter, + Query qry, + List<GridH2CollocationModel> unions, + boolean validate) { + if (qry.isUnion()) { + if (unions == null) + unions = new ArrayList<>(); + + SelectUnion union = (SelectUnion)qry; + + GridH2CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate); + GridH2CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate); + + assert left != null; + assert right != null; + + return upper != null ? 
upper : left; + } + + Select select = (Select)qry; + + List<TableFilter> list = new ArrayList<>(); + + for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin()) + list.add(f); + + TableFilter[] filters = list.toArray(new TableFilter[list.size()]); + + GridH2CollocationModel cm = createChildModel(upper, filter, unions, true, validate); + + cm.childFilters(filters); + + for (int i = 0; i < filters.length; i++) { + TableFilter f = filters[i]; + + if (f.getTable().isView()) + buildCollocationModel(cm, i, getSubQuery(f), null, validate); + else if (f.getTable() instanceof GridH2Table) + createChildModel(cm, i, null, false, validate); + } + + return upper != null ? upper : cm; + } + + /** + * @param cacheName Cache name. + * @return Error. + */ + private static CacheException customAffinityError(String cacheName) { + return new CacheException("Failed to prepare distributed join query: can not use distributed joins for cache " + + "with custom AffinityKeyMapper configured. " + + "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']'); + } + + /** + * Collocation type. + */ + private enum Type { + /** */ + PARTITIONED_COLLOCATED(true, true), + + /** */ + PARTITIONED_NOT_COLLOCATED(true, false), + + /** */ + REPLICATED(false, true); + + /** */ + private final boolean partitioned; + + /** */ + private final boolean collocated; + + /** + * @param partitioned Partitioned. + * @param collocated Collocated. + */ + Type(boolean partitioned, boolean collocated) { + this.partitioned = partitioned; + this.collocated = collocated; + } + + /** + * @return {@code true} If partitioned. + */ + public boolean isPartitioned() { + return partitioned; + } + + /** + * @return {@code true} If collocated. + */ + public boolean isCollocated() { + return collocated; + } + + /** + * @param partitioned Partitioned. + * @param collocated Collocated. + * @return Type. 
+ */ + static Type of(boolean partitioned, boolean collocated) { + if (collocated) + return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED; + + assert partitioned; + + return Type.PARTITIONED_NOT_COLLOCATED; + } + } + + /** + * Affinity of a table relative to previous joined tables. + */ + private enum Affinity { + /** */ + NONE, + + /** */ + HAS_AFFINITY_CONDITION, + + /** */ + JOINED_WITH_COLLOCATED + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java index a2f60c4..66d5736 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java @@ -28,23 +28,44 @@ import org.h2.result.SearchRow; */ public class GridH2Cursor implements Cursor { /** */ - private Iterator<GridH2Row> iter; + public static final Cursor EMPTY = new Cursor() { + @Override public Row get() { + return null; + } + + @Override public SearchRow getSearchRow() { + return null; + } + + @Override public boolean next() { + return false; + } + + @Override public boolean previous() { + return false; + } + }; /** */ - private Row row; + protected Iterator<? extends Row> iter; + + /** */ + protected Row cur; /** * Constructor. * * @param iter Rows iterator. */ - public GridH2Cursor(Iterator<GridH2Row> iter) { + public GridH2Cursor(Iterator<? extends Row> iter) { + assert iter != null; + this.iter = iter; } /** {@inheritDoc} */ @Override public Row get() { - return row; + return cur; } /** {@inheritDoc} */ @@ -54,12 +75,9 @@ public class GridH2Cursor implements Cursor { /** {@inheritDoc} */ @Override public boolean next() { - row = null; - - if (iter.hasNext()) - row = iter.next(); + cur = iter.hasNext() ? 
iter.next() : null; - return row != null; + return cur != null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java new file mode 100644 index 0000000..f53f1b3 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import org.h2.api.TableEngine; +import org.h2.command.ddl.CreateTableData; +import org.h2.table.RegularTable; +import org.h2.table.Table; + +/** + * Default table engine. 
+ */ +public class GridH2DefaultTableEngine implements TableEngine { + /** {@inheritDoc} */ + @Override public Table createTable(CreateTableData data) { + assert !data.persistData && !data.persistIndexes; + + if (data.isHidden && data.id == 0 && "SYS".equals(data.tableName)) + return new GridH2MetaTable(data); + + return new RegularTable(data); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index fbf7c7c..c29239f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -17,49 +17,164 @@ package org.apache.ignite.internal.processors.query.h2.opt; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import 
org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory; +import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridFilteredIterator; +import org.apache.ignite.internal.util.typedef.CIX2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.h2.engine.Session; import org.h2.index.BaseIndex; +import org.h2.index.Cursor; +import org.h2.index.IndexCondition; +import org.h2.index.IndexLookupBatch; +import org.h2.index.ViewIndex; import org.h2.message.DbException; import org.h2.result.Row; import org.h2.result.SearchRow; -import org.h2.result.SortOrder; +import 
org.h2.table.IndexColumn; +import org.h2.table.TableFilter; +import org.h2.util.DoneFuture; import org.h2.value.Value; +import org.h2.value.ValueNull; import org.jetbrains.annotations.Nullable; +import static java.util.Collections.emptyIterator; +import static java.util.Collections.singletonList; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK; +import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds; +import static org.h2.result.Row.MEMORY_CALCULATE; + /** * Index base. 
*/ public abstract class GridH2IndexBase extends BaseIndex { /** */ - protected static final ThreadLocal<IndexingQueryFilter> filters = new ThreadLocal<>(); + private static final Object EXPLICIT_NULL = new Object(); + + /** */ + private static final AtomicLong idxIdGen = new AtomicLong(); + + /** */ + protected final long idxId = idxIdGen.incrementAndGet(); + + /** */ + private final ThreadLocal<Object> snapshot = new ThreadLocal<>(); + + /** */ + private Object msgTopic; + + /** */ + private GridMessageListener msgLsnr; /** */ - protected final int keyCol; + private IgniteLogger log; /** */ - protected final int valCol; + private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() { + @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException { + onMessage0(clusterNode.id(), msg); + } + }; /** - * @param keyCol Key column. - * @param valCol Value column. + * @param tbl Table. */ - protected GridH2IndexBase(int keyCol, int valCol) { - this.keyCol = keyCol; - this.valCol = valCol; + protected final void initDistributedJoinMessaging(GridH2Table tbl) { + final GridH2RowDescriptor desc = tbl.rowDescriptor(); + + if (desc != null && desc.context() != null) { + GridKernalContext ctx = desc.context().kernalContext(); + + log = ctx.log(getClass()); + + msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName()); + + msgLsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + GridSpinBusyLock l = desc.indexing().busyLock(); + + if (!l.enterBusy()) + return; + + try { + onMessage0(nodeId, msg); + } + finally { + l.leaveBusy(); + } + } + }; + + ctx.io().addMessageListener(msgTopic, msgLsnr); + } + else { + msgTopic = null; + msgLsnr = null; + log = new NullLogger(); + } + } + + /** {@inheritDoc} */ + @Override public final void close(Session ses) { + // No-op. Actual index destruction must happen in method destroy. 
} /** - * Sets key filters for current thread. - * - * @param fs Filters. + * Attempts to destroys index and release all the resources. + * We use this method instead of {@link #close(Session)} because that method + * is used by H2 internally. */ - public static void setFiltersForThread(IndexingQueryFilter fs) { - filters.set(fs); + public void destroy() { + if (msgLsnr != null) + kernalContext().io().removeMessageListener(msgTopic, msgLsnr); } /** @@ -92,71 +207,121 @@ public abstract class GridH2IndexBase extends BaseIndex { * Takes or sets existing snapshot to be used in current thread. * * @param s Optional existing snapshot to use. + * @param qctx Query context. * @return Snapshot. */ - public Object takeSnapshot(@Nullable Object s) { + public final Object takeSnapshot(@Nullable Object s, GridH2QueryContext qctx) { + assert snapshot.get() == null; + + if (s == null) + s = doTakeSnapshot(); + + if (s != null) { + if (s instanceof GridReservable && !((GridReservable)s).reserve()) + return null; + + snapshot.set(s); + + if (qctx != null) + qctx.putSnapshot(idxId, s); + } + return s; } /** - * Releases snapshot for current thread. + * @param ses Session. */ - public void releaseSnapshot() { - // No-op. + private static void clearViewIndexCache(Session ses) { + Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true); + + if (!viewIdxCache.isEmpty()) + viewIdxCache.clear(); } - /** {@inheritDoc} */ - @Override public int compareRows(SearchRow rowData, SearchRow compare) { - if (rowData == compare) - return 0; + /** + * @param ses Session. + * @param filters All joined table filters. + * @param filter Current filter. + * @return Multiplier. 
+ */ + public int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) { + GridH2QueryContext qctx = GridH2QueryContext.get(); - for (int i = 0, len = indexColumns.length; i < len; i++) { - int index = columnIds[i]; + // We do complex optimizations with respect to distributed joins only on prepare stage + // because on run stage reordering of joined tables by Optimizer is explicitly disabled + // and thus multiplier will be always the same, so it will not affect choice of index. + // Query expressions can not be distributed as well. + if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression()) + return GridH2CollocationModel.MULTIPLIER_COLLOCATED; - Value v1 = rowData.getValue(index); - Value v2 = compare.getValue(index); + // We have to clear this cache because normally sub-query plan cost does not depend on anything + // other than index condition masks and sort order, but in our case it can depend on order + // of previous table filters. + clearViewIndexCache(ses); - if (v1 == null || v2 == null) - return 0; + assert filters != null; - int c = compareValues(v1, v2, indexColumns[i].sortType); + GridH2CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false); - if (c != 0) - return c; - } - return 0; + return c.calculateMultiplier(); + } + + /** {@inheritDoc} */ + @Override public GridH2Table getTable() { + return (GridH2Table)super.getTable(); + } + + /** + * Takes and returns actual snapshot or {@code null} if snapshots are not supported. + * + * @return Snapshot or {@code null}. + */ + @Nullable protected abstract Object doTakeSnapshot(); + + /** + * @return Thread local snapshot. + */ + @SuppressWarnings("unchecked") + protected <T> T threadLocalSnapshot() { + return (T)snapshot.get(); } /** - * @param a First value. - * @param b Second value. - * @param sortType Sort type. - * @return Comparison result. + * Releases snapshot for current thread. 
*/ - private int compareValues(Value a, Value b, int sortType) { - if (a == b) - return 0; + public void releaseSnapshot() { + Object s = snapshot.get(); + + assert s != null; - int comp = table.compareTypeSave(a, b); + snapshot.remove(); - if ((sortType & SortOrder.DESCENDING) != 0) - comp = -comp; + if (s instanceof GridReservable) + ((GridReservable)s).release(); - return comp; + if (s instanceof AutoCloseable) + U.closeQuiet((AutoCloseable)s); } /** * Filters rows from expired ones and using predicate. * * @param iter Iterator over rows. + * @param filter Optional filter. * @return Filtered iterator. */ - protected Iterator<GridH2Row> filter(Iterator<GridH2Row> iter) { - IgniteBiPredicate<Object, Object> p = null; + protected Iterator<GridH2Row> filter(Iterator<GridH2Row> iter, IndexingQueryFilter filter) { + return new FilteringIterator(iter, U.currentTimeMillis(), filter, getTable().spaceName()); + } - IndexingQueryFilter f = filters.get(); + /** + * @return Filter for currently running query or {@code null} if none. + */ + protected static IndexingQueryFilter threadLocalFilter() { + GridH2QueryContext qctx = GridH2QueryContext.get(); - return new FilteringIterator(iter, U.currentTimeMillis(), f); + return qctx != null ? 
qctx.filter() : null; } /** {@inheritDoc} */ @@ -194,10 +359,1125 @@ public abstract class GridH2IndexBase extends BaseIndex { return false; } + /** {@inheritDoc} */ + @Override public IndexLookupBatch createLookupBatch(TableFilter filter) { + GridH2QueryContext qctx = GridH2QueryContext.get(); + + if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned()) + return null; + + IndexColumn affCol = getTable().getAffinityKeyColumn(); + + int affColId; + boolean ucast; + + if (affCol != null) { + affColId = affCol.column.getColumnId(); + int[] masks = filter.getMasks(); + ucast = masks != null && masks[affColId] == IndexCondition.EQUALITY; + } + else { + affColId = -1; + ucast = false; + } + + GridCacheContext<?,?> cctx = getTable().rowDescriptor().context(); + + return new DistributedLookupBatch(cctx, ucast, affColId); + } + + /** + * @param nodes Nodes. + * @param msg Message. + */ + private void send(Collection<ClusterNode> nodes, Message msg) { + if (!getTable().rowDescriptor().indexing().send(msgTopic, + -1, + nodes, + msg, + null, + locNodeHnd, + GridIoPolicy.IDX_POOL, + false)) + throw new GridH2RetryException("Failed to send message to nodes: " + nodes + "."); + } + + /** + * @param nodeId Source node ID. + * @param msg Message. + */ + private void onMessage0(UUID nodeId, Object msg) { + ClusterNode node = kernalContext().discovery().node(nodeId); + + if (node == null) + return; + + try { + if (msg instanceof GridH2IndexRangeRequest) + onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg); + else if (msg instanceof GridH2IndexRangeResponse) + onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg); + } + catch (Throwable th) { + U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th); + + if (th instanceof Error) + throw th; + } + } + + /** + * @return Kernal context. 
+ */ + private GridKernalContext kernalContext() { + return getTable().rowDescriptor().context().kernalContext(); + } + + /** + * @param node Requesting node. + * @param msg Request message. + */ + private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) { + GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), + msg.originNodeId(), + msg.queryId(), + MAP); + + GridH2IndexRangeResponse res = new GridH2IndexRangeResponse(); + + res.originNodeId(msg.originNodeId()); + res.queryId(msg.queryId()); + res.batchLookupId(msg.batchLookupId()); + + if (qctx == null) + res.status(STATUS_NOT_FOUND); + else { + try { + RangeSource src; + + if (msg.bounds() != null) { + // This is the first request containing all the search rows. + ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> snapshot0 = qctx.getSnapshot(idxId); + + assert !msg.bounds().isEmpty() : "empty bounds"; + + src = new RangeSource(msg.bounds(), snapshot0, qctx.filter()); + } + else { + // This is request to fetch next portion of data. + src = qctx.getSource(node.id(), msg.batchLookupId()); + + assert src != null; + } + + List<GridH2RowRange> ranges = new ArrayList<>(); + + int maxRows = qctx.pageSize(); + + assert maxRows > 0 : maxRows; + + while (maxRows > 0) { + GridH2RowRange range = src.next(maxRows); + + if (range == null) + break; + + ranges.add(range); + + if (range.rows() != null) + maxRows -= range.rows().size(); + } + + if (src.hasMoreRows()) { + // Save source for future fetches. + if (msg.bounds() != null) + qctx.putSource(node.id(), msg.batchLookupId(), src); + } + else if (msg.bounds() == null) { + // Drop saved source. 
+ qctx.putSource(node.id(), msg.batchLookupId(), null); + } + + assert !ranges.isEmpty(); + + res.ranges(ranges); + res.status(STATUS_OK); + } + catch (Throwable th) { + U.error(log, "Failed to process request: " + msg, th); + + res.error(th.getClass() + ": " + th.getMessage()); + res.status(STATUS_ERROR); + } + } + + send(singletonList(node), res); + } + + /** + * @param node Responded node. + * @param msg Response message. + */ + private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) { + GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), + msg.originNodeId(), msg.queryId(), MAP); + + if (qctx == null) + return; + + Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId()); + + if (streams == null) + return; + + RangeStream stream = streams.get(node); + + assert stream != null; + + stream.onResponse(msg); + } + + /** + * @param v1 First value. + * @param v2 Second value. + * @return {@code true} If they equal. + */ + private boolean equal(Value v1, Value v2) { + return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, getDatabase().getCompareMode()) == 0); + } + + /** + * @param qctx Query context. + * @param batchLookupId Batch lookup ID. + * @return Index range request. + */ + private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) { + GridH2IndexRangeRequest req = new GridH2IndexRangeRequest(); + + req.originNodeId(qctx.originNodeId()); + req.queryId(qctx.queryId()); + req.batchLookupId(batchLookupId); + + return req; + } + + /** + * @param qctx Query context. + * @param cctx Cache context. + * @return Collection of nodes for broadcasting. 
+ */ + private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) { + Map<UUID, int[]> partMap = qctx.partitionsMap(); + + List<ClusterNode> res; + + if (partMap == null) + res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion())); + else { + res = new ArrayList<>(partMap.size()); + + GridKernalContext ctx = kernalContext(); + + for (UUID nodeId : partMap.keySet()) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new GridH2RetryException("Failed to find node."); + + res.add(node); + } + } + + if (F.isEmpty(res)) + throw new GridH2RetryException("Failed to collect affinity nodes."); + + return res; + } + + /** + * @param cctx Cache context. + * @param qctx Query context. + * @param affKeyObj Affinity key. + * @return Cluster nodes or {@code null} if affinity key is a null value. + */ + private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) { + assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj; + + ClusterNode node; + + if (qctx.partitionsMap() != null) { + // If we have explicit partitions map, we have to use it to calculate affinity node. + UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx); + + node = cctx.discovery().node(nodeId); + } + else // Get primary node for current topology version. + node = cctx.affinity().primary(affKeyObj, qctx.topologyVersion()); + + if (node == null) // Node was not found, probably topology changed and we need to retry the whole query. + throw new GridH2RetryException("Failed to find node."); + + return node; + } + + /** + * @param row Row. + * @return Row message. 
+ */ + private GridH2RowMessage toRowMessage(Row row) { + if (row == null) + return null; + + int cols = row.getColumnCount(); + + assert cols > 0 : cols; + + List<GridH2ValueMessage> vals = new ArrayList<>(cols); + + for (int i = 0; i < cols; i++) { + try { + vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i))); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + GridH2RowMessage res = new GridH2RowMessage(); + + res.values(vals); + + return res; + } + + /** + * @param msg Row message. + * @return Search row. + */ + private SearchRow toSearchRow(GridH2RowMessage msg) { + if (msg == null) + return null; + + GridKernalContext ctx = kernalContext(); + + Value[] vals = new Value[getTable().getColumns().length]; + + assert vals.length > 0; + + List<GridH2ValueMessage> msgVals = msg.values(); + + for (int i = 0; i < indexColumns.length; i++) { + if (i >= msgVals.size()) + continue; + + try { + vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + return database.createRow(vals, MEMORY_CALCULATE); + } + + /** + * @param row Search row. + * @return Row message. + */ + private GridH2RowMessage toSearchRowMessage(SearchRow row) { + if (row == null) + return null; + + List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length); + + for (IndexColumn idxCol : indexColumns) { + Value val = row.getValue(idxCol.column.getColumnId()); + + if (val == null) + break; + + try { + vals.add(GridH2ValueMessageFactory.toMessage(val)); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + GridH2RowMessage res = new GridH2RowMessage(); + + res.values(vals); + + return res; + } + + /** + * @param msg Message. + * @return Row. 
+ */ + private Row toRow(GridH2RowMessage msg) { + if (msg == null) + return null; + + GridKernalContext ctx = kernalContext(); + + List<GridH2ValueMessage> vals = msg.values(); + + assert !F.isEmpty(vals) : vals; + + Value[] vals0 = new Value[vals.size()]; + + for (int i = 0; i < vals0.length; i++) { + try { + vals0[i] = vals.get(i).value(ctx); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + + return database.createRow(vals0, MEMORY_CALCULATE); + } + + /** + * Simple cursor from a single node. + */ + private static class UnicastCursor implements Cursor { + /** */ + final int rangeId; + + /** */ + RangeStream stream; + + /** + * @param rangeId Range ID. + * @param nodes Remote nodes. + * @param rangeStreams Range streams. + */ + private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) { + assert nodes.size() == 1; + + this.rangeId = rangeId; + this.stream = rangeStreams.get(F.first(nodes)); + + assert stream != null; + } + + /** {@inheritDoc} */ + @Override public boolean next() { + return stream.next(rangeId); + } + + /** {@inheritDoc} */ + @Override public Row get() { + return stream.get(rangeId); + } + + /** {@inheritDoc} */ + @Override public SearchRow getSearchRow() { + return get(); + } + + /** {@inheritDoc} */ + @Override public boolean previous() { + throw new UnsupportedOperationException(); + } + } + + /** + * Merge cursor from multiple nodes. + */ + private class BroadcastCursor implements Cursor, Comparator<RangeStream> { + /** */ + final int rangeId; + + /** */ + final RangeStream[] streams; + + /** */ + boolean first = true; + + /** */ + int off; + + /** + * @param rangeId Range ID. + * @param nodes Remote nodes. + * @param rangeStreams Range streams. 
+ */ + private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) { + assert nodes.size() > 1; + + this.rangeId = rangeId; + + streams = new RangeStream[nodes.size()]; + + int i = 0; + + for (ClusterNode node : nodes) { + RangeStream stream = rangeStreams.get(node); + + assert stream != null; + + streams[i++] = stream; + } + } + + /** {@inheritDoc} */ + @Override public int compare(RangeStream o1, RangeStream o2) { + if (o1 == o2) + return 0; + + // Nulls are at the beginning of array. + if (o1 == null) + return -1; + + if (o2 == null) + return 1; + + return compareRows(o1.get(rangeId), o2.get(rangeId)); + } + + /** + * Try to fetch the first row. + * + * @return {@code true} If we were able to find at least one row. + */ + private boolean goFirst() { + // Fetch first row from all the streams and sort them. + for (int i = 0; i < streams.length; i++) { + if (!streams[i].next(rangeId)) { + streams[i] = null; + off++; // After sorting this offset will cut off all null elements at the beginning of array. + } + } + + if (off == streams.length) + return false; + + Arrays.sort(streams, this); + + return true; + } + + /** + * Fetch next row. + * + * @return {@code true} If we were able to find at least one row. + */ + private boolean goNext() { + assert off != streams.length; + + if (!streams[off].next(rangeId)) { + // Next row from current min stream was not found -> nullify that stream and bump offset forward. + streams[off] = null; + + return ++off != streams.length; + } + + // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams. 
+ for (int i = off, last = streams.length - 1; i < last; i++) { + if (compareRows(streams[i].get(rangeId), streams[i + 1].get(rangeId)) <= 0) + break; + + U.swap(streams, i, i + 1); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (first) { + first = false; + + return goFirst(); + } + + return goNext(); + } + + /** {@inheritDoc} */ + @Override public Row get() { + return streams[off].get(rangeId); + } + + /** {@inheritDoc} */ + @Override public SearchRow getSearchRow() { + return get(); + } + + /** {@inheritDoc} */ + @Override public boolean previous() { + throw new UnsupportedOperationException(); + } + } + + /** + * Index lookup batch. + */ + private class DistributedLookupBatch implements IndexLookupBatch { + /** */ + final GridCacheContext<?,?> cctx; + + /** */ + final boolean ucast; + + /** */ + final int affColId; + + /** */ + GridH2QueryContext qctx; + + /** */ + int batchLookupId; + + /** */ + Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap(); + + /** */ + List<ClusterNode> broadcastNodes; + + /** */ + List<Future<Cursor>> res = Collections.emptyList(); + + /** */ + boolean batchFull; + + /** */ + boolean findCalled; + + /** + * @param cctx Cache context. + * @param ucast Unicast or broadcast query. + * @param affColId Affinity column ID. + */ + private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) { + this.cctx = cctx; + this.ucast = ucast; + this.affColId = affColId; + } + + /** + * @param firstRow First row. + * @param lastRow Last row. + * @return Affinity key or {@code null}. + */ + private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) { + if (firstRow == null || lastRow == null) + return null; + + Value affKeyFirst = firstRow.getValue(affColId); + Value affKeyLast = lastRow.getValue(affColId); + + if (affKeyFirst != null && equal(affKeyFirst, affKeyLast)) + return affKeyFirst == ValueNull.INSTANCE ? 
EXPLICIT_NULL : affKeyFirst.getObject(); + + if (affColId == KEY_COL) + return null; + + // Try to extract affinity key from primary key. + Value pkFirst = firstRow.getValue(KEY_COL); + Value pkLast = lastRow.getValue(KEY_COL); + + if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE) + return EXPLICIT_NULL; + + if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast)) + return null; + + Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject()); + Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject()); + + if (pkAffKeyFirst == null || pkAffKeyLast == null) + throw new CacheException("Cache key without affinity key."); + + if (pkAffKeyFirst.equals(pkAffKeyLast)) + return pkAffKeyFirst; + + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) { + if (qctx == null || findCalled) { + if (qctx == null) { + // It is the first call after query begin (may be after reuse), + // reinitialize query context and result. + qctx = GridH2QueryContext.get(); + res = new ArrayList<>(); + + assert qctx != null; + assert !findCalled; + } + else { + // Cleanup after the previous lookup phase. + assert batchLookupId != 0; + + findCalled = false; + qctx.putStreams(batchLookupId, null); + res.clear(); + } + + // Reinitialize for the next lookup phase. + batchLookupId = qctx.nextBatchLookupId(); + rangeStreams = new HashMap<>(); + } + + Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow); + + List<ClusterNode> nodes; + Future<Cursor> fut; + + if (affKey != null) { + // Affinity key is provided. + if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything. + return false; + + nodes = F.asList(rangeNode(cctx, qctx, affKey)); + } + else { + // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast. 
+ if (broadcastNodes == null) + broadcastNodes = broadcastNodes(qctx, cctx); + + nodes = broadcastNodes; + } + + assert !F.isEmpty(nodes) : nodes; + + final int rangeId = res.size(); + + // Create messages. + GridH2RowMessage first = toSearchRowMessage(firstRow); + GridH2RowMessage last = toSearchRowMessage(lastRow); + + // Range containing upper and lower bounds. + GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last); + + // Add range to every message of every participating node. + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + assert node != null; + + RangeStream stream = rangeStreams.get(node); + + List<GridH2RowRangeBounds> bounds; + + if (stream == null) { + stream = new RangeStream(qctx, node); + + stream.req = createRequest(qctx, batchLookupId); + stream.req.bounds(bounds = new ArrayList<>()); + + rangeStreams.put(node, stream); + } + else + bounds = stream.req.bounds(); + + bounds.add(rangeBounds); + + // If at least one node will have a full batch then we are ok. + if (bounds.size() >= qctx.pageSize()) + batchFull = true; + } + + fut = new DoneFuture<>(nodes.size() == 1 ? + new UnicastCursor(rangeId, nodes, rangeStreams) : + new BroadcastCursor(rangeId, nodes, rangeStreams)); + + res.add(fut); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isBatchFull() { + return batchFull; + } + + /** + * + */ + private void startStreams() { + if (rangeStreams.isEmpty()) { + assert res.isEmpty(); + + return; + } + + qctx.putStreams(batchLookupId, rangeStreams); + + // Start streaming. + for (RangeStream stream : rangeStreams.values()) + stream.start(); + } + + /** {@inheritDoc} */ + @Override public List<Future<Cursor>> find() { + batchFull = false; + findCalled = true; + + startStreams(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void reset(boolean beforeQry) { + if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called. 
+ return; + + assert batchLookupId != 0; + + // Do cleanup after the query run. + qctx.putStreams(batchLookupId, null); + qctx = null; // The same query can be reused multiple times for different query contexts. + batchLookupId = 0; + + rangeStreams = Collections.emptyMap(); + broadcastNodes = null; + batchFull = false; + findCalled = false; + res = Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public String getPlanSQL() { + return ucast ? "unicast" : "broadcast"; + } + } + + /** + * Per node range stream. + */ + private class RangeStream { + /** */ + final GridH2QueryContext qctx; + + /** */ + final ClusterNode node; + + /** */ + GridH2IndexRangeRequest req; + + /** */ + int remainingRanges; + + /** */ + final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>(); + + /** */ + Iterator<GridH2RowRange> ranges = emptyIterator(); + + /** */ + Cursor cursor = GridH2Cursor.EMPTY; + + /** */ + int cursorRangeId = -1; + + /** + * @param qctx Query context. + * @param node Node. + */ + RangeStream(GridH2QueryContext qctx, ClusterNode node) { + this.node = node; + this.qctx = qctx; + } + + /** + * Start streaming. + */ + private void start() { + remainingRanges = req.bounds().size(); + + assert remainingRanges > 0; + + if (log.isDebugEnabled()) + log.debug("Starting stream: [node=" + node + ", req=" + req + "]"); + + send(singletonList(node), req); + } + + /** + * @param msg Response. + */ + public void onResponse(GridH2IndexRangeResponse msg) { + respQueue.add(msg); + } + + /** + * @return Response. 
+ */ + private GridH2IndexRangeResponse awaitForResponse() { + assert remainingRanges > 0; + + final long start = U.currentTimeMillis(); + + for (int attempt = 0;; attempt++) { + if (qctx.isCleared()) + throw new GridH2RetryException("Query is cancelled."); + + if (kernalContext().isStopping()) + throw new GridH2RetryException("Stopping node."); + + GridH2IndexRangeResponse res; + + try { + res = respQueue.poll(500, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new GridH2RetryException("Interrupted."); + } + + if (res != null) { + switch (res.status()) { + case STATUS_OK: + List<GridH2RowRange> ranges0 = res.ranges(); + + remainingRanges -= ranges0.size(); + + if (ranges0.get(ranges0.size() - 1).isPartial()) + remainingRanges++; + + if (remainingRanges > 0) { + if (req.bounds() != null) + req = createRequest(qctx, req.batchLookupId()); + + // Prefetch next page. + send(singletonList(node), req); + } + else + req = null; + + return res; + + case STATUS_NOT_FOUND: + if (req == null || req.bounds() == null) // We have already received the first response. + throw new GridH2RetryException("Failure on remote node."); + + if (U.currentTimeMillis() - start > 30_000) + throw new GridH2RetryException("Timeout."); + + try { + U.sleep(20 * attempt); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteInterruptedException(e.getMessage()); + } + + // Retry to send the request once more after some time. + send(singletonList(node), req); + + break; + + case STATUS_ERROR: + throw new CacheException(res.error()); + + default: + throw new IllegalStateException(); + } + } + + if (!kernalContext().discovery().alive(node)) + throw new GridH2RetryException("Node left: " + node); + } + } + + /** + * @param rangeId Requested range ID. + * @return {@code true} If next row for the requested range was found. 
+ */ + private boolean next(final int rangeId) { + for (;;) { + if (rangeId == cursorRangeId) { + if (cursor.next()) + return true; + } + else if (rangeId < cursorRangeId) + return false; + + cursor = GridH2Cursor.EMPTY; + + while (!ranges.hasNext()) { + if (remainingRanges == 0) { + ranges = emptyIterator(); + + return false; + } + + ranges = awaitForResponse().ranges().iterator(); + } + + GridH2RowRange range = ranges.next(); + + cursorRangeId = range.rangeId(); + + if (!F.isEmpty(range.rows())) { + final Iterator<GridH2RowMessage> it = range.rows().iterator(); + + if (it.hasNext()) { + cursor = new GridH2Cursor(new Iterator<Row>() { + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public Row next() { + // Lazily convert messages into real rows. + return toRow(it.next()); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }); + } + } + } + } + + /** + * @param rangeId Requested range ID. + * @return Current row. + */ + private Row get(int rangeId) { + assert rangeId == cursorRangeId; + + return cursor.get(); + } + } + + /** + * Bounds iterator. + */ + private class RangeSource { + /** */ + Iterator<GridH2RowRangeBounds> boundsIter; + + /** */ + int curRangeId = -1; + + /** */ + Iterator<GridH2Row> curRange = emptyIterator(); + + /** */ + final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree; + + /** */ + final IndexingQueryFilter filter; + + /** + * @param bounds Bounds. + * @param tree Snapshot. + * @param filter Filter. + */ + RangeSource( + Iterable<GridH2RowRangeBounds> bounds, + ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree, + IndexingQueryFilter filter + ) { + this.filter = filter; + this.tree = tree; + boundsIter = bounds.iterator(); + } + + /** + * @return {@code true} If there are more rows in this source. + */ + public boolean hasMoreRows() { + return boundsIter.hasNext() || curRange.hasNext(); + } + + /** + * @param maxRows Max allowed rows. 
+ * @return Range. + */ + public GridH2RowRange next(int maxRows) { + assert maxRows > 0 : maxRows; + + for (;;) { + if (curRange.hasNext()) { + // Here we are getting last rows from previously partially fetched range. + List<GridH2RowMessage> rows = new ArrayList<>(); + + GridH2RowRange nextRange = new GridH2RowRange(); + + nextRange.rangeId(curRangeId); + nextRange.rows(rows); + + do { + rows.add(toRowMessage(curRange.next())); + } + while (rows.size() < maxRows && curRange.hasNext()); + + if (curRange.hasNext()) + nextRange.setPartial(); + else + curRange = emptyIterator(); + + return nextRange; + } + + curRange = emptyIterator(); + + if (!boundsIter.hasNext()) { + boundsIter = emptyIterator(); + + return null; + } + + GridH2RowRangeBounds bounds = boundsIter.next(); + + curRangeId = bounds.rangeId(); + + SearchRow first = toSearchRow(bounds.first()); + SearchRow last = toSearchRow(bounds.last()); + + ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead(); + + curRange = doFind0(t, first, true, last, filter); + + if (!curRange.hasNext()) { + // We have to return empty range here. + GridH2RowRange emptyRange = new GridH2RowRange(); + + emptyRange.rangeId(curRangeId); + + return emptyRange; + } + } + } + } + + /** + * @return Snapshot for current thread if there is one. + */ + protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() { + throw new UnsupportedOperationException(); + } + + /** + * @param t Tree. + * @param first Lower bound. + * @param includeFirst Whether lower bound should be inclusive. + * @param last Upper bound always inclusive. + * @param filter Filter. + * @return Iterator over rows in given range. 
+ */ + protected Iterator<GridH2Row> doFind0(ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t, + @Nullable SearchRow first, + boolean includeFirst, + @Nullable SearchRow last, + IndexingQueryFilter filter) { + throw new UnsupportedOperationException(); + } + /** * Iterator which filters by expiration time and predicate. */ - protected class FilteringIterator extends GridFilteredIterator<GridH2Row> { + protected static class FilteringIterator extends GridFilteredIterator<GridH2Row> { /** */ private final IgniteBiPredicate<Object, Object> fltr; @@ -210,15 +1490,19 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param iter Iterator. * @param time Time for expired rows filtering. + * @param qryFilter Filter. + * @param spaceName Space name. */ - protected FilteringIterator(Iterator<GridH2Row> iter, long time, - IndexingQueryFilter qryFilter) { + protected FilteringIterator(Iterator<GridH2Row> iter, + long time, + IndexingQueryFilter qryFilter, + String spaceName) { super(iter); this.time = time; if (qryFilter != null) { - this.fltr = qryFilter.forSpace(((GridH2Table)getTable()).spaceName()); + this.fltr = qryFilter.forSpace(spaceName); this.isValRequired = qryFilter.isValueRequired(); } else { @@ -242,8 +1526,8 @@ public abstract class GridH2IndexBase extends BaseIndex { if (fltr == null) return true; - Object key = row.getValue(keyCol).getObject(); - Object val = isValRequired ? row.getValue(valCol).getObject() : null; + Object key = row.getValue(KEY_COL).getObject(); + Object val = isValRequired ? row.getValue(VAL_COL).getObject() : null; assert key != null; assert !isValRequired || val != null;
