Merge branch 'ignite-1.6.10' into ignite-1.7.3
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fac786b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fac786b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fac786b Branch: refs/heads/master Commit: 5fac786b6dbb179127ac725180acb54d0f6f4b0a Parents: d0f4b23 a863eee Author: devozerov <[email protected]> Authored: Mon Oct 31 21:31:05 2016 +0300 Committer: thatcoach <[email protected]> Committed: Mon Oct 31 21:58:20 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteScheduler.java | 13 + .../cache/query/QueryCancelledException.java | 35 +++ .../apache/ignite/cache/query/QueryCursor.java | 8 +- .../ignite/cache/query/SqlFieldsQuery.java | 26 ++ .../org/apache/ignite/cache/query/SqlQuery.java | 25 ++ .../ignite/internal/IgniteSchedulerImpl.java | 18 ++ .../processors/cache/QueryCursorImpl.java | 92 +++++-- .../closure/GridClosureProcessor.java | 1 + .../processors/query/GridQueryCancel.java | 84 +++++++ .../processors/query/GridQueryFieldsResult.java | 3 +- .../query/GridQueryFieldsResultAdapter.java | 3 +- .../processors/query/GridQueryIndexing.java | 11 +- .../processors/query/GridQueryProcessor.java | 105 ++++++-- .../twostep/messages/GridQueryFailResponse.java | 34 ++- .../h2/twostep/messages/GridQueryRequest.java | 31 ++- .../junits/GridTestKernalContext.java | 1 - .../processors/query/h2/IgniteH2Indexing.java | 160 +++++++++--- .../query/h2/twostep/GridMapQueryExecutor.java | 66 +++-- .../h2/twostep/GridReduceQueryExecutor.java | 117 ++++++--- ...niteCacheDistributedQueryCancelSelfTest.java | 176 +++++++++++++ ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 248 +++++++++++++++++++ ...cheQueryAbstractDistributedJoinSelfTest.java | 7 + .../IgniteCacheQueryNodeRestartSelfTest2.java | 125 ++++++---- ...nCancelOrTimeoutDistributedJoinSelfTest.java | 7 + ...eCacheLocalQueryCancelOrTimeoutSelfTest.java | 158 ++++++++++++ .../h2/GridIndexingSpiAbstractSelfTest.java | 4 +- .../IgniteCacheQuerySelfTestSuite2.java | 8 + 27 files changed, 1388 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 48dab6b,d1a5117..d3f85af --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@@ -56,12 -58,9 +58,15 @@@ public final class SqlFieldsQuery exten /** Collocation flag. */ private boolean collocated; + /** Query timeout in millis. */ + private int timeout; + + /** */ + private boolean enforceJoinOrder; + + /** */ + private boolean distributedJoins; + /** * Constructs SQL fields query. * http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java index f809b8d,51c6cb5..83e171d --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java @@@ -43,9 -44,9 +44,12 @@@ public final class SqlQuery<K, V> exten @GridToStringInclude private Object[] args; + /** Timeout in millis. */ + private int timeout; + + /** */ + private boolean distributedJoins; + /** * Constructs query for the given type name and SQL query. * http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 643cb8c,b1b3c68..6bffa5d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@@ -80,13 -80,15 +80,15 @@@ public interface GridQueryIndexing * @param spaceName Space name. * @param qry Query. * @param params Query parameters. - * @param filters Space name and key filters. + * @param filter Space name and key filter. + * @param enforceJoinOrder Enforce join order of tables in the query. + * @param timeout Query timeout in milliseconds. + * @param cancel Query cancel. * @return Query result. * @throws IgniteCheckedException If failed. */ - public GridQueryFieldsResult execute(@Nullable String spaceName, String qry, - Collection<Object> params, IndexingQueryFilter filters, int timeout, GridQueryCancel cancel) - throws IgniteCheckedException; + public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry, - Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder) throws IgniteCheckedException; ++ Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout, GridQueryCancel cancel) throws IgniteCheckedException; /** * Executes regular query. http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 12cf962,3d185c6..27c0b71 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@@ -17,6 -17,6 +17,7 @@@ package org.apache.ignite.internal.processors.query; ++import java.util.concurrent.TimeUnit; import java.lang.reflect.AccessibleObject; import java.lang.reflect.Field; import java.lang.reflect.Member; @@@ -719,6 -740,43 +734,43 @@@ public class GridQueryProcessor extend } /** + * @param space Space. + * @param clause Clause. + * @param params Parameters collection. + * @param resType Result type. + * @param filters Filters. + * @return Key/value rows. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause, + final Collection<Object> params, final String resType, final IndexingQueryFilter filters) + throws IgniteCheckedException { + checkEnabled(); + + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); + + return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { + @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { + TypeDescriptor type = typesByName.get(new TypeName(space, resType)); + + if (type == null || !type.registered()) + throw new CacheException("Failed to find SQL table for type: " + resType); + - return idx.query(space, clause, params, type, filters); ++ return idx.queryLocalSql(space, clause, params, type, filters); + } + }, false); + } + finally { + busyLock.leaveBusy(); + } + } + + /** * @param cctx Cache context. * @param qry Query. * @return Cursor. @@@ -868,6 -925,24 +920,24 @@@ } /** + * @param timeout Timeout. + * @param timeUnit Time unit. + * @return Converted time. + */ + public static int validateTimeout(int timeout, TimeUnit timeUnit) { + A.ensure(timeUnit != TimeUnit.MICROSECONDS && timeUnit != TimeUnit.NANOSECONDS, - "timeUnit minimal resolution is millisecond."); ++ "timeUnit minimal resolution is millisecond."); + + A.ensure(timeout >= 0, "timeout value should be non-negative."); + + long tmp = TimeUnit.MILLISECONDS.convert(timeout, timeUnit); + + A.ensure(timeout <= Integer.MAX_VALUE, "timeout value too large."); + + return (int) tmp; + } + + /** * Closeable iterator. */ private interface ClIter<X> extends AutoCloseable, Iterator<X> { @@@ -888,14 -963,13 +958,13 @@@ return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() { @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException { - String space = cctx.name(); - String sql = qry.getSql(); - Object[] args = qry.getArgs(); + final String space = cctx.name(); + final String sql = qry.getSql(); + final Object[] args = qry.getArgs(); + final GridQueryCancel cancel = new GridQueryCancel(); - final GridQueryFieldsResult res = idx.execute(space, sql, F.asList(args), - idx.backupFilter(null, requestTopVer.get(), null), qry.getTimeout(), cancel); + final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args), - idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder()); - - sendQueryExecutedEvent(sql, args); ++ idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel); QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { @@@ -2412,4 -2501,4 +2487,4 @@@ private enum IndexType { ASC, DESC, TEXT } --} ++} http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index f7de86c,550cf9b..6e42f1c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@@ -167,9 -169,16 +173,16 @@@ public class GridQueryRequest implement } /** + * @return Timeout. + */ + public int timeout() { + return this.timeout; + } + + /** * @return Queries. */ - public Collection<GridCacheSqlQuery> queries() throws IgniteCheckedException { + public Collection<GridCacheSqlQuery> queries() { return qrys; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 331b6f9,ab332c1..ed42bc6 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@@ -75,8 -76,8 +77,9 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; + import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; @@@ -132,9 -128,9 +135,10 @@@ import org.h2.index.Index import org.h2.index.SpatialIndex; import org.h2.jdbc.JdbcConnection; import org.h2.jdbc.JdbcPreparedStatement; + import org.h2.jdbc.JdbcStatement; import org.h2.message.DbException; import org.h2.mvstore.cache.CacheLongKeyLIRS; +import org.h2.result.SortOrder; import org.h2.server.web.WebServer; import org.h2.table.Column; import org.h2.table.IndexColumn; @@@ -390,7 -356,7 +394,7 @@@ public class IgniteH2Indexing implement PreparedStatement stmt = cache.get(sql); - if (stmt != null && !stmt.isClosed()) { - if (stmt != null && !((JdbcStatement)stmt).wasCancelled()) { ++ if (stmt != null && !stmt.isClosed() && !((JdbcStatement)stmt).wasCancelled()) { assert stmt.getConnection() == c; return stmt; @@@ -764,31 -733,36 +768,36 @@@ /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridQueryFieldsResult execute(@Nullable final String spaceName, final String qry, - @Nullable final Collection<Object> params, final IndexingQueryFilter filters, + @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry, - @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder) ++ @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder, + final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException { - Connection conn = connectionForSpace(spaceName); - setFilters(filters); ++ final Connection conn = connectionForSpace(spaceName); - try { - final Connection conn = connectionForThread(schema(spaceName)); + initLocalQueryContext(conn, enforceJoinOrder, filters); + try { - ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, qry, params, true); + final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true); - List<GridQueryFieldMetadata> meta = null; + List<GridQueryFieldMetadata> meta; - if (rs != null) { - try { - meta = meta(rs.getMetaData()); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to get meta data.", e); - } + try { + meta = meta(stmt.getMetaData()); + } + catch (SQLException e) { + throw new IgniteCheckedException("Cannot prepare query metadata", e); } - return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs)); + return new GridQueryFieldsResultAdapter(meta, null) { + @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{ + ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel); + + return new FieldsIterator(rs); + } + }; } finally { - setFilters(null); + GridH2QueryContext.clearThreadLocal(); } } @@@ -877,12 -851,54 +886,52 @@@ bindParameters(stmt, params); + return stmt; + } + + /** + * Executes sql query statement. + * + * @param conn Connection,. + * @param stmt Statement. + * @param cancel Query cancel. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt, + int timeoutMillis, @Nullable GridQueryCancel cancel) + throws IgniteCheckedException { + + if (timeoutMillis > 0) + ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(timeoutMillis); + + if (cancel != null) { + cancel.set(new Runnable() { + @Override public void run() { + try { + stmt.cancel(); - } catch (SQLException ignored) { ++ } ++ catch (SQLException ignored) { + // No-op. + } + } + }); + } + try { return stmt.executeQuery(); } catch (SQLException e) { + // Throw special exception. + if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) + throw new QueryCancelledException(); + throw new IgniteCheckedException("Failed to execute SQL query.", e); } + finally { - if(cancel != null) - cancel.setCompleted(); - + if (timeoutMillis > 0) + ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(0); + } } /** @@@ -982,14 -1034,10 +1057,14 @@@ if (tbl == null) throw new CacheException("Failed to find SQL table for type: " + type.name()); - setFilters(filters); + String sql = generateQuery(qry, tbl); + + Connection conn = connectionForThread(tbl.schemaName()); + + initLocalQueryContext(conn, false, filter); try { - ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true); - ResultSet rs = executeQuery(spaceName, qry, params, tbl, null); ++ ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null); return new KeyValIterator(rs); } @@@ -998,18 -1046,20 +1073,20 @@@ } } - /** {@inheritDoc} */ - private Iterable<List<?>> doQueryTwoStep(final GridCacheContext<?, ?> cctx, final GridCacheTwoStepQuery qry, - final boolean keepCacheObj, + /** + * @param cctx Cache context. + * @param qry Query. + * @param keepCacheObj Flag to keep cache object. + * @param enforceJoinOrder Enforce join order of tables. + * @return Iterable result. + */ + private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry, - final boolean keepCacheObj, final boolean enforceJoinOrder) { ++ final boolean keepCacheObj, final boolean enforceJoinOrder, + final int timeoutMillis, + final GridQueryCancel cancel) { return new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { - return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder); - try { - return rdcQryExec.query(cctx, qry, keepCacheObj, timeoutMillis, cancel); - } - finally { - if (cancel != null) - cancel.setCompleted(); - } ++ return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel); } }; } @@@ -1038,8 -1088,10 +1115,11 @@@ fqry.setArgs(qry.getArgs()); fqry.setPageSize(qry.getPageSize()); + fqry.setDistributedJoins(qry.isDistributedJoins()); + if(qry.getTimeout() > 0) + fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); + final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry); final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() { @@@ -1203,8 -1198,10 +1283,10 @@@ twoStepQry.pageSize(qry.getPageSize()); + GridQueryCancel cancel = new GridQueryCancel(); + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( - runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder)); - doQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), qry.getTimeout(), cancel), cancel); ++ runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel), cancel); cursor.fieldsMeta(meta); @@@ -1263,7 -1271,7 +1345,7 @@@ if (!upper.startsWith("FROM")) from = " FROM " + t + (upper.startsWith("WHERE") || upper.startsWith("ORDER") || upper.startsWith("LIMIT") ? -- " " : " WHERE "); ++ " " : " WHERE "); qry = "SELECT " + t + "." + KEY_FIELD_NAME + ", " + t + "." + VAL_FIELD_NAME + from + qry; @@@ -1561,29 -1552,10 +1643,29 @@@ if (tbl == null) return -1; - IgniteSpiCloseableIterator<List<?>> iter = execute(spaceName, - "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null, 0, null).iterator(); + Connection conn = connectionForSpace(spaceName); + + setupConnection(conn, false, false); + - ResultSet rs = executeSqlQuery(conn, - "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, false); - + try { ++ ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(), false), ++ 0, null); ++ + if (!rs.next()) + throw new IllegalStateException(); - return ((Number)iter.next().get(0)).longValue(); + return rs.getLong(1); + } + catch (SQLException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * @return Busy lock. + */ + public GridSpinBusyLock busyLock() { + return busyLock; } /** @@@ -1875,11 -1765,11 +1957,11 @@@ U.error(log, "Failed to drop schema on cache stop (will ignore): " + U.maskName(ccfg.getName()), e); } - for (Iterator<Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator(); - it.hasNext();) { - Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery> e = it.next(); + for (Iterator<Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator(); - it.hasNext();) { ++ it.hasNext();) { + Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery> e = it.next(); - if (F.eq(e.getKey().get1(), ccfg.getName())) + if (F.eq(e.getKey().space, ccfg.getName())) it.remove(); } } @@@ -3000,4 -2715,10 +3082,10 @@@ lastUsage = U.currentTimeMillis(); } } - } + + /** {@inheritDoc} */ + @Override public void cancelAllQueries() { + for (Connection conn : conns) + U.close(conn, log); + } -} ++} http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index bb5e419,1f05bf7..0314b3d --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@@ -35,6 -32,6 +35,7 @@@ import javax.cache.CacheException import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; ++import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; @@@ -51,12 -49,9 +52,13 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; + import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; @@@ -158,8 -144,8 +160,8 @@@ public class GridMapQueryExecutor if (nodeRess == null) return; - for (QueryResults ress : nodeRess.values()) - ress.cancel(); + for (QueryResults ress : nodeRess.results().values()) - ress.cancel(); ++ ress.cancel(true); } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); @@@ -237,7 -208,7 +239,7 @@@ if (results == null) return; -- results.cancel(); ++ results.cancel(true); } /** @@@ -409,226 -393,132 +411,231 @@@ * @param req Query request. */ private void onQueryRequest(ClusterNode node, GridQueryRequest req) { - ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); + List<Integer> cacheIds; - QueryResults qr = null; + if (req.extraSpaces() != null) { + cacheIds = new ArrayList<>(req.extraSpaces().size() + 1); - List<GridReservable> reserved = new ArrayList<>(); + cacheIds.add(CU.cacheId(req.space())); - try { - // Unmarshall query params. - Collection<GridCacheSqlQuery> qrys; + for (String extraSpace : req.extraSpaces()) + cacheIds.add(CU.cacheId(extraSpace)); + } + else + cacheIds = Collections.singletonList(CU.cacheId(req.space())); + + onQueryRequest0(node, + req.requestId(), + req.queries(), + cacheIds, + req.topologyVersion(), + null, + req.partitions(), + null, + req.pageSize(), - false); ++ false, ++ req.timeout()); + } - try { - qrys = req.queries(); + /** + * @param node Node. + * @param req Query request. + */ + private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) { + Map<UUID,int[]> partsMap = req.partitions(); + int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId()); + + onQueryRequest0(node, + req.requestId(), + req.queries(), + req.caches(), + req.topologyVersion(), + partsMap, + parts, + req.tables(), + req.pageSize(), - req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS)); ++ req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS), ++ req.timeout()); + } - if (!node.isLocal()) { - Marshaller m = ctx.config().getMarshaller(); + /** + * @param node Node authored request. + * @param reqId Request ID. + * @param qrys Queries to execute. + * @param cacheIds Caches which will be affected by these queries. + * @param topVer Topology version. + * @param partsMap Partitions map for unstable topology. + * @param parts Explicit partitions for current node. + * @param tbls Tables. + * @param pageSize Page size. + * @param distributedJoins Can we expect distributed joins to be ran. + */ + private void onQueryRequest0( + ClusterNode node, + long reqId, + Collection<GridCacheSqlQuery> qrys, + List<Integer> cacheIds, + AffinityTopologyVersion topVer, + Map<UUID, int[]> partsMap, + int[] parts, + Collection<String> tbls, + int pageSize, - boolean distributedJoins ++ boolean distributedJoins, ++ int timeout + ) { + // Prepare to run queries. + GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0)); - for (GridCacheSqlQuery qry : qrys) - qry.unmarshallParams(m, ctx); - } - } - catch (IgniteCheckedException e) { - throw new CacheException("Failed to unmarshall parameters.", e); - } + if (mainCctx == null) + throw new CacheException("Failed to find cache."); + + NodeResults nodeRess = resultsForNode(node.id()); - List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces()); + QueryResults qr = null; - // Topology version can be null in rolling restart with previous version! - final AffinityTopologyVersion topVer = req.topologyVersion(); + List<GridReservable> reserved = new ArrayList<>(); + try { if (topVer != null) { // Reserve primary for topology version or explicit partitions. - if (!reservePartitions(caches, topVer, req.partitions(), reserved)) { - sendRetry(node, req.requestId()); + if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + sendRetry(node, reqId); return; } } - // Prepare to run queries. - GridCacheContext<?,?> mainCctx = cacheContext(req.space()); + qr = new QueryResults(reqId, qrys.size(), mainCctx); - if (mainCctx == null) - throw new CacheException("Failed to find cache: " + req.space()); + if (nodeRess.results().put(reqId, qr) != null) + throw new IllegalStateException(); - qr = new QueryResults(req.requestId(), qrys.size(), mainCctx); + // Prepare query context. + GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(), + node.id(), + reqId, + mainCctx.isReplicated() ? REPLICATED : MAP) + .filter(h2.backupFilter(topVer, parts)) + .partitionsMap(partsMap) + .distributedJoins(distributedJoins) + .pageSize(pageSize) + .topologyVersion(topVer) + .reservations(reserved); - if (nodeRess.put(req.requestId(), qr) != null) - throw new IllegalStateException(); + List<GridH2Table> snapshotedTbls = null; - h2.setFilters(h2.backupFilter(caches, topVer, req.partitions())); + if (!F.isEmpty(tbls)) { + snapshotedTbls = new ArrayList<>(tbls.size()); - // TODO Prepare snapshots for all the needed tables before the run. + for (String identifier : tbls) { + GridH2Table tbl = h2.dataTable(identifier); - // Run queries. - int i = 0; + Objects.requireNonNull(tbl, identifier); - for (GridCacheSqlQuery qry : qrys) { - ResultSet rs = h2.executeSqlQueryWithTimer(req.space(), - h2.connectionForSpace(req.space()), - qry.query(), - F.asList(qry.parameters()), - true, - req.timeout(), - qr.cancels[i]); + tbl.snapshotIndexes(qctx); - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - ctx.event().record(new CacheQueryExecutedEvent<>( - node, - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - mainCctx.namex(), - null, - qry.query(), - null, - null, - qry.parameters(), - node.id(), - null)); + snapshotedTbls.add(tbl); } + } - assert rs instanceof JdbcResultSet : rs.getClass(); + Connection conn = h2.connectionForSpace(mainCctx.name()); - qr.addResult(i, qry, node.id(), rs); + // Here we enforce join order to have the same behavior on all the nodes. + h2.setupConnection(conn, distributedJoins, true); - if (qr.canceled) { - qr.result(i).close(); + GridH2QueryContext.set(qctx); - return; + // qctx is set, we have to release reservations inside of it. + reserved = null; + + try { + if (nodeRess.cancelled(reqId)) { + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); + + nodeRess.results().remove(reqId); + - return; ++ throw new QueryCancelledException(); } - // Send the first page. - sendNextPage(nodeRess, node, qr, i, req.pageSize()); + // Run queries. + int i = 0; + + boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); + + for (GridCacheSqlQuery qry : qrys) { + ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(), - F.asList(qry.parameters()), true); ++ F.asList(qry.parameters()), true, ++ timeout, ++ qr.cancels[i]); + + if (evt) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + mainCctx.namex(), + null, + qry.query(), + null, + null, + qry.parameters(), + node.id(), + null)); + } + + assert rs instanceof JdbcResultSet : rs.getClass(); + + qr.addResult(i, qry, node.id(), rs); + + if (qr.canceled) { + qr.result(i).close(); - return; - i++; ++ throw new QueryCancelledException(); + } + + // Send the first page. + sendNextPage(nodeRess, node, qr, i, pageSize); + + i++; + } + } + finally { + GridH2QueryContext.clearThreadLocal(); + + if (!distributedJoins) + qctx.clearContext(false); + + if (!F.isEmpty(snapshotedTbls)) { + for (GridH2Table dataTbl : snapshotedTbls) + dataTbl.releaseSnapshots(); + } } } catch (Throwable e) { if (qr != null) { - nodeRess.remove(req.requestId(), qr); + nodeRess.results().remove(reqId, qr); -- qr.cancel(); ++ qr.cancel(false); } - U.error(log, "Failed to execute local query: " + req, e); + if (X.hasCause(e, GridH2RetryException.class)) + sendRetry(node, reqId); + else { + U.error(log, "Failed to execute local query.", e); - sendError(node, req.requestId(), e); + sendError(node, reqId, e); - if (e instanceof Error) - throw (Error)e; + if (e instanceof Error) + throw (Error)e; + } } finally { - h2.setFilters(null); - - // Release reserved partitions. - for (GridReservable r : reserved) - r.release(); - - // Ensure all cancels state is correct. - if (qr != null) - for (int i = 0; i < qr.cancels.length; i++) { - GridQueryCancel cancel = qr.cancels[i]; - - if (cancel != null) - cancel.setCompleted(); - } + if (reserved != null) { + // Release reserved partitions. + for (int i = 0; i < reserved.size(); i++) + reserved.get(i).release(); + } } } @@@ -661,12 -548,12 +668,24 @@@ * @param req Request. */ private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) { - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); + NodeResults nodeRess = qryRess.get(node.id()); + - QueryResults qr = nodeRess == null ? null : nodeRess.results().get(req.queryRequestId()); ++ if (nodeRess == null) { ++ sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req)); - if (qr == null || qr.canceled) - QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId()); ++ return; ++ } else if (nodeRess.cancelled(req.queryRequestId())) { ++ sendError(node, req.queryRequestId(), new QueryCancelledException()); ++ ++ return; ++ } + - if (qr == null || qr.canceled) ++ QueryResults qr = nodeRess.results().get(req.queryRequestId()); ++ ++ if (qr == null) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); ++ else if (qr.canceled) ++ sendError(node, req.queryRequestId(), new QueryCancelledException()); else sendNextPage(nodeRess, node, qr, req.query(), req.pageSize()); } @@@ -854,9 -705,9 +880,9 @@@ } /** - * + * Cancels the query. */ -- void cancel() { ++ void cancel(boolean forceQryCancel) { if (canceled) return; @@@ -865,8 -716,16 +891,18 @@@ for (int i = 0; i < results.length(); i++) { QueryResult res = results.get(i); - if (res != null) + if (res != null) { res.close(); + + continue; + } + - GridQueryCancel cancel = cancels[i]; ++ if (forceQryCancel) { ++ GridQueryCancel cancel = cancels[i]; + - if (cancel != null) - cancel.cancel(); ++ if (cancel != null) ++ cancel.cancel(); ++ } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 04449ac,3fdbf42..3847373 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@@ -46,6 -45,6 +45,7 @@@ import org.apache.ignite.IgniteCheckedE import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; ++import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@@ -64,9 -62,10 +64,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; + import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.cache.query.QueryCancelledException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; @@@ -74,14 -73,11 +75,15 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; ++import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; @@@ -468,16 -456,13 +476,20 @@@ public class GridReduceQueryExecutor /** * @param cctx Cache context. * @param qry Query. - * @param keepBinary Keep binary. + * @param keepPortable Keep portable. + * @param enforceJoinOrder Enforce join order of tables. - * @return Cursor. + * @param timeoutMillis Timeout in milliseconds. + * @param cancel Query cancel. + * @return Rows iterator. */ - public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary, - int timeoutMillis, GridQueryCancel cancel) { + public Iterator<List<?>> query( + GridCacheContext<?, ?> cctx, + GridCacheTwoStepQuery qry, + boolean keepPortable, - boolean enforceJoinOrder ++ boolean enforceJoinOrder, ++ int timeoutMillis, ++ GridQueryCancel cancel + ) { for (int attempt = 0;; attempt++) { if (attempt != 0) { try { @@@ -579,39 -575,27 +595,49 @@@ mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters())); } - boolean retry = false; - - if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. - Marshaller m = ctx.config().getMarshaller(); + IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer); - for (GridCacheSqlQuery mapQry : mapQrys) - mapQry.marshallParams(m); - } + final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0; + final boolean distributedJoins = qry.distributedJoins(); + cancel.set(new Runnable() { + @Override public void run() { - send(finalNodes, new GridQueryCancelRequest(qryReqId), null); ++ send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false); + } + }); + + boolean retry = false; + + if (oldStyle && distributedJoins) + throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes."); + if (send(nodes, - new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null, timeoutMillis), partsMap)) { + oldStyle ? + new GridQueryRequest(qryReqId, + r.pageSize, + space, + mapQrys, + topVer, + extraSpaces(space, qry.spaces()), - null) : ++ null, ++ timeoutMillis) : + new GridH2QueryRequest() + .requestId(qryReqId) + .topologyVersion(topVer) + .pageSize(r.pageSize) + .caches(qry.caches()) + .tables(distributedJoins ? qry.tables() : null) + .partitions(convert(partsMap)) + .queries(mapQrys) - .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0), ++ .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0) ++ .timeout(timeoutMillis), + oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null, + distributedJoins) - ) { ++ ) { awaitAllReplies(r, nodes); + cancel.checkCancelled(); + Object state = r.state.get(); if (state != null) { @@@ -663,30 -653,20 +692,34 @@@ resIter = res.iterator(); } else { + cancel.checkCancelled(); + - GridCacheSqlQuery rdc = qry.reduceQuery(); + UUID locNodeId = ctx.localNodeId(); + + h2.setupConnection(r.conn, false, enforceJoinOrder); - // Statement caching is prohibited here because we can't guarantee correct merge index reuse. - ResultSet res = h2.executeSqlQueryWithTimer(space, - r.conn, - rdc.query(), - F.asList(rdc.parameters()), - false, - timeoutMillis, - cancel); + GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE) + .pageSize(r.pageSize).distributedJoins(false)); - resIter = new Iter(res); + try { + if (qry.explain()) + return explainPlan(r.conn, space, qry); + + GridCacheSqlQuery rdc = qry.reduceQuery(); + + ResultSet res = h2.executeSqlQueryWithTimer(space, + r.conn, + rdc.query(), + F.asList(rdc.parameters()), - false); ++ false, ++ timeoutMillis, ++ cancel); + + resIter = new Iter(res); + } + finally { + GridH2QueryContext.clearThreadLocal(); + } } } @@@ -699,16 -677,7 +730,7 @@@ continue; } - final Collection<ClusterNode> finalNodes = nodes; - - return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable) { - @Override public void close() throws Exception { - super.close(); - - if (distributedJoins || !allIndexesFetched(r.idxs)) - send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false); - } - }; - return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary); ++ return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable); } catch (IgniteCheckedException | RuntimeException e) { U.closeQuiet(r.conn); @@@ -741,19 -717,33 +770,43 @@@ } /** + * @param idxs Merge indexes. + * @return {@code true} If all remote data was fetched. + */ + private static boolean allIndexesFetched(List<GridMergeIndex> idxs) { + for (int i = 0; i < idxs.size(); i++) { + if (!idxs.get(i).fetchedAll()) + return false; + } + + return true; + } + + /** + * Returns true if the exception is triggered by query cancel. + * + * @param e Exception. + * @return {@code true} if exception is caused by cancel. + */ + private boolean wasCancelled(CacheException e) { - return e.getSuppressed() != null && e.getSuppressed().length > 0 && - e.getSuppressed()[0] instanceof QueryCancelledException; ++ return X.hasSuppressed(e, QueryCancelledException.class); + } + + /** - * Explicitly cancels remote queries. - * @param nodes Nodes. + * @param r Query run. + * @param qryReqId Query id. + */ + private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes, QueryRun r, long qryReqId) { + for (GridMergeIndex idx : r.idxs) { + if (!idx.fetchedAll()) { - send(nodes, new GridQueryCancelRequest(qryReqId), null); ++ send(nodes, new GridQueryCancelRequest(qryReqId), null, false); + + break; + } + } + } + + /** * @param r Query run. * @param nodes Nodes to check periodically if they alive. * @throws IgniteInterruptedCheckedException If interrupted. @@@ -1290,7 -1259,7 +1345,7 @@@ latch.countDown(); for (GridMergeIndex idx : idxs) // Fail all merge indexes. -- idx.fail(nodeId); ++ idx.fail(nodeId, o instanceof CacheException ? (CacheException) o : null); } /** @@@ -1332,24 -1301,4 +1387,24 @@@ return res; } } + + /** + * + */ + private class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode,Message,Message> { + /** */ + private final Map<ClusterNode,IntArray> partsMap; + + /** + * @param partsMap Partitions map. + */ + private ExplicitPartitionsSpecializer(Map<ClusterNode,IntArray> partsMap) { + this.partsMap = partsMap; + } + + /** {@inheritDoc} */ + @Override public Message apply(ClusterNode n, Message msg) { + return copy(msg, n, partsMap); + } + } - } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java index 0000000,0000000..be34a09 new file mode 100644 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java @@@ -1,0 -1,0 +1,7 @@@ ++package org.apache.ignite.internal.processors.cache.distributed.near; ++ ++/** ++ * Created by vozerov on 31.10.2016. ++ */ ++public class IgniteCacheQueryAbstractDistributedJoinSelfTest { ++} http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java index 0000000,0000000..80bd62e new file mode 100644 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java @@@ -1,0 -1,0 +1,7 @@@ ++package org.apache.ignite.internal.processors.cache.distributed.near; ++ ++/** ++ * Created by vozerov on 31.10.2016. ++ */ ++public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest { ++} http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index 760ee19,6e493ea..bcf8f9d --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@@ -349,8 -348,8 +349,8 @@@ public abstract class GridIndexingSpiAb // Fields query GridQueryFieldsResult fieldsRes = - spi.execute("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " + - "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, 0, null); + spi.queryLocalSqlFields("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " + - "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false); ++ "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null); String[] aliases = {"N1", "A1", "N2", "A2"}; Object[] vals = { "Valera", 19, "Kolya", 25}; @@@ -451,8 -450,7 +451,8 @@@ time = now; range *= 3; - GridQueryFieldsResult res = spi.execute("A", sql, Arrays.<Object>asList(1, range), null, 0, null); + GridQueryFieldsResult res = spi.queryLocalSqlFields("A", sql, Arrays.<Object>asList(1, range), null, - false); ++ false, 0, null); assert res.iterator().hasNext(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java index 568f880,9128f76..5722c01 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java @@@ -39,7 -41,12 +41,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; + import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQueryCancelOrTimeoutSelfTest; import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest; -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest; -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest; -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest; -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryReplicatedSelfTest; import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest; import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest; import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
