Repository: ignite
Updated Branches:
  refs/heads/ignite-1232-1 6e7ec888e -> bdc4147f7


ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdc4147f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdc4147f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdc4147f

Branch: refs/heads/ignite-1232-1
Commit: bdc4147f7771f74033afbc45315d6ccf4993dd6a
Parents: 6e7ec88
Author: sboikov <[email protected]>
Authored: Thu Jul 21 12:45:11 2016 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jul 21 12:45:11 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java        | 13 +++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java        | 15 +++++++++------
 .../processors/query/h2/opt/GridH2IndexBase.java     | 10 ++++++++--
 .../query/h2/twostep/GridReduceQueryExecutor.java    |  9 ++++++++-
 4 files changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8b0465f..99cb7f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1374,6 +1374,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
+     * @param topicOrd GridTopic enumeration ordinal.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+        throws IgniteCheckedException {
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 9c12046..535881e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -64,6 +64,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -1676,20 +1677,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /**
      * @param topic Topic.
+     * @param topicOrd Topic ordinal for {@link GridTopic}.
      * @param nodes Nodes.
      * @param msg Message.
      * @param specialize Optional closure to specialize message for each node.
-     * @param locNodeHandler Handler for local node.
+     * @param locNodeHnd Handler for local node.
     * @param plc Policy identifying the executor service which will process message.
      * @param runLocParallel Run local handler in parallel thread.
      * @return {@code true} If all messages sent successfully.
      */
     public boolean send(
         Object topic,
+        int topicOrd,
         Collection<ClusterNode> nodes,
         Message msg,
         @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
-        @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHandler,
+        @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd,
         byte plc,
         boolean runLocParallel
     ) {
@@ -1715,7 +1718,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         ((GridCacheQueryMarshallable)msg).marshall(marshaller);
                 }
 
-                ctx.io().send(node, topic, msg, plc);
+                ctx.io().send(node, topic, topicOrd, msg, plc);
             }
             catch (IgniteCheckedException e) {
                 ok = false;
@@ -1738,7 +1741,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here.
                     ctx.closure().runLocal(new GridPlainRunnable() {
                         @Override public void run() {
-                            locNodeHandler.apply(finalLocNode, finalMsg);
+                            locNodeHnd.apply(finalLocNode, finalMsg);
                         }
                     }, plc).listen(logger);
                 }
@@ -1749,7 +1752,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
             }
             else
-                locNodeHandler.apply(locNode, msg);
+                locNodeHnd.apply(locNode, msg);
         }
 
         return ok;
@@ -1758,7 +1761,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * @return Serializer.
      */
-    protected JavaObjectSerializer h2Serializer() {
+    private JavaObjectSerializer h2Serializer() {
         return new JavaObjectSerializer() {
                 @Override public byte[] serialize(Object obj) throws Exception {
                     return marshaller.marshal(obj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 23a3ebd..78cd271 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -390,8 +390,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param msg Message.
      */
     private void send(Collection<ClusterNode> nodes, Message msg) {
-        if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHnd,
-            GridIoPolicy.IDX_POOL, false))
+        if (!getTable().rowDescriptor().indexing().send(msgTopic,
+            -1,
+            nodes,
+            msg,
+            null,
+            locNodeHnd,
+            GridIoPolicy.IDX_POOL,
+            false))
             throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 2e0a2c9..c48dccd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1143,7 +1143,14 @@ public class GridReduceQueryExecutor {
         if (log.isDebugEnabled())
             log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]");
 
-        return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHnd, QUERY_POOL, runLocParallel);
+        return h2.send(GridTopic.TOPIC_QUERY,
+            GridTopic.TOPIC_QUERY.ordinal(),
+            nodes,
+            msg,
+            specialize,
+            locNodeHnd,
+            QUERY_POOL,
+            runLocParallel);
     }
 
     /**

Reply via email to