Repository: ignite
Updated Branches:
  refs/heads/ignite-1232-1 81c586583 -> 53e1a79d9


ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53e1a79d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53e1a79d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53e1a79d

Branch: refs/heads/ignite-1232-1
Commit: 53e1a79d9ff2d49f43c00397a29b337cc2d821aa
Parents: 81c5865
Author: sboikov <[email protected]>
Authored: Wed Jul 20 10:11:04 2016 +0300
Committer: sboikov <[email protected]>
Committed: Wed Jul 20 10:11:04 2016 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 167 +++++++++----------
 1 file changed, 77 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53e1a79d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index d7b6cb0..7d85ddc 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
@@ -71,13 +70,11 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.result.ResultInterface;
 import org.h2.value.Value;
-import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -121,7 +118,7 @@ public class GridMapQueryExecutor {
     private IgniteH2Indexing h2;
 
     /** */
-    private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = 
new ConcurrentHashMap8<>();
+    private ConcurrentMap<UUID, NodeResults> qryRess = new 
ConcurrentHashMap8<>();
 
     /** */
     private final GridSpinBusyLock busyLock;
@@ -130,10 +127,6 @@ public class GridMapQueryExecutor {
     private final ConcurrentMap<T2<String, AffinityTopologyVersion>, 
GridReservable> reservations =
         new ConcurrentHashMap8<>();
 
-    /** */
-    private final GridBoundedConcurrentLinkedHashMap<QueryKey, Boolean> 
qryHist =
-        new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, 
PER_SEGMENT_Q);
-
     /**
      * @param busyLock Busy lock.
      */
@@ -160,12 +153,12 @@ public class GridMapQueryExecutor {
 
                 GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
 
-                ConcurrentMap<Long,QueryResults> nodeRess = 
qryRess.remove(nodeId);
+                NodeResults nodeRess = qryRess.remove(nodeId);
 
                 if (nodeRess == null)
                     return;
 
-                for (QueryResults ress : nodeRess.values())
+                for (QueryResults ress : nodeRess.results().values())
                     ress.cancel();
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -229,16 +222,14 @@ public class GridMapQueryExecutor {
     private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
         long qryReqId = msg.queryRequestId();
 
-        Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), qryReqId), 
Boolean.FALSE);
+        NodeResults nodeRess = resultsForNode(node.id());
 
-        if (old == null || !old)
+        if (!nodeRess.onCancel(qryReqId))
             return;
 
-        ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id());
-
         GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
 
-        QueryResults results = nodeRess.remove(qryReqId);
+        QueryResults results = nodeRess.results().remove(qryReqId);
 
         if (results == null)
             return;
@@ -250,13 +241,13 @@ public class GridMapQueryExecutor {
      * @param nodeId Node ID.
      * @return Results for node.
      */
-    private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) {
-        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId);
+    private NodeResults resultsForNode(UUID nodeId) {
+        NodeResults nodeRess = qryRess.get(nodeId);
 
         if (nodeRess == null) {
-            nodeRess = new ConcurrentHashMap8<>();
+            nodeRess = new NodeResults();
 
-            ConcurrentMap<Long, QueryResults> old = 
qryRess.putIfAbsent(nodeId, nodeRess);
+            NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
 
             if (old != null)
                 nodeRess = old;
@@ -484,7 +475,13 @@ public class GridMapQueryExecutor {
         int pageSize,
         boolean distributedJoins
     ) {
-        ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id());
+        // Prepare to run queries.
+        GridCacheContext<?, ?> mainCctx = 
ctx.cache().context().cacheContext(cacheIds.get(0));
+
+        if (mainCctx == null)
+            throw new CacheException("Failed to find cache.");
+
+        NodeResults nodeRess = resultsForNode(node.id());
 
         QueryResults qr = null;
 
@@ -500,15 +497,9 @@ public class GridMapQueryExecutor {
                 }
             }
 
-            // Prepare to run queries.
-            GridCacheContext<?, ?> mainCctx = 
ctx.cache().context().cacheContext(cacheIds.get(0));
-
-            if (mainCctx == null)
-                throw new CacheException("Failed to find cache.");
-
             qr = new QueryResults(reqId, qrys.size(), mainCctx);
 
-            if (nodeRess.put(reqId, qr) != null)
+            if (nodeRess.results().put(reqId, qr) != null)
                 throw new IllegalStateException();
 
             // Prepare query context.
@@ -550,14 +541,10 @@ public class GridMapQueryExecutor {
             reserved = null;
 
             try {
-                Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), 
reqId), Boolean.TRUE);
-
-                if (old != null) {
-                    assert !old;
-
-                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), 
reqId, MAP);
+                if (nodeRess.cancelled(reqId)) {
+                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), 
reqId, qctx.type());
 
-                    nodeRess.remove(reqId);
+                    nodeRess.results().remove(reqId);
 
                     return;
                 }
@@ -612,7 +599,7 @@ public class GridMapQueryExecutor {
         }
         catch (Throwable e) {
             if (qr != null) {
-                nodeRess.remove(reqId, qr);
+                nodeRess.results().remove(reqId, qr);
 
                 qr.cancel();
             }
@@ -666,9 +653,9 @@ public class GridMapQueryExecutor {
      * @param req Request.
      */
     private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest 
req) {
-        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+        NodeResults nodeRess = qryRess.get(node.id());
 
-        QueryResults qr = nodeRess == null ? null : 
nodeRess.get(req.queryRequestId());
+        QueryResults qr = nodeRess == null ? null : 
nodeRess.results().get(req.queryRequestId());
 
         if (qr == null || qr.canceled)
             sendError(node, req.queryRequestId(), new CacheException("No query 
result found for request: " + req));
@@ -677,12 +664,13 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param nodeRess Results.
      * @param node Node.
      * @param qr Query results.
      * @param qry Query.
      * @param pageSize Page size.
      */
-    private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, 
ClusterNode node, QueryResults qr, int qry,
+    private void sendNextPage(NodeResults nodeRess, ClusterNode node, 
QueryResults qr, int qry,
         int pageSize) {
         QueryResult res = qr.result(qry);
 
@@ -698,14 +686,14 @@ public class GridMapQueryExecutor {
             res.close();
 
             if (qr.isAllClosed())
-                nodeRess.remove(qr.qryReqId, qr);
+                nodeRess.results().remove(qr.qryReqId, qr);
         }
 
         try {
             boolean loc = node.isLocal();
 
             GridQueryNextPageResponse msg = new 
GridQueryNextPageResponse(qr.qryReqId, qry, page,
-                page == 0 ? res.rowCount : -1 ,
+                page == 0 ? res.rowCnt : -1 ,
                 res.cols,
                 loc ? null : toMessages(rows, new 
ArrayList<Message>(res.cols)),
                 loc ? rows : null);
@@ -758,6 +746,52 @@ public class GridMapQueryExecutor {
         }
     }
 
+
+    /**
+     * Query results and query start/cancel history kept for a single node.
+     */
+    private static class NodeResults {
+        /** */
+        private final ConcurrentMap<Long, QueryResults> res = new 
ConcurrentHashMap8<>();
+
+        /** */
+        private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> 
qryHist =
+            new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, 
PER_SEGMENT_Q);
+
+        /**
+         * @return All results.
+         */
+        ConcurrentMap<Long, QueryResults> results() {
+            return res;
+        }
+
+        /**
+         * @param qryId Query ID.
+         * @return {@code True} if query was already cancelled.
+         */
+        boolean cancelled(long qryId) {
+            Boolean old = qryHist.putIfAbsent(qryId, Boolean.TRUE);
+
+            if (old != null) {
+                assert !old;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @param qryId Query ID.
+         * @return {@code True} if the cancelled query had already been started.
+         */
+        boolean onCancel(long qryId) {
+            Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+
+            return old != null && old;
+        }
+    }
+
     /**
      *
      */
@@ -769,7 +803,7 @@ public class GridMapQueryExecutor {
         private final AtomicReferenceArray<QueryResult> results;
 
         /** */
-        private final GridCacheContext<?,?> cctx;
+        private final GridCacheContext<?, ?> cctx;
 
         /** */
         private volatile boolean canceled;
@@ -863,7 +897,7 @@ public class GridMapQueryExecutor {
         private int page;
 
         /** */
-        private final int rowCount;
+        private final int rowCnt;
 
         /** */
         private volatile boolean closed;
@@ -887,7 +921,7 @@ public class GridMapQueryExecutor {
                 throw new IllegalStateException(e); // Must not happen.
             }
 
-            rowCount = res.getRowCount();
+            rowCnt = res.getRowCount();
             cols = res.getVisibleColumnCount();
         }
 
@@ -979,51 +1013,4 @@ public class GridMapQueryExecutor {
             throw new IllegalStateException();
         }
     }
-
-    /**
-     *
-     */
-    private static class QueryKey {
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final long qryId;
-
-        /**
-         * @param nodeId Node ID.
-         * @param qryId Query ID.
-         */
-        public QueryKey(UUID nodeId, long qryId) {
-            this.nodeId = nodeId;
-            this.qryId = qryId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            QueryKey key = (QueryKey)o;
-
-            return qryId == key.qryId && nodeId.equals(key.nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + (int) (qryId ^ (qryId >>> 32));
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        public String toString() {
-            return S.toString(QueryKey.class, this);
-        }
-    }
 }
\ No newline at end of file

Reply via email to