Repository: ignite
Updated Branches:
  refs/heads/ignite-1232-1 6e7ec888e -> bdc4147f7
ignite-1232 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdc4147f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdc4147f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdc4147f Branch: refs/heads/ignite-1232-1 Commit: bdc4147f7771f74033afbc45315d6ccf4993dd6a Parents: 6e7ec88 Author: sboikov <[email protected]> Authored: Thu Jul 21 12:45:11 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 21 12:45:11 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 13 +++++++++++++ .../processors/query/h2/IgniteH2Indexing.java | 15 +++++++++------ .../processors/query/h2/opt/GridH2IndexBase.java | 10 ++++++++-- .../query/h2/twostep/GridReduceQueryExecutor.java | 9 ++++++++- 4 files changed, 38 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 8b0465f..99cb7f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1374,6 +1374,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * @param node Destination node. * @param topic Topic to send the message to. + * @param topicOrd GridTopic enumeration ordinal. + * @param msg Message to send. + * @param plc Type of processing. 
+ * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) + throws IgniteCheckedException { + send(node, topic, topicOrd, msg, plc, false, 0, false, null); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 9c12046..535881e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -64,6 +64,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; @@ -1676,20 +1677,22 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * @param topic Topic. + * @param topicOrd Topic ordinal for {@link GridTopic}. * @param nodes Nodes. * @param msg Message. * @param specialize Optional closure to specialize message for each node. - * @param locNodeHandler Handler for local node. 
+ * @param locNodeHnd Handler for local node. * @param plc Policy identifying the executor service which will process message. * @param runLocParallel Run local handler in parallel thread. * @return {@code true} If all messages sent successfully. */ public boolean send( Object topic, + int topicOrd, Collection<ClusterNode> nodes, Message msg, @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize, - @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHandler, + @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd, byte plc, boolean runLocParallel ) { @@ -1715,7 +1718,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { ((GridCacheQueryMarshallable)msg).marshall(marshaller); } - ctx.io().send(node, topic, msg, plc); + ctx.io().send(node, topic, topicOrd, msg, plc); } catch (IgniteCheckedException e) { ok = false; @@ -1738,7 +1741,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here. ctx.closure().runLocal(new GridPlainRunnable() { @Override public void run() { - locNodeHandler.apply(finalLocNode, finalMsg); + locNodeHnd.apply(finalLocNode, finalMsg); } }, plc).listen(logger); } @@ -1749,7 +1752,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } else - locNodeHandler.apply(locNode, msg); + locNodeHnd.apply(locNode, msg); } return ok; @@ -1758,7 +1761,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * @return Serializer. 
*/ - protected JavaObjectSerializer h2Serializer() { + private JavaObjectSerializer h2Serializer() { return new JavaObjectSerializer() { @Override public byte[] serialize(Object obj) throws Exception { return marshaller.marshal(obj); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 23a3ebd..78cd271 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -390,8 +390,14 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param msg Message. 
*/ private void send(Collection<ClusterNode> nodes, Message msg) { - if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHnd, - GridIoPolicy.IDX_POOL, false)) + if (!getTable().rowDescriptor().indexing().send(msgTopic, + -1, + nodes, + msg, + null, + locNodeHnd, + GridIoPolicy.IDX_POOL, + false)) throw new GridH2RetryException("Failed to send message to nodes: " + nodes + "."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bdc4147f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 2e0a2c9..c48dccd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -1143,7 +1143,14 @@ public class GridReduceQueryExecutor { if (log.isDebugEnabled()) log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]"); - return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHnd, QUERY_POOL, runLocParallel); + return h2.send(GridTopic.TOPIC_QUERY, + GridTopic.TOPIC_QUERY.ordinal(), + nodes, + msg, + specialize, + locNodeHnd, + QUERY_POOL, + runLocParallel); } /**
