Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 38209f45e -> 0b78f318b


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6961ddc2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6961ddc2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6961ddc2

Branch: refs/heads/ignite-zk
Commit: 6961ddc210a982b195a8e079ff2556f338a4a54d
Parents: 38209f4
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Dec 27 14:32:58 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Dec 27 14:32:58 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  12 +-
 .../managers/discovery/IgniteDiscoverySpi.java  |   6 -
 .../continuous/GridContinuousProcessor.java     | 207 +++++++++----------
 ...DiscoverySpiMutableCustomMessageSupport.java |  39 ++++
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   7 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   7 +-
 6 files changed, 149 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 96d636f..2791492 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -121,6 +121,7 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -2394,14 +2395,13 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * @return {@code True} if configured {@link DiscoverySpi} does not 
support mutable custom messages.
+     * @return {@code True} if configured {@link DiscoverySpi} supports 
mutable custom messages.
      */
-    public boolean unmutableCustomMessages() {
-        DiscoverySpi spi = getSpi();
-
-        return (spi instanceof IgniteDiscoverySpi) &&
-            !((IgniteDiscoverySpi)spi).supportsMutableCustomEvents();
+    public boolean mutableCustomMessages() {
+        DiscoverySpiMutableCustomMessageSupport ann = 
U.getAnnotation(ctx.config().getDiscoverySpi().getClass(),
+            DiscoverySpiMutableCustomMessageSupport.class);
 
+        return ann != null && ann.value();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index 2e2b9af..bf117f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.managers.discovery;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 
 /**
  *
@@ -65,9 +64,4 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
      * @param err Connection error.
      */
     public void resolveCommunicationError(ClusterNode node, Exception err);
-
-    /**
-     * @return {@code True} if mutable {@link DiscoverySpiCustomMessage}s are 
supported.
-     */
-    public boolean supportsMutableCustomEvents();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 899af48..ee12de3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,7 +55,6 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -85,7 +84,6 @@ import 
org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import 
org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -165,7 +163,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        discoProtoVer = ctx.discovery().unmutableCustomMessages() ? 2 : 1;
+        discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2;
 
         if (ctx.config().isDaemon())
             return;
@@ -1393,144 +1391,139 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     private void processStartRequestV2(final AffinityTopologyVersion topVer,
         final ClusterNode snd,
         final StartRoutineDiscoveryMessageV2 msg) {
-        try {
-            StartRequestDataV2 reqData = msg.startRequestData();
+        StartRequestDataV2 reqData = msg.startRequestData();
 
-            ContinuousRoutineInfo routineInfo = new 
ContinuousRoutineInfo(snd.id(),
-                msg.routineId(),
-                reqData.handlerBytes(),
-                reqData.nodeFilterBytes(),
-                reqData.bufferSize(),
-                reqData.interval(),
-                reqData.autoUnsubscribe());
+        ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
+            msg.routineId(),
+            reqData.handlerBytes(),
+            reqData.nodeFilterBytes(),
+            reqData.bufferSize(),
+            reqData.interval(),
+            reqData.autoUnsubscribe());
 
-            routinesInfo.addRoutineInfo(routineInfo);
+        routinesInfo.addRoutineInfo(routineInfo);
 
-            final Collection<ClusterNode> nodes = 
ctx.discovery().nodes(topVer);
+        final Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
 
-            // Should not use marshaller and send messages from discovery 
thread.
-            ctx.pools().poolForPolicy(GridIoPolicy.SYSTEM_POOL).execute(new 
Runnable() {
-                @Override public void run() {
-                    if (snd.id().equals(ctx.localNodeId())) {
-                        StartFuture fut = startFuts.get(msg.routineId());
+        // Should not use marshaller and send messages from discovery thread.
+        ctx.getSystemExecutorService().execute(new Runnable() {
+            @Override public void run() {
+                if (snd.id().equals(ctx.localNodeId())) {
+                    StartFuture fut = startFuts.get(msg.routineId());
 
-                        if (fut != null)
-                            fut.initRemoteNodes(topVer, nodes);
+                    if (fut != null)
+                        fut.initRemoteNodes(topVer, nodes);
 
-                        return;
-                    }
+                    return;
+                }
 
-                    StartRequestDataV2 reqData = msg.startRequestData();
+                StartRequestDataV2 reqData = msg.startRequestData();
 
-                    Exception err = null;
+                Exception err = null;
 
-                    IgnitePredicate<ClusterNode> nodeFilter = null;
+                IgnitePredicate<ClusterNode> nodeFilter = null;
 
-                    byte[] cntrs = null;
+                byte[] cntrs = null;
 
-                    if (reqData.nodeFilterBytes() != null) {
-                        try {
-                            if (ctx.config().isPeerClassLoadingEnabled() && 
reqData.className() != null) {
-                                String clsName = reqData.className();
-                                GridDeploymentInfo depInfo = 
reqData.deploymentInfo();
-
-                                GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
-                                    clsName,
-                                    clsName,
-                                    depInfo.userVersion(),
-                                    snd.id(),
-                                    depInfo.classLoaderId(),
-                                    depInfo.participants(),
-                                    null);
-
-                                if (dep == null) {
-                                    throw new 
IgniteDeploymentCheckedException("Failed to obtain deployment " +
-                                        "for class: " + clsName);
-                                }
+                if (reqData.nodeFilterBytes() != null) {
+                    try {
+                        if (ctx.config().isPeerClassLoadingEnabled() && 
reqData.className() != null) {
+                            String clsName = reqData.className();
+                            GridDeploymentInfo depInfo = 
reqData.deploymentInfo();
 
-                                nodeFilter = U.unmarshal(marsh,
-                                    reqData.nodeFilterBytes(),
-                                    U.resolveClassLoader(dep.classLoader(), 
ctx.config()));
-                            }
-                            else {
-                                nodeFilter = U.unmarshal(marsh,
-                                    reqData.nodeFilterBytes(),
-                                    U.resolveClassLoader(ctx.config()));
+                            GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
+                                clsName,
+                                clsName,
+                                depInfo.userVersion(),
+                                snd.id(),
+                                depInfo.classLoaderId(),
+                                depInfo.participants(),
+                                null);
+
+                            if (dep == null) {
+                                throw new 
IgniteDeploymentCheckedException("Failed to obtain deployment " +
+                                    "for class: " + clsName);
                             }
 
-                            if (nodeFilter != null)
-                                ctx.resource().injectGeneric(nodeFilter);
+                            nodeFilter = U.unmarshal(marsh,
+                                reqData.nodeFilterBytes(),
+                                U.resolveClassLoader(dep.classLoader(), 
ctx.config()));
                         }
-                        catch (Exception e) {
-                            err = e;
-
-                            U.error(log, "Failed to unmarshal continuous 
routine filter [" +
-                                "routineId=" + msg.routineId +
-                                ", srcNodeId=" + snd.id() + ']', e);
+                        else {
+                            nodeFilter = U.unmarshal(marsh,
+                                reqData.nodeFilterBytes(),
+                                U.resolveClassLoader(ctx.config()));
                         }
+
+                        if (nodeFilter != null)
+                            ctx.resource().injectGeneric(nodeFilter);
+                    }
+                    catch (Exception e) {
+                        err = e;
+
+                        U.error(log, "Failed to unmarshal continuous routine 
filter [" +
+                            "routineId=" + msg.routineId +
+                            ", srcNodeId=" + snd.id() + ']', e);
                     }
+                }
 
-                    boolean register = err == null &&
-                        (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode()));
+                boolean register = err == null &&
+                    (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode()));
 
-                    if (register) {
-                        try {
-                            GridContinuousHandler hnd = U.unmarshal(marsh,
-                                reqData.handlerBytes(),
-                                U.resolveClassLoader(ctx.config()));
+                if (register) {
+                    try {
+                        GridContinuousHandler hnd = U.unmarshal(marsh,
+                            reqData.handlerBytes(),
+                            U.resolveClassLoader(ctx.config()));
 
-                            if (ctx.config().isPeerClassLoadingEnabled())
-                                hnd.p2pUnmarshal(snd.id(), ctx);
+                        if (ctx.config().isPeerClassLoadingEnabled())
+                            hnd.p2pUnmarshal(snd.id(), ctx);
 
-                            if (msg.keepBinary()) {
-                                assert hnd instanceof 
CacheContinuousQueryHandler : hnd;
+                        if (msg.keepBinary()) {
+                            assert hnd instanceof CacheContinuousQueryHandler 
: hnd;
 
-                                
((CacheContinuousQueryHandler)hnd).keepBinary(true);
-                            }
+                            
((CacheContinuousQueryHandler)hnd).keepBinary(true);
+                        }
 
-                            GridContinuousHandler hnd0 = hnd instanceof 
GridMessageListenHandler ?
-                                new 
GridMessageListenHandler((GridMessageListenHandler)hnd) :
-                                hnd;
+                        GridContinuousHandler hnd0 = hnd instanceof 
GridMessageListenHandler ?
+                            new 
GridMessageListenHandler((GridMessageListenHandler)hnd) :
+                            hnd;
 
-                            registerHandler(snd.id(),
-                                msg.routineId,
-                                hnd0,
-                                reqData.bufferSize(),
-                                reqData.interval(),
-                                reqData.autoUnsubscribe(),
-                                false);
+                        registerHandler(snd.id(),
+                            msg.routineId,
+                            hnd0,
+                            reqData.bufferSize(),
+                            reqData.interval(),
+                            reqData.autoUnsubscribe(),
+                            false);
 
-                            if (hnd0.isQuery()) {
-                                GridCacheProcessor proc = ctx.cache();
+                        if (hnd0.isQuery()) {
+                            GridCacheProcessor proc = ctx.cache();
 
-                                if (proc != null) {
-                                    GridCacheAdapter cache = 
ctx.cache().internalCache(hnd0.cacheName());
+                            if (proc != null) {
+                                GridCacheAdapter cache = 
ctx.cache().internalCache(hnd0.cacheName());
 
-                                    if (cache != null && !cache.isLocal() && 
cache.context().userCache()) {
-                                        CachePartitionPartialCountersMap 
cntrsMap =
-                                            
cache.context().topology().localUpdateCounters(false);
+                                if (cache != null && !cache.isLocal() && 
cache.context().userCache()) {
+                                    CachePartitionPartialCountersMap cntrsMap =
+                                        
cache.context().topology().localUpdateCounters(false);
 
-                                        cntrs = U.marshal(marsh, cntrsMap);
-                                    }
+                                    cntrs = U.marshal(marsh, cntrsMap);
                                 }
                             }
                         }
-                        catch (Exception e) {
-                            err = e;
-
-                            U.error(log, "Failed to register continuous 
routine handler [" +
-                                "routineId=" + msg.routineId +
-                                ", srcNodeId=" + snd.id() + ']', e);
-                        }
                     }
+                    catch (Exception e) {
+                        err = e;
 
-                    sendMessageStartResult(snd, msg.routineId(), cntrs, err);
+                        U.error(log, "Failed to register continuous routine 
handler [" +
+                            "routineId=" + msg.routineId +
+                            ", srcNodeId=" + snd.id() + ']', e);
+                    }
                 }
-            });
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to submit continuous routine started result 
closure: " + e, e);
-        }
+
+                sendMessageStartResult(snd, msg.routineId(), cntrs, err);
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
new file mode 100644
index 0000000..ca2fecd
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * TODO ZK
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiMutableCustomMessageSupport {
+    /**
+     * @return Whether or not target SPI supports mutable {@link 
DiscoverySpiCustomMessage}s.
+     */
+    public boolean value();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 781272c..292d67e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -89,6 +89,7 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -225,6 +226,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.getBoolean;
 @IgniteSpiMultipleInstancesSupport(true)
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
+@DiscoverySpiMutableCustomMessageSupport(true)
 public class TcpDiscoverySpi extends IgniteSpiAdapter implements 
IgniteDiscoverySpi {
     /** Node attribute that is mapped to node's external addresses (value is 
<tt>disc.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
@@ -2111,11 +2113,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     }
 
     /** {@inheritDoc} */
-    @Override public boolean supportsMutableCustomEvents() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
     @Override public void resolveCommunicationError(ClusterNode node, 
Exception err) {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6961ddc2/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index ebb667f..b695e9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -51,6 +51,7 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
@@ -66,6 +67,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.getBoolean;
 @IgniteSpiMultipleInstancesSupport(true)
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
+@DiscoverySpiMutableCustomMessageSupport(false)
 public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements 
DiscoverySpi, IgniteDiscoverySpi {
     /** */
     public static final String DFLT_ROOT_PATH = "/apacheIgnite";
@@ -246,11 +248,6 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     }
 
     /** {@inheritDoc} */
-    @Override public boolean supportsMutableCustomEvents() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override public void resolveCommunicationError(ClusterNode node, 
Exception err) {
         impl.resolveCommunicationError(node, err);
     }

Reply via email to