http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java index 49c679d..57ca4df 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java @@ -36,9 +36,6 @@ public class GridSqlTable extends GridSqlElement { /** */ private final GridH2Table tbl; - /** */ - private boolean affKeyCond; - /** * @param schema Schema. * @param tblName Table name. @@ -60,7 +57,7 @@ public class GridSqlTable extends GridSqlElement { * @param tbl H2 Table. */ private GridSqlTable(@Nullable String schema, String tblName, @Nullable Table tbl) { - super(Collections.<GridSqlElement>emptyList()); + super(Collections.<GridSqlAst>emptyList()); assert schema != null : "schema"; assert tblName != null : "tblName"; @@ -71,20 +68,6 @@ public class GridSqlTable extends GridSqlElement { this.tbl = tbl instanceof GridH2Table ? (GridH2Table)tbl : null; } - /** - * @param affKeyCond If affinity key condition is found. - */ - public void affinityKeyCondition(boolean affKeyCond) { - this.affKeyCond = affKeyCond; - } - - /** - * @return {@code true} If affinity key condition is found. - */ - public boolean affinityKeyCondition() { - return affKeyCond; - } - /** {@inheritDoc} */ @Override public String getSQL() { if (schema == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java index efe9138..b4a610c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java @@ -48,6 +48,10 @@ public final class GridSqlType { public static final GridSqlType BOOLEAN = new GridSqlType(Value.BOOLEAN, 0, ValueBoolean.PRECISION, ValueBoolean.DISPLAY_SIZE, "BOOLEAN"); + /** */ + public static final GridSqlType RESULT_SET = new GridSqlType(Value.RESULT_SET, 0, + Integer.MAX_VALUE, Integer.MAX_VALUE, ""); + /** H2 type. */ private final int type; @@ -71,7 +75,7 @@ public final class GridSqlType { * @param sql SQL definition of the type. 
*/ private GridSqlType(int type, int scale, long precision, int displaySize, String sql) { - assert !F.isEmpty(sql) || type == Value.UNKNOWN; + assert !F.isEmpty(sql) || type == Value.UNKNOWN || type == Value.RESULT_SET; this.type = type; this.scale = scale; http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java index b11278e..09f0e24 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java @@ -26,6 +26,12 @@ import org.h2.util.StatementBuilder; */ public class GridSqlUnion extends GridSqlQuery { /** */ + public static final int LEFT_CHILD = 2; + + /** */ + public static final int RIGHT_CHILD = 3; + + /** */ private int unionType; /** */ @@ -35,6 +41,57 @@ public class GridSqlUnion extends GridSqlQuery { private GridSqlQuery left; /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <E extends GridSqlAst> E child(int childIdx) { + if (childIdx < LEFT_CHILD) + return super.child(childIdx); + + switch (childIdx) { + case LEFT_CHILD: + assert left != null; + + return (E)left; + + case RIGHT_CHILD: + assert right != null; + + return (E)right; + + default: + throw new IllegalStateException("Child index: " + childIdx); + } + } + + /** {@inheritDoc} */ + @Override public <E extends GridSqlAst> void child(int childIdx, E child) { + if (childIdx < LEFT_CHILD) { + super.child(childIdx, child); + + return; + } + + switch (childIdx) { + case LEFT_CHILD: + left = (GridSqlQuery)child; + + break; + + case RIGHT_CHILD: 
+ right = (GridSqlQuery)child; + + break; + + default: + throw new IllegalStateException("Child index: " + childIdx); + } + } + + /** {@inheritDoc} */ + @Override public int size() { + return 4; // OFFSET + LIMIT + LEFT + RIGHT + } + + /** {@inheritDoc} */ @Override protected int visibleColumns() { return left.visibleColumns(); } @@ -50,7 +107,7 @@ public class GridSqlUnion extends GridSqlQuery { buff.append('(').append(left.getSQL()).append(')'); - switch (unionType) { + switch (unionType()) { case SelectUnion.UNION_ALL: buff.append("\nUNION ALL\n"); break; @@ -78,6 +135,13 @@ public class GridSqlUnion extends GridSqlQuery { return buff.toString(); } + /** {@inheritDoc} */ + @Override public boolean simpleQuery() { + return unionType() == SelectUnion.UNION_ALL && sort().isEmpty() && + offset() == null && limit() == null && + left().simpleQuery() && right().simpleQuery(); + } + /** * @return Union type. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java deleted file mode 100644 index 665268c..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.sql; - -/** - * Marker interface for a simple value. - */ -public interface GridSqlValue { - // No-op. -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ac1a6a6..b0fa639 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -84,6 +84,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL; @@ -435,6 +436,7 @@ public class GridMapQueryExecutor { null, req.pageSize(), false, + true, req.timeout()); } @@ -456,6 +458,7 @@ public class GridMapQueryExecutor { req.tables(), req.pageSize(), req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS), + req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER), req.timeout()); } @@ -482,6 +485,7 @@ public class GridMapQueryExecutor { Collection<String> tbls, int pageSize, boolean distributedJoins, + boolean enforceJoinOrder, int timeout ) { // Prepare to run queries. @@ -541,8 +545,7 @@ public class GridMapQueryExecutor { Connection conn = h2.connectionForSpace(mainCctx.name()); - // Here we enforce join order to have the same behavior on all the nodes. - h2.setupConnection(conn, distributedJoins, true); + setupConnection(conn, distributedJoins, enforceJoinOrder); GridH2QueryContext.set(qctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index c267f4a..de14771 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -17,21 +17,25 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.util.AbstractList; import java.util.ArrayList; import java.util.Collection; -import java.util.ConcurrentModificationException; +import 
java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.RandomAccess; import java.util.Set; import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.engine.Session; import org.h2.index.BaseIndex; @@ -43,9 +47,13 @@ import org.h2.result.SearchRow; import org.h2.result.SortOrder; import org.h2.table.IndexColumn; import org.h2.table.TableFilter; +import org.h2.value.Value; import org.jetbrains.annotations.Nullable; +import static java.util.Collections.emptyIterator; +import static java.util.Objects.requireNonNull; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE; import static org.apache.ignite.IgniteSystemProperties.getInteger; /** @@ -55,6 +63,27 @@ public abstract class GridMergeIndex extends BaseIndex { /** */ private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000); + /** */ + private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024); + + /** */ + protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() { + @Override public int compare(SearchRow rowInList, SearchRow searchRow) { + int res = compareRows(rowInList, searchRow); + + return res == 0 ? 
1 : res; + } + }; + + /** */ + protected final Comparator<SearchRow> lastRowCmp = new Comparator<SearchRow>() { + @Override public int compare(SearchRow rowInList, SearchRow searchRow) { + int res = compareRows(rowInList, searchRow); + + return res == 0 ? -1 : res; + } + }; + /** All rows number. */ private final AtomicInteger expRowsCnt = new AtomicInteger(0); @@ -67,10 +96,13 @@ public abstract class GridMergeIndex extends BaseIndex { /** * Will be r/w from query execution thread only, does not need to be threadsafe. */ - private ArrayList<Row> fetched = new ArrayList<>(); + private final BlockList<Row> fetched; /** */ - private int fetchedCnt; + private Row lastEvictedRow; + + /** */ + private volatile int fetchedCnt; /** */ private final GridKernalContext ctx; @@ -86,8 +118,9 @@ public abstract class GridMergeIndex extends BaseIndex { GridMergeTable tbl, String name, IndexType type, - IndexColumn[] cols) { - this.ctx = ctx; + IndexColumn[] cols + ) { + this(ctx); initBaseIndex(tbl, 0, name, cols, type); } @@ -96,7 +129,19 @@ public abstract class GridMergeIndex extends BaseIndex { * @param ctx Context. */ protected GridMergeIndex(GridKernalContext ctx) { + if (!U.isPow2(PREFETCH_SIZE)) { + throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE + + ") must be positive and a power of 2."); + } + + if (PREFETCH_SIZE >= MAX_FETCH_SIZE) { + throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE + + ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ")."); + } + this.ctx = ctx; + + fetched = new BlockList<>(PREFETCH_SIZE); } /** @@ -109,7 +154,7 @@ public abstract class GridMergeIndex extends BaseIndex { /** * Fails index if any source node is left. 
*/ - protected final void checkSourceNodesAlive() { + private void checkSourceNodesAlive() { for (UUID nodeId : sources()) { if (!ctx.discovery().alive(nodeId)) { fail(nodeId, null); @@ -154,6 +199,50 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * @param queue Queue to poll. + * @return Next page. + */ + private GridResultPage takeNextPage(BlockingQueue<GridResultPage> queue) { + GridResultPage page; + + for (;;) { + try { + page = queue.poll(500, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new CacheException("Query execution was interrupted.", e); + } + + if (page != null) + break; + + checkSourceNodesAlive(); + } + + return page; + } + + /** + * @param queue Queue to poll. + * @param iter Current iterator. + * @return The same or new iterator. + */ + protected final Iterator<Value[]> pollNextIterator(BlockingQueue<GridResultPage> queue, Iterator<Value[]> iter) { + while (!iter.hasNext()) { + GridResultPage page = takeNextPage(queue); + + if (page.isLast()) + return emptyIterator(); // We are done. + + fetchNextPage(page); + + iter = page.rows(); + } + + return iter; + } + + /** * @param e Error. */ public void fail(final CacheException e) { @@ -261,9 +350,8 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public Cursor find(Session ses, SearchRow first, SearchRow last) { - if (fetched == null) - throw new IgniteException("Fetched result set was too large."); + @Override public final Cursor find(Session ses, SearchRow first, SearchRow last) { + checkBounds(lastEvictedRow, first, last); if (fetchedAll()) return findAllFetched(fetched, first, last); @@ -279,16 +367,26 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * @param first First row. - * @param last Last row. + * @param lastEvictedRow Last evicted fetched row. + * @param first Lower bound. + * @param last Upper bound. 
+ */ + protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) { + if (lastEvictedRow != null) + throw new IgniteException("Fetched result set was too large."); + } + + /** + * @param first Lower bound. + * @param last Upper bound. * @return Cursor. Usually it must be {@link FetchingCursor} instance. */ protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last); /** * @param fetched Fetched rows. - * @param first First row. - * @param last Last row. + * @param first Lower bound. + * @param last Upper bound. * @return Cursor. */ protected abstract Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last); @@ -349,69 +447,212 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * @param rows Sorted rows list. + * @param searchRow Search row. + * @param cmp Comparator. + * @param checkLast If we need to optimistically check the last row right away. + * @return Insertion point for the search row. + */ + protected static int binarySearchRow( + List<Row> rows, + SearchRow searchRow, + Comparator<SearchRow> cmp, + boolean checkLast + ) { + assert !rows.isEmpty(); + + // Optimistically compare with the last row as a first step. + if (checkLast) { + int res = cmp.compare(last(rows), searchRow); + + assert res != 0; // Comparators must never return 0 here. + + if (res < 0) + return rows.size(); // The search row is greater than the last row. + } + + int res = Collections.binarySearch(rows, searchRow, cmp); + + assert res < 0: res; // Comparator must never return 0. + + return -res - 1; + } + + /** + * @param evictedBlock Evicted block. + */ + private void onBlockEvict(List<Row> evictedBlock) { + assert evictedBlock.size() == PREFETCH_SIZE; + + // Remember the last row (it will be max row) from the evicted block. + lastEvictedRow = requireNonNull(last(evictedBlock)); + } + + /** + * @param l List. + * @return Last element. 
+ */ + private static <Z> Z last(List<Z> l) { + return l.get(l.size() - 1); + } + + /** * Fetching cursor. */ - protected class FetchingCursor extends GridH2Cursor { + protected class FetchingCursor implements Cursor { /** */ - private Iterator<Row> stream; + Iterator<Row> stream; + + /** */ + List<Row> rows; + + /** */ + int cur; + + /** */ + SearchRow first; + + /** */ + SearchRow last; + + /** */ + int lastFound = Integer.MAX_VALUE; /** - * @param stream Iterator. + * @param first Lower bound. + * @param last Upper bound. + * @param stream Stream of all the rows from remote nodes. */ - public FetchingCursor(Iterator<Row> stream) { - super(new FetchedIterator()); - + public FetchingCursor(SearchRow first, SearchRow last, Iterator<Row> stream) { assert stream != null; + // Initially we will use all the fetched rows, after we will switch to the last block. + rows = fetched; + this.stream = stream; + this.first = first; + this.last = last; + + if (haveBounds() && !rows.isEmpty()) + cur = findBounds(); + + cur--; // Set current position before the first row. } - /** {@inheritDoc} */ - @Override public boolean next() { - if (super.next()) { - assert cur != null; - - if (iter == stream && fetched != null) { // Cache fetched rows for reuse. - if (fetched.size() == MAX_FETCH_SIZE) - fetched = null; // Throw away fetched result if it is too large. - else - fetched.add(cur); - } + /** + * @return {@code true} If we have bounds. + */ + private boolean haveBounds() { + return first != null || last != null; + } - fetchedCnt++; + /** + * @return Lower bound. + */ + private int findBounds() { + assert !rows.isEmpty(): "rows"; - return true; + int firstFound = cur; + + // Find the lower bound. + if (first != null) { + firstFound = binarySearchRow(rows, first, firstRowCmp, true); + + assert firstFound >= cur && firstFound <= rows.size(): "firstFound"; + + if (firstFound == rows.size()) + return firstFound; // The lower bound is greater than all the rows we have. 
+ + first = null; // We have found the lower bound, do not need it anymore. } - if (iter == stream) // We've fetched the stream. - return false; + // Find the upper bound. + if (last != null) { + assert lastFound == Integer.MAX_VALUE: "lastFound"; + + int lastFound0 = binarySearchRow(rows, last, lastRowCmp, true); - iter = stream; // Switch from cached to stream. + // If the upper bound is too large we will ignore it. + if (lastFound0 != rows.size()) + lastFound = lastFound0; + } - return next(); + return firstFound; } - } - /** - * List iterator without {@link ConcurrentModificationException}. - */ - private class FetchedIterator implements Iterator<Row> { - /** */ - private int idx; + /** + * Fetch rows from the stream. + */ + private void fetchRows() { + for (;;) { + // Take the current last block and set the position after last. + rows = fetched.lastBlock(); + cur = rows.size(); + + // Fetch stream. + while (stream.hasNext()) { + fetched.add(requireNonNull(stream.next())); + + // Evict block if we've fetched too many rows. + if (fetched.size() == MAX_FETCH_SIZE) { + onBlockEvict(fetched.evictFirstBlock()); + + assert fetched.size() < MAX_FETCH_SIZE; + } + + // No bounds -> no need to do binary search, can return the fetched row right away. + if (!haveBounds()) + break; + + // When the last block changed, it means that we've filled the current last block. + // We have fetched the needed number of rows for binary search. + if (fetched.lastBlock() != rows) { + assert fetched.lastBlock().isEmpty(); // The last row must be added to the previous block. + + break; + } + } + + if (cur == rows.size()) + cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done. + else { + // Update fetched count. + fetchedCnt += rows.size() - cur; + + if (haveBounds()) { + cur = findBounds(); + + if (cur == rows.size()) + continue; // The lower bound is too large, continue fetching rows. 
+ } + } + + return; + } + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (++cur == rows.size()) + fetchRows(); + + return cur < lastFound; + } /** {@inheritDoc} */ - @Override public boolean hasNext() { - return fetched != null && idx < fetched.size(); + @Override public Row get() { + return rows.get(cur); } /** {@inheritDoc} */ - @Override public Row next() { - return fetched.get(idx++); + @Override public SearchRow getSearchRow() { + return get(); } /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); + @Override public boolean previous() { + // Should never be called. + throw DbException.getUnsupportedException("previous"); } } @@ -427,4 +668,81 @@ public abstract class GridMergeIndex extends BaseIndex { /** */ volatile State state = State.UNINITIALIZED; } + + /** + */ + private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess { + /** */ + private final List<List<Z>> blocks; + + /** */ + private int size; + + /** */ + private final int maxBlockSize; + + /** */ + private final int shift; + + /** */ + private final int mask; + + /** + * @param maxBlockSize Max block size. + */ + private BlockList(int maxBlockSize) { + assert U.isPow2(maxBlockSize); + + this.maxBlockSize = maxBlockSize; + + shift = Integer.numberOfTrailingZeros(maxBlockSize); + mask = maxBlockSize - 1; + + blocks = new ArrayList<>(); + blocks.add(new ArrayList<Z>()); + } + + /** {@inheritDoc} */ + @Override public int size() { + return size; + } + + /** {@inheritDoc} */ + @Override public boolean add(Z z) { + size++; + + List<Z> lastBlock = lastBlock(); + + lastBlock.add(z); + + if (lastBlock.size() == maxBlockSize) + blocks.add(new ArrayList<Z>()); + + return true; + } + + /** {@inheritDoc} */ + @Override public Z get(int idx) { + return blocks.get(idx >>> shift).get(idx & mask); + } + + /** + * @return Last block. 
+ */ + private List<Z> lastBlock() { + return last(blocks); + } + + /** + * @return Evicted block. + */ + private List<Z> evictFirstBlock() { + // Remove head block. + List<Z> res = blocks.remove(0); + + size -= res.size(); + + return res; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java new file mode 100644 index 0000000..a1b6691 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.h2.index.Cursor; +import org.h2.index.IndexType; +import org.h2.result.Row; +import org.h2.result.SearchRow; +import org.h2.table.IndexColumn; +import org.h2.value.Value; +import org.jetbrains.annotations.Nullable; + +import static java.util.Collections.emptyIterator; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase.bubbleUp; + +/** + * Sorted index. + */ +public final class GridMergeIndexSorted extends GridMergeIndex { + /** */ + private final Comparator<RowStream> streamCmp = new Comparator<RowStream>() { + @Override public int compare(RowStream o1, RowStream o2) { + // Nulls at the beginning. + if (o1 == null) + return -1; + + if (o2 == null) + return 1; + + return compareRows(o1.get(), o2.get()); + } + }; + + /** */ + private Map<UUID,RowStream> streamsMap; + + /** */ + private RowStream[] streams; + + /** + * @param ctx Kernal context. + * @param tbl Table. + * @param name Index name, + * @param type Index type. + * @param cols Columns. 
+ */ + public GridMergeIndexSorted( + GridKernalContext ctx, + GridMergeTable tbl, + String name, + IndexType type, + IndexColumn[] cols + ) { + super(ctx, tbl, name, type, cols); + } + + /** {@inheritDoc} */ + @Override public void setSources(Collection<ClusterNode> nodes) { + super.setSources(nodes); + + streamsMap = U.newHashMap(nodes.size()); + streams = new RowStream[nodes.size()]; + + int i = 0; + + for (ClusterNode node : nodes) { + RowStream stream = new RowStream(node.id()); + + streams[i++] = stream; // Advance the slot: each source node needs its own entry, otherwise slots 1..n-1 stay null. + + if (streamsMap.put(stream.src, stream) != null) + throw new IllegalStateException(); + } + } + + /** {@inheritDoc} */ + @Override protected void addPage0(GridResultPage page) { + if (page.isLast() || page.isFail()) { + // Finish all the streams. + for (RowStream stream : streams) + stream.addPage(page); + } + else { + assert page.rowsInPage() > 0; + + UUID src = page.source(); + + streamsMap.get(src).addPage(page); + } + } + + /** {@inheritDoc} */ + @Override protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) { + // If our last evicted fetched row was smaller than the given lower bound, + // then we are ok. This is important for merge join to work. + if (lastEvictedRow != null && first != null && compareRows(lastEvictedRow, first) < 0) + return; + + super.checkBounds(lastEvictedRow, first, last); + } + + /** {@inheritDoc} */ + @Override protected Cursor findAllFetched(List<Row> fetched, SearchRow first, SearchRow last) { + Iterator<Row> iter; + + if (fetched.isEmpty()) + iter = emptyIterator(); + else if (first == null && last == null) + iter = fetched.iterator(); + else { + int low = first == null ? 0 : binarySearchRow(fetched, first, firstRowCmp, false); + + if (low == fetched.size()) + iter = emptyIterator(); + else { + int high = last == null ? 
fetched.size() : binarySearchRow(fetched, last, lastRowCmp, false); + + iter = fetched.subList(low, high).iterator(); + } + } + + return new GridH2Cursor(iter); + } + + /** {@inheritDoc} */ + @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) { + return new FetchingCursor(first, last, new MergeStreamIterator()); + } + + /** + * Iterator merging multiple row streams. + */ + private final class MergeStreamIterator implements Iterator<Row> { + /** */ + private boolean first = true; + + /** */ + private int off; + + /** */ + private boolean hasNext; + + /** + * + */ + private void goFirst() { + for (int i = 0; i < streams.length; i++) { + if (!streams[i].next()) { + streams[i] = null; + off++; // Move left bound. + } + } + + if (off < streams.length) + Arrays.sort(streams, streamCmp); + + first = false; + } + + /** + * + */ + private void goNext() { + if (streams[off].next()) + bubbleUp(streams, off, streamCmp); + else + streams[off++] = null; // Move left bound and nullify empty stream. + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + if (hasNext) + return true; + + if (first) + goFirst(); + else + goNext(); + + return hasNext = off < streams.length; + } + + /** {@inheritDoc} */ + @Override public Row next() { + if (!hasNext()) + throw new NoSuchElementException(); + + hasNext = false; + + return streams[off].get(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); + } + } + + /** + * Row stream. + */ + private final class RowStream { + /** */ + final UUID src; + + /** */ + final BlockingQueue<GridResultPage> queue = new ArrayBlockingQueue<>(8); + + /** */ + Iterator<Value[]> iter = emptyIterator(); + + /** */ + Row cur; + + /** + * @param src Source. + */ + private RowStream(UUID src) { + this.src = src; + } + + /** + * @param page Page. 
+ */ + private void addPage(GridResultPage page) { + queue.offer(page); + } + + /** + * @return {@code true} If we successfully switched to the next row. + */ + private boolean next() { + cur = null; + + iter = pollNextIterator(queue, iter); + + if (!iter.hasNext()) + return false; + + cur = GridH2RowFactory.create(iter.next()); + + return true; + } + + /** + * @return Current row. + */ + private Row get() { + assert cur != null; + + return cur; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 8a8577f..b69c898 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -22,8 +22,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.cache.CacheException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory; @@ -33,12 +31,11 @@ import org.h2.result.Row; import org.h2.result.SearchRow; import org.h2.table.IndexColumn; import org.h2.value.Value; -import org.jetbrains.annotations.Nullable; /** * Unsorted merge index. 
*/ -public class GridMergeIndexUnsorted extends GridMergeIndex { +public final class GridMergeIndexUnsorted extends GridMergeIndex { /** */ private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>(); @@ -74,43 +71,22 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { } /** {@inheritDoc} */ - @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) { + @Override protected Cursor findAllFetched(List<Row> fetched, SearchRow first, SearchRow last) { + // This index is unsorted: have to ignore bounds. return new GridH2Cursor(fetched.iterator()); } /** {@inheritDoc} */ - @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) { - return new FetchingCursor(new Iterator<Row>() { + @Override protected Cursor findInStream(SearchRow first, SearchRow last) { + // This index is unsorted: have to ignore bounds. + return new FetchingCursor(null, null, new Iterator<Row>() { /** */ Iterator<Value[]> iter = Collections.emptyIterator(); @Override public boolean hasNext() { - while (!iter.hasNext()) { - GridResultPage page; + iter = pollNextIterator(queue, iter); - for (;;) { - try { - page = queue.poll(500, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - throw new CacheException("Query execution was interrupted.", e); - } - - if (page != null) - break; - - checkSourceNodesAlive(); - } - - if (page.isLast()) - return false; // We are done. 
- - fetchNextPage(page); - - iter = page.rows(); - } - - return true; + return iter.hasNext(); } @Override public Row next() { http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index f54fab6..128ca8e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; -import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; @@ -101,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; +import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier; /** * 
Reduce query executor. @@ -589,7 +589,8 @@ public class GridReduceQueryExecutor { mapQrys = new ArrayList<>(qry.mapQueries().size()); for (GridCacheSqlQuery mapQry : qry.mapQueries()) - mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters())); + mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query()) + .parameters(mapQry.parameters(), mapQry.parameterIndexes())); } IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer); @@ -608,6 +609,12 @@ public class GridReduceQueryExecutor { if (oldStyle && distributedJoins) throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes."); + // Always enforce join order on map side to have consistent behavior. + int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; + + if (distributedJoins) + flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS; + if (send(nodes, oldStyle ? new GridQueryRequest(qryReqId, @@ -626,7 +633,7 @@ public class GridReduceQueryExecutor { .tables(distributedJoins ? qry.tables() : null) .partitions(convert(partsMap)) .queries(mapQrys) - .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0) + .flags(flags) .timeout(timeoutMillis), oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null, distributedJoins) @@ -835,14 +842,6 @@ public class GridReduceQueryExecutor { } /** - * @param idx Table index. - * @return Table name. - */ - private static String table(int idx) { - return GridSqlQuerySplitter.table(idx).getSQL(); - } - - /** * Gets or creates new fake table for index. * * @param c Connection. @@ -860,7 +859,7 @@ public class GridReduceQueryExecutor { try { if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock. 
try (Statement stmt = c.createStatement()) { - stmt.executeUpdate("CREATE TABLE " + table(idx) + + stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) + "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"'); } catch (SQLException e) { @@ -1117,7 +1116,8 @@ public class GridReduceQueryExecutor { List<List<?>> lists = new ArrayList<>(); for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) { - ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null, false, 0, null); + ResultSet rs = h2.executeSqlQueryWithTimer(space, c, + "SELECT PLAN FROM " + mergeTableIdentifier(i), null, false, 0, null); lists.add(F.asList(getPlan(rs))); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 884173f..e5dbf33 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { */ public static int FLAG_DISTRIBUTED_JOINS = 1; + /** + * Remote map query executor will enforce join order for the received map queries. + */ + public static int FLAG_ENFORCE_JOIN_ORDER = 1 << 1; + /** */ private long reqId; @@ -209,6 +214,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { * @return {@code this}. 
*/ public GridH2QueryRequest flags(int flags) { + assert flags >= 0 && flags <= 255: flags; + this.flags = (byte)flags; return this; http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java index d5f02eb..cd38d31 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java @@ -368,7 +368,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA if (cacheMode() == PARTITIONED) { assertEquals(2, res.size()); - assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.table(0).getSQL())); + assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.mergeTableIdentifier(0))); } else assertEquals(1, res.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index 06afe7c..432ed34 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -33,8 +33,8 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheKeyConfiguration; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; @@ -79,6 +79,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { return cfg; } + @Override + protected long getTestTimeout() { + return 100_000_000; + } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { startGridsMultiThreaded(3, false); @@ -271,9 +276,9 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { String select = "select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=1" + " union select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=2"; - String plan = (String)c.query(new SqlFieldsQuery("explain " + select) + String plan = c.query(new SqlFieldsQuery("explain " + select) .setDistributedJoins(true).setEnforceJoinOrder(true)) - .getAll().get(0).get(0); + .getAll().toString(); X.println("Plan : " + plan); @@ -285,9 +290,9 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { select = "select * from (" + select + ")"; - plan = (String)c.query(new SqlFieldsQuery("explain " + select) + plan = c.query(new SqlFieldsQuery("explain " + select) .setDistributedJoins(true).setEnforceJoinOrder(true)) - .getAll().get(0).get(0); + .getAll().toString(); X.println("Plan : " + plan); @@ -548,15 +553,15 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest { "\"orgRepl\".Organization o", "where p.affKey = o._key", true); - checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", - "(select * from \"persPart\".Person2) p", - "\"orgPart\".Organization o", - "where p._key = o._key", false); - - checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", - "\"persPart\".Person2 p", - "(select * from \"orgPart\".Organization) o", - "where p._key = o._key", false); + // TODO Now we can not analyze subqueries to decide if we are collocated or not. +// checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", +// "(select * from \"persPart\".Person2) p", +// "\"orgPart\".Organization o", +// "where p._key = o._key", false); +// checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", +// "\"persPart\".Person2 p", +// "(select * from \"orgPart\".Organization) o", +// "where p._key = o._key", false); // Join multiple. @@ -576,26 +581,32 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { sql); sql = "select o.k1, p1._key k2, p2._key k3 from " + - "(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " + - "\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 where p1._key=p2._key and p2.orgId = o.k1"; + "(select o1._key k1, o2._key k2 " + + "from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 " + + "where o1._key > o2._key) o, " + + "\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o.k1"; checkQueryPlan(persPart, false, - 1, + 0, sql, - "persPartAff", "persPart", "batched:unicast", "orgRepl"); + "persPartAff", "persPart", "orgRepl"); checkQueryFails(persPart, sql, true); - sql = "select o.k1, p._key k2 from " + - "(select o1._key k1, p1._key k2 from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 where o1._key = p1.orgId) o, " + - "\"persPartAff\".Person2 p where p._key=o.k1"; + sql = "select o.ok, p._key from " + + "(select o1._key ok, 
p1._key pk " + + "from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 " + + "where o1._key = p1.orgId) o, " + + "\"persPartAff\".Person2 p where p._key=o.ok"; checkQueryPlan(persPart, false, 1, sql, - "FROM \"persPart\"", "INNER JOIN \"orgRepl\"", "INNER JOIN \"persPartAff\"", "batched:broadcast"); + "FROM \"persPart\"", "INNER JOIN \"orgRepl\"", + "INNER JOIN \"persPartAff\"", "batched:unicast"); checkQueryFails(persPart, sql, true); } @@ -657,7 +668,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { { String sql = "select p1._key k1, p2._key k2, o._key k3 " + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " + - "where p1._key=p2._key and p2.orgId = o._key"; + "where p1._key=p2.name and p2.orgId = o._key"; checkQueryPlan(persPart, false, @@ -666,21 +677,39 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { "batched:unicast"); sql = "select p1._key k1, p2._key k2, o._key k3 " + - "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " + "where p1._key=p2._key and p2.orgId = o._key"; checkQueryPlan(persPart, false, + 0, + sql); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2.name and p2.orgId = o._key"; + + checkQueryPlan(persPart, + false, 1, sql, "batched:unicast"); sql = "select p1._key k1, p2._key k2, o._key k3 " + - "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " + + "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + "where p1._key=p2._key and p2.orgId = o._key"; checkQueryPlan(persPart, false, + 0, + sql); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from (select * from \"orgRepl\".Organization) o, \"persPartAff\".Person2 p1, 
\"persPart\".Person2 p2 " + + "where p1._key=p2.name and p2.orgId = o._key"; + + checkQueryPlan(persPart, + false, 1, sql, "batched:unicast"); @@ -691,9 +720,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { checkQueryPlan(persPart, false, - 1, - sql, - "batched:unicast"); + 0, + sql); } } finally { @@ -823,17 +851,28 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { boolean enforceJoinOrder, int expBatchedJoins, String sql, - String...expText) { + String...expText + ) { + checkQueryPlan(cache, + enforceJoinOrder, + expBatchedJoins, + new SqlFieldsQuery(sql), + expText); + + sql = "select * from (" + sql + ")"; + checkQueryPlan(cache, enforceJoinOrder, expBatchedJoins, new SqlFieldsQuery(sql), expText); + sql = "select * from (" + sql + ")"; + checkQueryPlan(cache, enforceJoinOrder, expBatchedJoins, - new SqlFieldsQuery("select * from (" + sql + ")"), + new SqlFieldsQuery(sql), expText); } @@ -854,7 +893,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { String plan = queryPlan(cache, qry); - log.info("Plan: " + plan); + log.info("\n Plan:\n" + plan); assertEquals("Unexpected number of batched joins in plan [plan=" + plan + ", qry=" + qry + ']', expBatchedJoins, http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java index b909b36..d3ff902 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java @@ -53,6 
+53,12 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ + private static final String TEST_SCHEMA = "SCH"; + + /** */ + private static final String TEST_CACHE = "my-cache"; + + /** */ private static Ignite ignite; /** {@inheritDoc} */ @@ -69,12 +75,14 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { // Cache. CacheConfiguration cc = defaultCacheConfiguration(); + cc.setName(TEST_CACHE); cc.setCacheMode(CacheMode.PARTITIONED); cc.setAtomicityMode(CacheAtomicityMode.ATOMIC); cc.setNearConfiguration(null); cc.setWriteSynchronizationMode(FULL_SYNC); cc.setRebalanceMode(SYNC); cc.setSwapEnabled(false); + cc.setSqlSchema(TEST_SCHEMA); cc.setSqlFunctionClasses(GridQueryParsingTest.class); cc.setIndexedTypes( String.class, Address.class, @@ -163,7 +171,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { checkQuery("select avg(old) from Person, Address where Person.addrId = Address.id " + "and lower(Address.street) = lower(?)"); - checkQuery("select name, date from Person"); + checkQuery("select name, name, date, date d from Person"); checkQuery("select distinct name, date from Person"); checkQuery("select * from Person p"); checkQuery("select * from Person"); @@ -240,16 +248,16 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { checkQuery("select street from Person p, (select a.street from Address a where a.street is not null) "); checkQuery("select addr.street from Person p, (select a.street from Address a where a.street is not null) addr"); - checkQuery("select p.name n from \"\".Person p order by p.old + 10"); + checkQuery("select p.name n from sch.Person p order by p.old + 10"); - checkQuery("select case when p.name is null then 'Vasya' end x from \"\".Person p"); - checkQuery("select case when p.name like 'V%' then 'Vasya' else 'Other' end x from \"\".Person p"); - checkQuery("select case when 
upper(p.name) = 'VASYA' then 'Vasya' when p.name is not null then p.name else 'Other' end x from \"\".Person p"); + checkQuery("select case when p.name is null then 'Vasya' end x from sch.Person p"); + checkQuery("select case when p.name like 'V%' then 'Vasya' else 'Other' end x from sch.Person p"); + checkQuery("select case when upper(p.name) = 'VASYA' then 'Vasya' when p.name is not null then p.name else 'Other' end x from sch.Person p"); - checkQuery("select case p.name when 'Vasya' then 1 end z from \"\".Person p"); - checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 end z from \"\".Person p"); - checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 else 3 end z from \"\".Person p"); - checkQuery("select case p.name when 'Vasya' then 1 else 3 end z from \"\".Person p"); + checkQuery("select case p.name when 'Vasya' then 1 end z from sch.Person p"); + checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 end z from sch.Person p"); + checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 else 3 end z from sch.Person p"); + checkQuery("select case p.name when 'Vasya' then 1 else 3 end z from sch.Person p"); checkQuery("select count(*) as a from Person union select count(*) as a from Address"); checkQuery("select old, count(*) as a from Person group by old union select 1, count(*) as a from Address"); @@ -262,6 +270,17 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { checkQuery("(select name from Person limit 4) UNION (select street from Address limit 1) limit ? offset ?"); checkQuery("(select 2 a) union all (select 1) order by 1"); checkQuery("(select 2 a) union all (select 1) order by a desc nulls first limit ? offset ?"); + + checkQuery("select public.\"#\".\"@\" from (select 1 as \"@\") \"#\""); +// checkQuery("select sch.\"#\".\"@\" from (select 1 as \"@\") \"#\""); // Illegal query. 
+ checkQuery("select \"#\".\"@\" from (select 1 as \"@\") \"#\""); + checkQuery("select \"@\" from (select 1 as \"@\") \"#\""); + checkQuery("select sch.\"#\".old from sch.Person \"#\""); + checkQuery("select sch.\"#\".old from Person \"#\""); + checkQuery("select \"#\".old from Person \"#\""); + checkQuery("select old from Person \"#\""); +// checkQuery("select Person.old from Person \"#\""); // Illegal query. + checkQuery("select sch.\"#\".* from Person \"#\""); } /** */ @@ -374,7 +393,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { IgniteH2Indexing idx = U.field(qryProcessor, "idx"); - return (JdbcConnection)idx.connectionForSpace(null); + return (JdbcConnection)idx.connectionForSpace(TEST_CACHE); } /** @@ -416,7 +435,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { private void checkQuery(String qry) throws Exception { Prepared prepared = parse(qry); - GridSqlStatement gQry = new GridSqlQueryParser().parse(prepared); + GridSqlStatement gQry = new GridSqlQueryParser(false).parse(prepared); String res = gQry.getSQL();
