Repository: ignite Updated Branches: refs/heads/ignite-zk 38209f45e -> 0b78f318b
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6961ddc2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6961ddc2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6961ddc2 Branch: refs/heads/ignite-zk Commit: 6961ddc210a982b195a8e079ff2556f338a4a54d Parents: 38209f4 Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 27 14:32:58 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 27 14:32:58 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 12 +- .../managers/discovery/IgniteDiscoverySpi.java | 6 - .../continuous/GridContinuousProcessor.java | 207 +++++++++---------- ...DiscoverySpiMutableCustomMessageSupport.java | 39 ++++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 7 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 7 +- 6 files changed, 149 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 96d636f..2791492 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -121,6 +121,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -2394,14 +2395,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @return {@code True} if configured {@link DiscoverySpi} does not support mutable custom messages. + * @return {@code True} if configured {@link DiscoverySpi} supports mutable custom messages. */ - public boolean unmutableCustomMessages() { - DiscoverySpi spi = getSpi(); - - return (spi instanceof IgniteDiscoverySpi) && - !((IgniteDiscoverySpi)spi).supportsMutableCustomEvents(); + public boolean mutableCustomMessages() { + DiscoverySpiMutableCustomMessageSupport ann = U.getAnnotation(ctx.config().getDiscoverySpi().getClass(), + DiscoverySpiMutableCustomMessageSupport.class); + return ann != null && ann.value(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java index 2e2b9af..bf117f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.managers.discovery; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; /** * @@ -65,9 +64,4 @@ public interface IgniteDiscoverySpi extends DiscoverySpi { * @param err Connection error. */ public void resolveCommunicationError(ClusterNode node, Exception err); - - /** - * @return {@code True} if mutable {@link DiscoverySpiCustomMessage}s are supported. - */ - public boolean supportsMutableCustomEvents(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 899af48..ee12de3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -55,7 +55,6 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; -import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -85,7 +84,6 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; -import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -165,7 +163,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - discoProtoVer = ctx.discovery().unmutableCustomMessages() ? 2 : 1; + discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2; if (ctx.config().isDaemon()) return; @@ -1393,144 +1391,139 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private void processStartRequestV2(final AffinityTopologyVersion topVer, final ClusterNode snd, final StartRoutineDiscoveryMessageV2 msg) { - try { - StartRequestDataV2 reqData = msg.startRequestData(); + StartRequestDataV2 reqData = msg.startRequestData(); - ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), - msg.routineId(), - reqData.handlerBytes(), - reqData.nodeFilterBytes(), - reqData.bufferSize(), - reqData.interval(), - reqData.autoUnsubscribe()); + ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), + msg.routineId(), + reqData.handlerBytes(), + reqData.nodeFilterBytes(), + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe()); - routinesInfo.addRoutineInfo(routineInfo); + routinesInfo.addRoutineInfo(routineInfo); - final Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); + final Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); - // Should not use marshaller and send messages from discovery thread. - ctx.pools().poolForPolicy(GridIoPolicy.SYSTEM_POOL).execute(new Runnable() { - @Override public void run() { - if (snd.id().equals(ctx.localNodeId())) { - StartFuture fut = startFuts.get(msg.routineId()); + // Should not use marshaller and send messages from discovery thread. + ctx.getSystemExecutorService().execute(new Runnable() { + @Override public void run() { + if (snd.id().equals(ctx.localNodeId())) { + StartFuture fut = startFuts.get(msg.routineId()); - if (fut != null) - fut.initRemoteNodes(topVer, nodes); + if (fut != null) + fut.initRemoteNodes(topVer, nodes); - return; - } + return; + } - StartRequestDataV2 reqData = msg.startRequestData(); + StartRequestDataV2 reqData = msg.startRequestData(); - Exception err = null; + Exception err = null; - IgnitePredicate<ClusterNode> nodeFilter = null; + IgnitePredicate<ClusterNode> nodeFilter = null; - byte[] cntrs = null; + byte[] cntrs = null; - if (reqData.nodeFilterBytes() != null) { - try { - if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { - String clsName = reqData.className(); - GridDeploymentInfo depInfo = reqData.deploymentInfo(); - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), - clsName, - clsName, - depInfo.userVersion(), - snd.id(), - depInfo.classLoaderId(), - depInfo.participants(), - null); - - if (dep == null) { - throw new IgniteDeploymentCheckedException("Failed to obtain deployment " + - "for class: " + clsName); - } + if (reqData.nodeFilterBytes() != null) { + try { + if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { + String clsName = reqData.className(); + GridDeploymentInfo depInfo = reqData.deploymentInfo(); - nodeFilter = U.unmarshal(marsh, - reqData.nodeFilterBytes(), - U.resolveClassLoader(dep.classLoader(), ctx.config())); - } - else { - nodeFilter = U.unmarshal(marsh, - reqData.nodeFilterBytes(), - U.resolveClassLoader(ctx.config())); + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), + clsName, + clsName, + depInfo.userVersion(), + snd.id(), + depInfo.classLoaderId(), + depInfo.participants(), + null); + + if (dep == null) { + throw new IgniteDeploymentCheckedException("Failed to obtain deployment " + + "for class: " + clsName); } - if (nodeFilter != null) - ctx.resource().injectGeneric(nodeFilter); + nodeFilter = U.unmarshal(marsh, + reqData.nodeFilterBytes(), + U.resolveClassLoader(dep.classLoader(), ctx.config())); } - catch (Exception e) { - err = e; - - U.error(log, "Failed to unmarshal continuous routine filter [" + - "routineId=" + msg.routineId + - ", srcNodeId=" + snd.id() + ']', e); + else { + nodeFilter = U.unmarshal(marsh, + reqData.nodeFilterBytes(), + U.resolveClassLoader(ctx.config())); } + + if (nodeFilter != null) + ctx.resource().injectGeneric(nodeFilter); + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to unmarshal continuous routine filter [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); } + } - boolean register = err == null && - (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); + boolean register = err == null && + (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); - if (register) { - try { - GridContinuousHandler hnd = U.unmarshal(marsh, - reqData.handlerBytes(), - U.resolveClassLoader(ctx.config())); + if (register) { + try { + GridContinuousHandler hnd = U.unmarshal(marsh, + reqData.handlerBytes(), + U.resolveClassLoader(ctx.config())); - if (ctx.config().isPeerClassLoadingEnabled()) - hnd.p2pUnmarshal(snd.id(), ctx); + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(snd.id(), ctx); - if (msg.keepBinary()) { - assert hnd instanceof CacheContinuousQueryHandler : hnd; + if (msg.keepBinary()) { + assert hnd instanceof CacheContinuousQueryHandler : hnd; - ((CacheContinuousQueryHandler)hnd).keepBinary(true); - } + ((CacheContinuousQueryHandler)hnd).keepBinary(true); + } - GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? - new GridMessageListenHandler((GridMessageListenHandler)hnd) : - hnd; + GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? + new GridMessageListenHandler((GridMessageListenHandler)hnd) : + hnd; - registerHandler(snd.id(), - msg.routineId, - hnd0, - reqData.bufferSize(), - reqData.interval(), - reqData.autoUnsubscribe(), - false); + registerHandler(snd.id(), + msg.routineId, + hnd0, + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe(), + false); - if (hnd0.isQuery()) { - GridCacheProcessor proc = ctx.cache(); + if (hnd0.isQuery()) { + GridCacheProcessor proc = ctx.cache(); - if (proc != null) { - GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); + if (proc != null) { + GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); - if (cache != null && !cache.isLocal() && cache.context().userCache()) { - CachePartitionPartialCountersMap cntrsMap = - cache.context().topology().localUpdateCounters(false); + if (cache != null && !cache.isLocal() && cache.context().userCache()) { + CachePartitionPartialCountersMap cntrsMap = + cache.context().topology().localUpdateCounters(false); - cntrs = U.marshal(marsh, cntrsMap); - } + cntrs = U.marshal(marsh, cntrsMap); } } } - catch (Exception e) { - err = e; - - U.error(log, "Failed to register continuous routine handler [" + - "routineId=" + msg.routineId + - ", srcNodeId=" + snd.id() + ']', e); - } } + catch (Exception e) { + err = e; - sendMessageStartResult(snd, msg.routineId(), cntrs, err); + U.error(log, "Failed to register continuous routine handler [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); + } } - }); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to submit continuous routine started result closure: " + e, e); - } + + sendMessageStartResult(snd, msg.routineId(), cntrs, err); + } + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java new file mode 100644 index 0000000..ca2fecd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * TODO ZK + */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface DiscoverySpiMutableCustomMessageSupport { + /** + * @return Whether or not target SPI supports mutable {@link DiscoverySpiCustomMessage}s. + */ + public boolean value(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 781272c..292d67e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -89,6 +89,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; @@ -225,6 +226,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) +@DiscoverySpiMutableCustomMessageSupport(true) public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi { /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; @@ -2111,11 +2113,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery } /** {@inheritDoc} */ - @Override public boolean supportsMutableCustomEvents() { - return true; - } - - /** {@inheritDoc} */ @Override public void resolveCommunicationError(ClusterNode node, Exception err) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index ebb667f..b695e9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -51,6 +51,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode; @@ -66,6 +67,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) +@DiscoverySpiMutableCustomMessageSupport(false) public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, IgniteDiscoverySpi { /** */ public static final String DFLT_ROOT_PATH = "/apacheIgnite"; @@ -246,11 +248,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** {@inheritDoc} */ - @Override public boolean supportsMutableCustomEvents() { - return false; - } - - /** {@inheritDoc} */ @Override public void resolveCommunicationError(ClusterNode node, Exception err) { impl.resolveCommunicationError(node, err); }