http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 9b7d268..f228111 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -40,6 +40,7 @@ import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -57,12 +58,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -70,10 +71,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable; import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture; import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; @@ -98,13 +97,13 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.indexing.IndexingQueryFilter; -import org.h2.api.ErrorCode; +import org.apache.ignite.thread.IgniteThread; import org.h2.command.Prepared; import org.h2.jdbc.JdbcResultSet; -import org.h2.jdbc.JdbcSQLException; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET; import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; @@ -124,6 +123,9 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V @SuppressWarnings("ForLoopReplaceableByForEach") public class GridMapQueryExecutor { /** */ + public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET); + + /** */ private IgniteLogger log; /** */ @@ -147,8 +149,8 @@ public class GridMapQueryExecutor { /** Busy lock for lazy workers. */ private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock(); - /** Stop guard. */ - private final AtomicBoolean stopGuard = new AtomicBoolean(); + /** Lazy worker stop guard. */ + private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean(); /** * @param busyLock Busy lock. @@ -205,21 +207,18 @@ public class GridMapQueryExecutor { } /** - * Stop query map executor, cleanup resources. + * Cancel active lazy queries and prevent submit of new queries. */ - public void stop() { - if (!stopGuard.compareAndSet(false, true)) + public void cancelLazyWorkers() { + if (!lazyWorkerStopGuard.compareAndSet(false, true)) return; - for (MapNodeResults res : qryRess.values()) - res.cancelAll(); - - for (MapQueryLazyWorker w : lazyWorkers.values()) - w.stop(true); - lazyWorkerBusyLock.block(); - assert lazyWorkers.isEmpty() : "Not cleaned lazy workers: " + lazyWorkers.size(); + for (MapQueryLazyWorker worker : lazyWorkers.values()) + worker.stop(false); + + lazyWorkers.clear(); } /** @@ -260,7 +259,7 @@ public class GridMapQueryExecutor { * @return Busy lock for lazy workers to guard their operations with. */ GridSpinBusyLock busyLock() { - return lazyWorkerBusyLock; + return busyLock; } /** @@ -555,7 +554,6 @@ public class GridMapQueryExecutor { /** * @param node Node. * @param req Query request. - * @throws IgniteCheckedException On error. */ private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException { int[] qryParts = req.queryPartitions(); @@ -568,14 +566,10 @@ public class GridMapQueryExecutor { req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL), req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS)); - final GridDhtTxLocalAdapter tx; - - GridH2SelectForUpdateTxDetails txReq = req.txDetails(); - final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER); final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN); final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED); - final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null; + final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY); final List<Integer> cacheIds = req.caches(); @@ -584,6 +578,10 @@ public class GridMapQueryExecutor { final Object[] params = req.parameters(); + final GridDhtTxLocalAdapter tx; + + GridH2SelectForUpdateTxDetails txReq = req.txDetails(); + try { if (txReq != null) { // Prepare to run queries. @@ -738,11 +736,7 @@ public class GridMapQueryExecutor { * @param parts Explicit partitions for current node. * @param pageSize Page size. * @param distributedJoinMode Query distributed join mode. - * @param enforceJoinOrder Enforce join order flag. - * @param replicated Replicated flag. - * @param timeout Query timeout. - * @param params Query params. - * @param lazy Lazy query execution flag. + * @param lazy Streaming flag. * @param mvccSnapshot MVCC snapshot. * @param tx Transaction. * @param txDetails TX details, if it's a {@code FOR UPDATE} request, or {@code null}. @@ -771,24 +765,75 @@ public class GridMapQueryExecutor { @Nullable final GridH2SelectForUpdateTxDetails txDetails, @Nullable final CompoundLockFuture lockFut, @Nullable final AtomicInteger runCntr) { + MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); + // In presence of TX, we also must always have matching details. assert tx == null || txDetails != null; - assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported."; - boolean inTx = (tx != null); - MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); + if (lazy && worker == null) { + // Lazy queries must be re-submitted to dedicated workers. + MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId); + worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this); + + worker.submit(new Runnable() { + @Override public void run() { + onQueryRequest0( + node, + reqId, + segmentId, + schemaName, + qrys, + cacheIds, + topVer, + partsMap, + parts, + pageSize, + distributedJoinMode, + enforceJoinOrder, + replicated, + timeout, + params, + true, + mvccSnapshot, + tx, + txDetails, + lockFut, + runCntr); + } + }); + + if (lazyWorkerBusyLock.enterBusy()) { + try { + MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker); + + if (oldWorker != null) + oldWorker.stop(false); - if (lazy && worker == null) - worker = createLazyWorker(node, reqId, segmentId); + IgniteThread thread = new IgniteThread(worker); + + thread.start(); + } + finally { + lazyWorkerBusyLock.leaveBusy(); + } + } + else + log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']'); + + return; + } + + if (lazy && txDetails != null) + throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported."); // Prepare to run queries. GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds); MapNodeResults nodeRess = resultsForNode(node.id()); - MapQueryResults qryResults = null; + MapQueryResults qr = null; List<GridReservable> reserved = new ArrayList<>(); @@ -802,7 +847,7 @@ public class GridMapQueryExecutor { if (!F.isEmpty(err)) { // Unregister lazy worker because re-try may never reach this node again. if (lazy) - worker.stop(false); + stopAndUnregisterCurrentLazyWorker(); sendRetry(node, reqId, segmentId, err); @@ -810,7 +855,10 @@ public class GridMapQueryExecutor { } } - qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, worker, inTx); + qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx); + + if (nodeRess.put(reqId, segmentId, qr) != null) + throw new IllegalStateException(); // Prepare query context. GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(), @@ -824,207 +872,186 @@ public class GridMapQueryExecutor { .pageSize(pageSize) .topologyVersion(topVer) .reservations(reserved) - .mvccSnapshot(mvccSnapshot); + .mvccSnapshot(mvccSnapshot) + .lazyWorker(worker); + + Connection conn = h2.connectionForSchema(schemaName); + + H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder); + + GridH2QueryContext.set(qctx); // qctx is set, we have to release reservations inside of it. reserved = null; - if (worker != null) - worker.queryContext(qctx); + try { + if (nodeRess.cancelled(reqId)) { + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); - GridH2QueryContext.set(qctx); + nodeRess.cancelRequest(reqId); - if (nodeRess.put(reqId, segmentId, qryResults) != null) - throw new IllegalStateException(); + throw new QueryCancelledException(); + } - Connection conn = h2.connectionForSchema(schemaName); + // Run queries. + int qryIdx = 0; - H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder, lazy); + boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED); - if (nodeRess.cancelled(reqId)) { - GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); + for (GridCacheSqlQuery qry : qrys) { + ResultSet rs = null; - nodeRess.cancelRequest(reqId); + boolean removeMapping = false; - throw new QueryCancelledException(); - } + // If we are not the target node for this replicated query, just ignore it. + if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { + String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params)); - // Run queries. - int qryIdx = 0; + PreparedStatement stmt; - boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED); + try { + stmt = h2.prepareStatement(conn, sql, true); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e); + } - for (GridCacheSqlQuery qry : qrys) { - ResultSet rs = null; + Prepared p = GridSqlQueryParser.prepared(stmt); - boolean removeMapping = false; + if (GridSqlQueryParser.isForUpdateQuery(p)) { + sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx); + stmt = h2.prepareStatement(conn, sql, true); + } - // If we are not the target node for this replicated query, just ignore it. - if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { - String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params)); + h2.bindParameters(stmt, params0); - PreparedStatement stmt; + int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx); - try { - stmt = h2.prepareStatement(conn, sql, true); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e); - } + rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx)); - Prepared p = GridSqlQueryParser.prepared(stmt); + if (inTx) { + ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future( + ctx.localNodeId(), + txDetails.version(), + mvccSnapshot, + txDetails.threadId(), + IgniteUuid.randomUuid(), + txDetails.miniId(), + parts, + tx, + opTimeout, + mainCctx, + rs + ); - if (GridSqlQueryParser.isForUpdateQuery(p)) { - sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx); - stmt = h2.prepareStatement(conn, sql, true); - } + if (lockFut != null) + lockFut.register(enlistFut); - h2.bindParameters(stmt, params0); + enlistFut.init(); - int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx); + enlistFut.get(); - rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qryResults.queryCancel(qryIdx)); + rs.beforeFirst(); + } - if (inTx) { - ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future( - ctx.localNodeId(), - txDetails.version(), - mvccSnapshot, - txDetails.threadId(), - IgniteUuid.randomUuid(), - txDetails.miniId(), - parts, - tx, - opTimeout, - mainCctx, - rs - ); - - if (lockFut != null) - lockFut.register(enlistFut); - - enlistFut.init(); - - enlistFut.get(); - - rs.beforeFirst(); - } + if (evt) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + mainCctx.name(), + null, + qry.query(), + null, + null, + params, + node.id(), + null)); + } - if (evt) { - ctx.event().record(new CacheQueryExecutedEvent<>( - node, - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - mainCctx.name(), - null, - qry.query(), - null, - null, - params, - node.id(), - null)); + assert rs instanceof JdbcResultSet : rs.getClass(); } - assert rs instanceof JdbcResultSet : rs.getClass(); - } - - qryResults.addResult(qryIdx, qry, node.id(), rs, params); + qr.addResult(qryIdx, qry, node.id(), rs, params); - if (qryResults.cancelled()) { - qryResults.result(qryIdx).close(); + if (qr.cancelled()) { + qr.result(qryIdx).close(); - throw new QueryCancelledException(); - } + throw new QueryCancelledException(); + } - if (inTx) { - if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) { - if (removeMapping = tx.empty() && !tx.queryEnlisted()) - tx.rollbackAsync().get(); + if (inTx) { + if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) { + if (removeMapping = tx.empty() && !tx.queryEnlisted()) + tx.rollbackAsync().get(); + } } - } - // Send the first page. - if (lockFut == null) - sendNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping); - else { - GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping); - - if (msg != null) { - lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> future) { - try { - if (node.isLocal()) - h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); - else - ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); - } - catch (Exception e) { - U.error(log, e); + // Send the first page. + if (lockFut == null) + sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping); + else { + GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping); + + if (msg != null) { + lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> future) { + try { + if (node.isLocal()) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + else + ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); + } + catch (Exception e) { + U.error(log, e); + } } - } - }); + }); + } } + + qryIdx++; } - qryIdx++; + // All request results are in the memory in result set already, so it's ok to release partitions. + if (!lazy) + releaseReservations(); } - - // All request results are in the memory in result set already, so it's ok to release partitions. - if (!lazy) + catch (Throwable e){ releaseReservations(); - else if (!qryResults.isAllClosed()) { - if (MapQueryLazyWorker.currentWorker() == null) { - final ObjectPoolReusable<H2ConnectionWrapper> detachedConn = h2.detachConnection(); - worker.start(H2Utils.session(conn), detachedConn); - - GridH2QueryContext.clearThreadLocal(); - } + throw e; } - else - unregisterLazyWorker(worker); } catch (Throwable e) { - if (qryResults != null) { - nodeRess.remove(reqId, segmentId, qryResults); + if (qr != null) { + nodeRess.remove(reqId, segmentId, qr); - qryResults.close(); + qr.cancel(false); } - else - releaseReservations(); - // Stop and unregister worker after possible cancellation. + // Unregister worker after possible cancellation. if (lazy) - worker.stop(false); - - if (e instanceof QueryCancelledException) - sendError(node, reqId, e); - else { - JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class); + stopAndUnregisterCurrentLazyWorker(); - if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) - sendError(node, reqId, new QueryCancelledException()); - else { - GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); + GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); - if (retryErr != null) { - final String retryCause = String.format( - "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " + - "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() - ); + if (retryErr != null) { + final String retryCause = String.format( + "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " + + "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() + ); - sendRetry(node, reqId, segmentId, retryCause); - } - else { - U.error(log, "Failed to execute local query.", e); + sendRetry(node, reqId, segmentId, retryCause); + } + else { + U.error(log, "Failed to execute local query.", e); - sendError(node, reqId, e); + sendError(node, reqId, e); - if (e instanceof Error) - throw (Error)e; - } - } + if (e instanceof Error) + throw (Error)e; } } finally { @@ -1033,25 +1060,10 @@ public class GridMapQueryExecutor { for (int i = 0; i < reserved.size(); i++) reserved.get(i).release(); } - - if (MapQueryLazyWorker.currentWorker() == null && GridH2QueryContext.get() != null) - GridH2QueryContext.clearThreadLocal(); } } /** - * @param node The node has sent map query request. - * @param reqId Request ID. - * @param segmentId Segment ID. - * @return Lazy worker. - */ - private MapQueryLazyWorker createLazyWorker(ClusterNode node, long reqId, int segmentId) { - MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId); - - return new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this); - } - - /** * @param cacheIds Cache ids. * @return Id of the first cache in list, or {@code null} if list is empty. */ @@ -1076,7 +1088,6 @@ public class GridMapQueryExecutor { /** * @param node Node. * @param req DML request. - * @throws IgniteCheckedException On error. */ private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException { int[] parts = req.queryPartitions(); @@ -1244,34 +1255,24 @@ public class GridMapQueryExecutor { return; } - final MapQueryResults qryResults = nodeRess.get(req.queryRequestId(), req.segmentId()); + final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); - if (qryResults == null) + if (qr == null) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); - else if (qryResults.cancelled()) + else if (qr.cancelled()) sendError(node, req.queryRequestId(), new QueryCancelledException()); else { - MapQueryLazyWorker lazyWorker = qryResults.lazyWorker(); + MapQueryLazyWorker lazyWorker = qr.lazyWorker(); if (lazyWorker != null) { lazyWorker.submit(new Runnable() { @Override public void run() { - try { - sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false); - } - catch (Throwable e) { - JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class); - - if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) - sendError(node, qryResults.queryRequestId(), new QueryCancelledException()); - else - throw e; - } + sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false); } }); } else - sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false); + sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false); } } @@ -1286,14 +1287,8 @@ public class GridMapQueryExecutor { * @return Next page. * @throws IgniteCheckedException If failed. */ - private GridQueryNextPageResponse prepareNextPage( - MapNodeResults nodeRess, - ClusterNode node, - MapQueryResults qr, - int qry, - int segmentId, - int pageSize, - boolean removeMapping) throws IgniteCheckedException { + private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId, + int pageSize, boolean removeMapping) throws IgniteCheckedException { MapQueryResult res = qr.result(qry); assert res != null; @@ -1314,11 +1309,8 @@ public class GridMapQueryExecutor { nodeRess.remove(qr.queryRequestId(), segmentId, qr); // Release reservations if the last page fetched, all requests are closed and this is a lazy worker. - if (qr.lazyWorker() != null) { + if (MapQueryLazyWorker.currentWorker() != null) releaseReservations(); - - qr.lazyWorker().stop(false); - } } } @@ -1350,14 +1342,8 @@ public class GridMapQueryExecutor { * @param removeMapping Remove mapping flag. */ @SuppressWarnings("unchecked") - private void sendNextPage( - MapNodeResults nodeRess, - ClusterNode node, - MapQueryResults qr, - int qry, - int segmentId, - int pageSize, - boolean removeMapping) { + private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId, + int pageSize, boolean removeMapping) { try { GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping); @@ -1379,7 +1365,6 @@ public class GridMapQueryExecutor { * @param node Node. * @param reqId Request ID. * @param segmentId Index segment ID. - * @param retryCause Description of the retry cause. */ private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) { try { @@ -1416,11 +1401,25 @@ public class GridMapQueryExecutor { } /** + * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread). + */ + public void stopAndUnregisterCurrentLazyWorker() { + MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); + + if (worker != null) { + worker.stop(false); + + // Just stop is not enough as worker may be registered, but not started due to exception. + unregisterLazyWorker(worker); + } + } + + /** * Unregister lazy worker. * * @param worker Worker. */ - void unregisterLazyWorker(MapQueryLazyWorker worker) { + public void unregisterLazyWorker(MapQueryLazyWorker worker) { lazyWorkers.remove(worker.key(), worker); } @@ -1430,17 +1429,4 @@ public class GridMapQueryExecutor { public int registeredLazyWorkers() { return lazyWorkers.size(); } - - /** - * @param worker Worker to register. - */ - void registerLazyWorker(MapQueryLazyWorker worker) { - MapQueryLazyWorker oldWorker = lazyWorkers.put(worker.key(), worker); - - if (oldWorker != null) { - log.warning("Duplicates lazy worker: [key=" + worker.key() + ']'); - - oldWorker.stop(false); - } - } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index d9c542b..62c5c78 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -287,16 +287,11 @@ public class GridReduceQueryExecutor { */ private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) { if (r != null) { - CacheException e; + CacheException e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId + + ", errMsg=" + msg + ']'); - if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) { - e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId + - ", errMsg=" + msg + ']', new QueryCancelledException()); - } - else { - e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId + - ", errMsg=" + msg + ']'); - } + if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) + e.addSuppressed(new QueryCancelledException()); r.setStateOnException(nodeId, e); } @@ -1223,9 +1218,6 @@ public class GridReduceQueryExecutor { } } - r.setStateOnException(ctx.localNodeId(), - new CacheException("Query is canceled.", new QueryCancelledException())); - if (!runs.remove(qryReqId, r)) U.warn(log, "Query run was already removed: " + qryReqId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index 217cfad..0cb986b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; -import java.util.RandomAccess; import java.util.UUID; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -73,11 +72,9 @@ public class GridResultPage { Collection<?> plainRows = res.plainRows(); if (plainRows != null) { - assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass(); - rowsInPage = plainRows.size(); - if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns()) + if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns()) rows = (Iterator<Value[]>)plainRows.iterator(); else { // If it's a result of SELECT FOR UPDATE (we can tell by difference in number http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java index 8f8553a..48116d3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java @@ -17,11 +17,12 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; +import java.util.concurrent.ConcurrentHashMap; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; @@ -85,10 +86,10 @@ class MapNodeResults { public void cancelRequest(long reqId) { for (MapRequestKey key : res.keySet()) { if (key.requestId() == reqId) { - final MapQueryResults removed = res.remove(key); + MapQueryResults removed = res.remove(key); if (removed != null) - removed.cancel(); + removed.cancel(true); } } @@ -143,7 +144,7 @@ class MapNodeResults { */ public void cancelAll() { for (MapQueryResults ress : res.values()) - ress.cancel(); + ress.cancel(true); // Cancel update requests for (GridQueryCancel upd: updCancels.values()) http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java index 1cbab19..98f3df9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java @@ -20,41 +20,25 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper; -import org.apache.ignite.internal.processors.query.h2.H2Utils; -import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; -import org.h2.engine.Session; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; - /** * Worker for lazy query execution. */ public class MapQueryLazyWorker extends GridWorker { - /** Poll task timeout milliseconds. */ - private static final int POLL_TASK_TIMEOUT_MS = 1000; - /** Lazy thread flag. */ private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>(); /** Active lazy worker count (for testing purposes). */ private static final LongAdder ACTIVE_CNT = new LongAdder(); - /** Mutex to synchronization worker start/stop. */ - private final Object mux = new Object(); - /** Task to be executed. */ private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>(); @@ -67,14 +51,8 @@ public class MapQueryLazyWorker extends GridWorker { /** Latch decremented when worker finishes. */ private final CountDownLatch stopLatch = new CountDownLatch(1); - /** Query context. */ - private GridH2QueryContext qctx; - - /** Worker is started flag. */ - private boolean started; - - /** Detached connection. */ - private ObjectPoolReusable<H2ConnectionWrapper> detached; + /** Map query result. */ + private volatile MapQueryResult res; /** * Constructor. @@ -92,106 +70,38 @@ public class MapQueryLazyWorker extends GridWorker { this.exec = exec; } - /** - * Start lazy worker for half-processed query. - * In this case we have to detach H2 connection from current thread and use it for current query processing. - * Also tables locks must be transferred to lazy thread from QUERY_POOL thread pool. - * - * @param ses H2 Session. - * @param detached H2 connection detached from current thread. - * @throws QueryCancelledException In case query is canceled during the worker start. - */ - void start(Session ses, ObjectPoolReusable<H2ConnectionWrapper> detached) throws QueryCancelledException { - synchronized (mux) { - if (!exec.busyLock().enterBusy()) { - log.warning("Lazy worker isn't started. Node is stopped [key=" + key + ']'); - - return; - } - - try { - if (started) - return; - - if (isCancelled) { - if (detached != null) - detached.recycle(); - - throw new QueryCancelledException(); - } - - if (ses != null) - lazyTransferStart(ses); - - this.detached = detached; - - exec.registerLazyWorker(this); - - IgniteThread thread = new IgniteThread(this); - - started = true; - - thread.start(); - } - finally { - exec.busyLock().leaveBusy(); - } - } - } - /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { LAZY_WORKER.set(this); ACTIVE_CNT.increment(); - boolean lockBusy = false; - try { - if (qctx != null) - GridH2QueryContext.set(qctx); - - if(detached != null) - lazyTransferFinish(H2Utils.session(detached.object().connection())); - while (!isCancelled()) { - Runnable task = tasks.poll(POLL_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + Runnable task = tasks.take(); if (task != null) { + if (!exec.busyLock().enterBusy()) + return; + try { task.run(); } - catch (Throwable t) { - log.warning("Lazy task error", t); - } - } - else { - try { - lockBusy = false; - - if (!exec.busyLock().enterBusy()) { - log.info("Stop lazy worker [key=" + key + ']'); - - return; - } - - lockBusy = true; - } finally { - if (lockBusy) - exec.busyLock().leaveBusy(); + exec.busyLock().leaveBusy(); } } } } finally { - exec.unregisterLazyWorker(this); + if (res != null) + res.close(); LAZY_WORKER.set(null); ACTIVE_CNT.decrement(); - stopLatch.countDown(); + exec.unregisterLazyWorker(this); } } @@ -201,9 +111,6 @@ public class MapQueryLazyWorker extends GridWorker { * @param task Task to be executed. */ public void submit(Runnable task) { - if (isCancelled) - return; - tasks.add(task); } @@ -218,76 +125,45 @@ public class MapQueryLazyWorker extends GridWorker { * Stop the worker. * @param nodeStop Node is stopping. */ - private void stop0(boolean nodeStop) { - synchronized (mux) { - if (qctx != null && qctx.distributedJoinMode() == OFF && !qctx.isCleared()) - qctx.clearContext(nodeStop); + public void stop(final boolean nodeStop) { + if (MapQueryLazyWorker.currentWorker() == null) + submit(new Runnable() { + @Override public void run() { + stop(nodeStop); + } + }); + else { + GridH2QueryContext qctx = GridH2QueryContext.get(); - if (detached != null) { - detached.recycle(); + if (qctx != null) { + qctx.clearContext(nodeStop); - detached = null; + GridH2QueryContext.clearThreadLocal(); } isCancelled = true; - mux.notifyAll(); + stopLatch.countDown(); } } /** - * @param task Stop task. + * Await worker stop. */ - public void submitStopTask(Runnable task) { - synchronized (mux) { - if (LAZY_WORKER.get() != null) - task.run(); - else - submit(task); + public void awaitStop() { + try { + U.await(stopLatch); } - } - - /** - * Stop the worker. - * @param nodeStop Node is stopping. - */ - public void stop(final boolean nodeStop) { - synchronized (mux) { - if (isCancelled) - return; - - if (started && currentWorker() == null) { - submit(new Runnable() { - @Override public void run() { - stop0(nodeStop); - } - }); - - awaitStop(); - } - else if (currentWorker() != null) - stop0(nodeStop); + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e); } } /** - * Await worker stop. + * @param res Map query result. */ - private void awaitStop() { - synchronized (mux) { - try { - if (!isCancelled) - mux.wait(); - - U.await(stopLatch); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + public void result(MapQueryResult res) { + this.res = res; } /** @@ -305,13 +181,6 @@ public class MapQueryLazyWorker extends GridWorker { } /** - * @param qctx Query context. - */ - public void queryContext(GridH2QueryContext qctx) { - this.qctx = qctx; - } - - /** * Construct worker name. * * @param instanceName Instance name. @@ -322,32 +191,4 @@ public class MapQueryLazyWorker extends GridWorker { return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" + key.segment(); } - - /** - * Start session transfer to lazy thread. - * - * @param ses Session. - */ - private static void lazyTransferStart(Session ses) { - GridH2QueryContext qctx = GridH2QueryContext.get(); - - assert qctx != null; - - for(GridH2Table tbl : qctx.lockedTables()) - tbl.onLazyTransferStarted(ses); - } - - /** - * Finish session transfer to lazy thread. - * - * @param ses Session. - */ - private static void lazyTransferFinish(Session ses) { - GridH2QueryContext qctx = GridH2QueryContext.get(); - - assert qctx != null; - - for(GridH2Table tbl : qctx.lockedTables()) - tbl.onLazyTransferFinished(ses); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index 5a0c410..fb928c4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -22,7 +22,6 @@ import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -61,9 +60,6 @@ class MapQueryResult { } } - /** Logger. */ - private final IgniteLogger log; - /** Indexing. */ private final IgniteH2Indexing h2; @@ -100,23 +96,26 @@ class MapQueryResult { /** */ private final Object[] params; + /** Lazy worker. */ + private final MapQueryLazyWorker lazyWorker; + /** - * @param h2 H2 indexing. * @param rs Result set. * @param cctx Cache context. * @param qrySrcNodeId Query source node. * @param qry Query. * @param params Query params. + * @param lazyWorker Lazy worker. */ MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx, - UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) { - this.log = h2.kernalContext().log(MapQueryResult.class); + UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) { this.h2 = h2; this.cctx = cctx; this.qry = qry; this.params = params; this.qrySrcNodeId = qrySrcNodeId; this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId); + this.lazyWorker = lazyWorker; if (rs != null) { this.rs = rs; @@ -175,6 +174,8 @@ class MapQueryResult { * @return {@code true} If there are no more rows available. */ synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) { + assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker(); + if (closed) return true; @@ -258,13 +259,30 @@ class MapQueryResult { * Close the result. */ public void close() { + if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) { + lazyWorker.submit(new Runnable() { + @Override public void run() { + close(); + } + }); + + lazyWorker.awaitStop(); + + return; + } + synchronized (this) { + assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker(); + if (closed) return; closed = true; - U.close(rs, log); + U.closeQuiet(rs); + + if (lazyWorker != null) + lazyWorker.stop(false); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java index b13137c..76527bc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java @@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable; * Mapper query results. */ class MapQueryResults { - /** H2 indexing. */ + /** H@ indexing. */ private final IgniteH2Indexing h2; /** */ @@ -113,7 +113,10 @@ class MapQueryResults { * @param params Query arguments. */ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { - MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params); + MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker); + + if (lazyWorker != null) + lazyWorker.result(res); if (!results.compareAndSet(qry, null, res)) throw new IllegalStateException(); @@ -136,37 +139,28 @@ class MapQueryResults { /** * Cancels the query. */ - void cancel() { + void cancel(boolean forceQryCancel) { if (cancelled) return; cancelled = true; for (int i = 0; i < results.length(); i++) { - GridQueryCancel cancel = cancels[i]; + MapQueryResult res = results.get(i); - if (cancel != null) - cancel.cancel(); - } + if (res != null) { + res.close(); - if (lazyWorker == null) - close(); - else { - lazyWorker.submitStopTask(this::close); + continue; + } - lazyWorker.stop(false); - } - } + // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable). + if (forceQryCancel) { + GridQueryCancel cancel = cancels[i]; - /** - * - */ - public void close() { - for (int i = 0; i < results.length(); i++) { - MapQueryResult res = results.get(i); - - if (res != null) - res.close(); + if (cancel != null) + cancel.cancel(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java index a991530..a112969 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java @@ -96,6 +96,7 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT int partsFilled = fillAllPartitions(cache, aff); SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person") + .setLazy(true) .setPageSize(1); FieldsQueryCursor<List<?>> qryCursor = cache.query(qry); @@ -142,6 +143,7 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT int partsFilled = fillAllPartitions(cache, aff); SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person") + .setLazy(true) .setPageSize(1); FieldsQueryCursor<List<?>> qryCursor = cache.query(qry); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java index 24e2fb2..59be138 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java @@ -121,15 +121,12 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest { private static int getStatementCacheSize(GridQueryProcessor qryProcessor) { IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx"); - ConcurrentMap<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> conns = - GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns"); + ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns"); int cntr = 0; - for (ConcurrentMap<H2ConnectionWrapper, Boolean> connPerThread: conns.values()) { - for (H2ConnectionWrapper w : connPerThread.keySet()) - cntr += w.statementCacheSize(); - } + for (H2ConnectionWrapper w : conns.values()) + cntr += w.statementCacheSize(); return cntr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java index 67a9501..56fd7b8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java @@ -100,83 +100,84 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr /** */ public void testRemoteQueryExecutionTimeout() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true); } /** */ public void testRemoteQueryWithMergeTableTimeout() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true); } /** */ public void testRemoteQueryExecutionCancel0() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryExecutionCancel1() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryExecutionCancel2() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryExecutionCancel3() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel0() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel1() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel2() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel3() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel0() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel1() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel2() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel3() throws Exception { - testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryAlreadyFinishedStop() throws Exception { - testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false); + testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false); } /** */ private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit, - boolean timeout, boolean checkCanceled) throws Exception { + boolean timeout) throws Exception { try (Ignite client = startGrid("client")) { + IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME); assertEquals(0, cache.localSize()); @@ -203,8 +204,7 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr qry.setTimeout(timeoutUnits, timeUnit); cursor = cache.query(qry); - } - else { + } else { cursor = cache.query(qry); client.scheduler().runLocal(new Runnable() { @@ -214,11 +214,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr }, timeoutUnits, timeUnit); } - try (QueryCursor<List<?>> ignored = cursor) { - cursor.getAll(); - - if (checkCanceled) - fail("Query not canceled"); + try(QueryCursor<List<?>> ignored = cursor) { + cursor.iterator(); } catch (CacheException ex) { log().error("Got expected exception", ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java index 48b43a7..7e23c88 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java @@ -67,11 +67,6 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends GridCommonA "where pr.companyId = co._key\n" + "order by co._key, pr._key "; - protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" + - "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + - "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + - "order by pe.id desc"; - /** */ protected static final int GRID_CNT = 2; http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java index 3beebff..bad5303 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; -import org.apache.ignite.testframework.GridTestUtils; /** * Test for distributed queries with node restarts. @@ -102,11 +101,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa assertEquals(broadcastQry, plan.contains("batched:broadcast")); - final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll(); + final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll(); Thread.sleep(3000); - assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll()); + assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll()); final SqlFieldsQuery qry1; @@ -123,7 +122,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll(); - assertFalse(goldenRes.isEmpty()); + assertFalse(pRes.isEmpty()); assertFalse(rRes.isEmpty()); final AtomicInteger qryCnt = new AtomicInteger(); @@ -162,12 +161,9 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa qry.setPageSize(smallPageSize ? 30 : 1000); try { - assertEquals(goldenRes, cache.query(qry).getAll()); + assertEquals(pRes, cache.query(qry).getAll()); } catch (CacheException e) { - if (!smallPageSize) - log.error("Unexpected exception at the test", e); - assertTrue("On large page size must retry.", smallPageSize); boolean failedOnRemoteFetch = false; @@ -267,7 +263,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa } }, restartThreadsNum, "restart-thread"); - GridTestUtils.waitForCondition(() -> fail.get(), duration); + Thread.sleep(duration); info("Stopping..."); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java index 9f8a2fc..03a8d49 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java @@ -40,47 +40,47 @@ import org.apache.ignite.internal.util.typedef.internal.U; public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extends IgniteCacheQueryAbstractDistributedJoinSelfTest { /** */ public void testCancel1() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, false, true); + testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testCancel2() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, false, true); + testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, false); } /** */ public void testCancel3() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, false); } /** */ public void testCancel4() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, false, false); + testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testTimeout1() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, true, true); + testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, true); } /** */ public void testTimeout2() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, true, true); + testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, true); } /** */ public void testTimeout3() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, true, false); + testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, true); } /** */ public void testTimeout4() throws Exception { - testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, true, false); + testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, true); } /** */ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit, - boolean timeout, boolean checkCanceled) throws Exception { + boolean timeout) throws Exception { SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true); IgniteCache<Object, Object> cache = ignite.cache(cacheName); @@ -101,10 +101,7 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend } try (QueryCursor<List<?>> ignored = cursor) { - cursor.getAll(); - - if (checkCanceled) - fail("Query not canceled"); + cursor.iterator(); } catch (CacheException ex) { log().error("Got expected exception", ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java index 4d02b2e..072f1ab 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java @@ -627,8 +627,6 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo * @throws Exception If failed. */ public void testQueryConsistencyMultithreaded() throws Exception { - final int KEY_COUNT = 5000; - // Start complex topology. ignitionStart(serverConfiguration(1)); ignitionStart(serverConfiguration(2)); @@ -640,7 +638,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo run(cli, createSql); - put(cli, 0, KEY_COUNT); + put(cli, 0, 5000); final AtomicBoolean stopped = new AtomicBoolean(); @@ -698,7 +696,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query( new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll(); - assertEquals(KEY_COUNT, res.size()); + assertEquals(5000, res.size()); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/0ccde7c4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java index fe45ed6..7713004 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java @@ -160,7 +160,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest { Map<Thread, ?> conns = perThreadConnections(i); for(Thread t : conns.keySet()) - log.error("Connection is not closed for thread: " + t.getName()); + log.error("+++ Connection is not closed for thread: " + t.getName()); } fail("H2 JDBC connections leak detected. See the log above.");
