Repository: ignite Updated Branches: refs/heads/ignite-1232-1 81c586583 -> 53e1a79d9
ignite-1232 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53e1a79d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53e1a79d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53e1a79d Branch: refs/heads/ignite-1232-1 Commit: 53e1a79d9ff2d49f43c00397a29b337cc2d821aa Parents: 81c5865 Author: sboikov <[email protected]> Authored: Wed Jul 20 10:11:04 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 20 10:11:04 2016 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 167 +++++++++---------- 1 file changed, 77 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/53e1a79d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index d7b6cb0..7d85ddc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -46,7 +46,6 @@ import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; @@ -71,13 +70,11 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.jdbc.JdbcResultSet; import org.h2.result.ResultInterface; import org.h2.value.Value; -import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; @@ -121,7 +118,7 @@ public class GridMapQueryExecutor { private IgniteH2Indexing h2; /** */ - private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>(); + private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>(); /** */ private final GridSpinBusyLock busyLock; @@ -130,10 +127,6 @@ public class GridMapQueryExecutor { private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations = new ConcurrentHashMap8<>(); - /** */ - private final GridBoundedConcurrentLinkedHashMap<QueryKey, Boolean> qryHist = - new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); - /** * @param busyLock Busy lock. 
*/ @@ -160,12 +153,12 @@ public class GridMapQueryExecutor { GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId); - ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId); + NodeResults nodeRess = qryRess.remove(nodeId); if (nodeRess == null) return; - for (QueryResults ress : nodeRess.values()) + for (QueryResults ress : nodeRess.results().values()) ress.cancel(); } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); @@ -229,16 +222,14 @@ public class GridMapQueryExecutor { private void onCancel(ClusterNode node, GridQueryCancelRequest msg) { long qryReqId = msg.queryRequestId(); - Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), qryReqId), Boolean.FALSE); + NodeResults nodeRess = resultsForNode(node.id()); - if (old == null || !old) + if (!nodeRess.onCancel(qryReqId)) return; - ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id()); - GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP); - QueryResults results = nodeRess.remove(qryReqId); + QueryResults results = nodeRess.results().remove(qryReqId); if (results == null) return; @@ -250,13 +241,13 @@ public class GridMapQueryExecutor { * @param nodeId Node ID. * @return Results for node. */ - private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) { - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId); + private NodeResults resultsForNode(UUID nodeId) { + NodeResults nodeRess = qryRess.get(nodeId); if (nodeRess == null) { - nodeRess = new ConcurrentHashMap8<>(); + nodeRess = new NodeResults(); - ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(nodeId, nodeRess); + NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess); if (old != null) nodeRess = old; @@ -484,7 +475,13 @@ public class GridMapQueryExecutor { int pageSize, boolean distributedJoins ) { - ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id()); + // Prepare to run queries. 
+ GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0)); + + if (mainCctx == null) + throw new CacheException("Failed to find cache."); + + NodeResults nodeRess = resultsForNode(node.id()); QueryResults qr = null; @@ -500,15 +497,9 @@ public class GridMapQueryExecutor { } } - // Prepare to run queries. - GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0)); - - if (mainCctx == null) - throw new CacheException("Failed to find cache."); - qr = new QueryResults(reqId, qrys.size(), mainCctx); - if (nodeRess.put(reqId, qr) != null) + if (nodeRess.results().put(reqId, qr) != null) throw new IllegalStateException(); // Prepare query context. @@ -550,14 +541,10 @@ public class GridMapQueryExecutor { reserved = null; try { - Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), reqId), Boolean.TRUE); - - if (old != null) { - assert !old; - - GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, MAP); + if (nodeRess.cancelled(reqId)) { + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); - nodeRess.remove(reqId); + nodeRess.results().remove(reqId); return; } @@ -612,7 +599,7 @@ public class GridMapQueryExecutor { } catch (Throwable e) { if (qr != null) { - nodeRess.remove(reqId, qr); + nodeRess.results().remove(reqId, qr); qr.cancel(); } @@ -666,9 +653,9 @@ public class GridMapQueryExecutor { * @param req Request. */ private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) { - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); + NodeResults nodeRess = qryRess.get(node.id()); - QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId()); + QueryResults qr = nodeRess == null ? 
null : nodeRess.results().get(req.queryRequestId()); if (qr == null || qr.canceled) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); @@ -677,12 +664,13 @@ public class GridMapQueryExecutor { } /** + * @param nodeRess Results. * @param node Node. * @param qr Query results. * @param qry Query. * @param pageSize Page size. */ - private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, ClusterNode node, QueryResults qr, int qry, + private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int pageSize) { QueryResult res = qr.result(qry); @@ -698,14 +686,14 @@ public class GridMapQueryExecutor { res.close(); if (qr.isAllClosed()) - nodeRess.remove(qr.qryReqId, qr); + nodeRess.results().remove(qr.qryReqId, qr); } try { boolean loc = node.isLocal(); GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page, - page == 0 ? res.rowCount : -1 , + page == 0 ? res.rowCnt : -1 , res.cols, loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)), loc ? rows : null); @@ -758,6 +746,52 @@ public class GridMapQueryExecutor { } } + + /** + * + */ + private static class NodeResults { + /** */ + private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>(); + + /** */ + private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = + new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); + + /** + * @return All results. + */ + ConcurrentMap<Long, QueryResults> results() { + return res; + } + + /** + * @param qryId Query ID. + * @return {@code True} if query was already cancelled. + */ + boolean cancelled(long qryId) { + Boolean old = qryHist.putIfAbsent(qryId, Boolean.TRUE); + + if (old != null) { + assert !old; + + return true; + } + + return false; + } + + /** + * @param qryId Query ID. + * @return {@code True} if the query being cancelled had already been started. 
+ */ + boolean onCancel(long qryId) { + Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE); + + return old != null && old; + } + } + /** * */ @@ -769,7 +803,7 @@ public class GridMapQueryExecutor { private final AtomicReferenceArray<QueryResult> results; /** */ - private final GridCacheContext<?,?> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private volatile boolean canceled; @@ -863,7 +897,7 @@ public class GridMapQueryExecutor { private int page; /** */ - private final int rowCount; + private final int rowCnt; /** */ private volatile boolean closed; @@ -887,7 +921,7 @@ public class GridMapQueryExecutor { throw new IllegalStateException(e); // Must not happen. } - rowCount = res.getRowCount(); + rowCnt = res.getRowCount(); cols = res.getVisibleColumnCount(); } @@ -979,51 +1013,4 @@ public class GridMapQueryExecutor { throw new IllegalStateException(); } } - - /** - * - */ - private static class QueryKey { - /** */ - private final UUID nodeId; - - /** */ - private final long qryId; - - /** - * @param nodeId Node ID. - * @param qryId Query ID. - */ - public QueryKey(UUID nodeId, long qryId) { - this.nodeId = nodeId; - this.qryId = qryId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryKey key = (QueryKey)o; - - return qryId == key.qryId && nodeId.equals(key.nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + (int) (qryId ^ (qryId >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(QueryKey.class, this); - } - } } \ No newline at end of file
