Repository: incubator-gossip Updated Branches: refs/heads/master 968571a56 -> ac8303893
GOSSIP-38 Multiple async GossipListeners Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/ac830389 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/ac830389 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/ac830389 Branch: refs/heads/master Commit: ac830389327508688c664a01463d1ddfa3fb6721 Parents: 968571a Author: pxsalehi <[email protected]> Authored: Mon Aug 21 11:58:15 2017 +0200 Committer: pxsalehi <[email protected]> Committed: Mon Aug 21 11:58:15 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipSettings.java | 2 +- .../src/main/java/org/apache/gossip/Member.java | 2 +- .../apache/gossip/manager/GossipManager.java | 7 +- .../gossip/manager/GossipManagerBuilder.java | 3 +- .../manager/GossipMemberStateRefresher.java | 50 +++++++-- .../gossip/manager/SimpleActiveGossiper.java | 110 +++++++++++++++++++ .../gossip/manager/SimpleActiveGossipper.java | 110 ------------------- .../gossip/transport/TransportManager.java | 2 +- 8 files changed, 164 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 792af85..32c00c9 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -43,7 +43,7 @@ public class GossipSettings { private String distribution = "normal"; - private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; + private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossiper"; private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager"; private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager"; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/Member.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/Member.java b/gossip-base/src/main/java/org/apache/gossip/Member.java index d04a7b6..54a6737 100644 --- a/gossip-base/src/main/java/org/apache/gossip/Member.java +++ b/gossip-base/src/main/java/org/apache/gossip/Member.java @@ -22,7 +22,7 @@ import java.net.URI; import java.util.Map; /** - * A abstract class representing a gossip member. + * An abstract class representing a gossip member. * */ public abstract class Member implements Comparable<Member> { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index d839b2e..db442c6 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -185,7 +185,7 @@ public abstract class GossipManager { if (settings.isPersistDataState()) { scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); } - scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS); + memberStateRefresher.init(); LOGGER.debug("The GossipManager is started."); } @@ -224,6 +224,7 @@ public abstract class GossipManager { gossipCore.shutdown(); transportManager.shutdown(); dataReaper.close(); + memberStateRefresher.shutdown(); scheduledServiced.shutdown(); try { scheduledServiced.awaitTermination(1, TimeUnit.SECONDS); @@ -366,4 +367,8 @@ public abstract class GossipManager { public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){ gossipCore.unregisterSharedDataSubscriber(handler); } + + public void registerGossipListener(GossipListener listener) { + memberStateRefresher.register(listener); + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java index 86dca57..f3ca23a 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -18,10 +18,11 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; +import org.apache.gossip.Member; import org.apache.gossip.StartupSettings; import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.handlers.MessageHandlerFactory; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java index 1836309..652bf5c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java @@ -26,27 +26,40 @@ import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.ShutdownMessage; import org.apache.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.BiFunction; -public class GossipMemberStateRefresher implements Runnable { +public class GossipMemberStateRefresher { public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class); private final Map<LocalMember, GossipState> members; private final GossipSettings settings; - private final GossipListener listener; + private final List<GossipListener> listeners = new CopyOnWriteArrayList<>(); private final Clock clock; private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData; + private final ExecutorService listenerExecutor; + private final ScheduledExecutorService scheduledExecutor; + private final BlockingQueue<Runnable> workQueue; public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings, - GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) { + GossipListener listener, + BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) { this.members = members; this.settings = settings; - this.listener = listener; + listeners.add(listener); this.findPerNodeGossipData = findPerNodeGossipData; clock = new SystemClock(); + workQueue = new ArrayBlockingQueue<>(1024); + listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + scheduledExecutor = Executors.newScheduledThreadPool(1); + } + + public void init() { + scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS); } public void run() { @@ -74,7 +87,9 @@ public class GossipMemberStateRefresher implements Runnable { if (entry.getValue() != requiredState) { members.put(entry.getKey(), requiredState); - listener.gossipEvent(entry.getKey(), requiredState); + /* Call listeners asynchronously */ + for (GossipListener listener: listeners) + listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState)); } } } @@ -112,10 +127,31 @@ public class GossipMemberStateRefresher implements Runnable { if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) { members.put(l.getKey(), GossipState.DOWN); if (l.getValue() == GossipState.UP) { - listener.gossipEvent(l.getKey(), GossipState.DOWN); + for (GossipListener listener: listeners) + listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN)); } return true; } return false; } + + public void register(GossipListener listener) { + listeners.add(listener); + } + + public void shutdown() { + scheduledExecutor.shutdown(); + try { + scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + listenerExecutor.shutdown(); + try { + listenerExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + listenerExecutor.shutdownNow(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java new file mode 100644 index 0000000..7d498b4 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.LocalMember; + +import com.codahale.metrics.MetricRegistry; + +/** + * Base implementation gossips randomly to live nodes periodically gossips to dead ones + * + */ +public class SimpleActiveGossiper extends AbstractActiveGossiper { + + private ScheduledExecutorService scheduledExecutorService; + private final BlockingQueue<Runnable> workQueue; + private ThreadPoolExecutor threadService; + + public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, + MetricRegistry registry) { + super(gossipManager, gossipCore, registry); + scheduledExecutorService = Executors.newScheduledThreadPool(2); + workQueue = new ArrayBlockingQueue<Runnable>(1024); + threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + } + + @Override + public void init() { + super.init(); + scheduledExecutorService.scheduleAtFixedRate(() -> { + threadService.execute(() -> { + sendToALiveMember(); + }); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate(() -> { + sendToDeadMember(); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendPerNodeData(gossipManager.getMyself(), + selectPartner(gossipManager.getLiveMembers())), + 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendSharedData(gossipManager.getMyself(), + selectPartner(gossipManager.getLiveMembers())), + 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + super.shutdown(); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + sendShutdownMessage(); + threadService.shutdown(); + try { + threadService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + } + + protected void sendToALiveMember(){ + LocalMember member = selectPartner(gossipManager.getLiveMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + protected void sendToDeadMember(){ + LocalMember member = selectPartner(gossipManager.getDeadMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + /** + * sends an optimistic shutdown message to several clusters nodes + */ + protected void sendShutdownMessage(){ + List<LocalMember> l = gossipManager.getLiveMembers(); + int sendTo = l.size() < 3 ? 1 : l.size() / 2; + for (int i = 0; i < sendTo; i++) { + threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java deleted file mode 100644 index e47fe2a..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.LocalMember; - -import com.codahale.metrics.MetricRegistry; - -/** - * Base implementation gossips randomly to live nodes periodically gossips to dead ones - * - */ -public class SimpleActiveGossipper extends AbstractActiveGossiper { - - private ScheduledExecutorService scheduledExecutorService; - private final BlockingQueue<Runnable> workQueue; - private ThreadPoolExecutor threadService; - - public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore, - MetricRegistry registry) { - super(gossipManager, gossipCore, registry); - scheduledExecutorService = Executors.newScheduledThreadPool(2); - workQueue = new ArrayBlockingQueue<Runnable>(1024); - threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, - new ThreadPoolExecutor.DiscardOldestPolicy()); - } - - @Override - public void init() { - super.init(); - scheduledExecutorService.scheduleAtFixedRate(() -> { - threadService.execute(() -> { - sendToALiveMember(); - }); - }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - scheduledExecutorService.scheduleAtFixedRate(() -> { - sendToDeadMember(); - }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - scheduledExecutorService.scheduleAtFixedRate( - () -> sendPerNodeData(gossipManager.getMyself(), - selectPartner(gossipManager.getLiveMembers())), - 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - scheduledExecutorService.scheduleAtFixedRate( - () -> sendSharedData(gossipManager.getMyself(), - selectPartner(gossipManager.getLiveMembers())), - 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - } - - @Override - public void shutdown() { - super.shutdown(); - scheduledExecutorService.shutdown(); - try { - scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Issue during shutdown", e); - } - sendShutdownMessage(); - threadService.shutdown(); - try { - threadService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Issue during shutdown", e); - } - } - - protected void sendToALiveMember(){ - LocalMember member = selectPartner(gossipManager.getLiveMembers()); - sendMembershipList(gossipManager.getMyself(), member); - } - - protected void sendToDeadMember(){ - LocalMember member = selectPartner(gossipManager.getDeadMembers()); - sendMembershipList(gossipManager.getMyself(), member); - } - - /** - * sends an optimistic shutdown message to several clusters nodes - */ - protected void sendShutdownMessage(){ - List<LocalMember> l = gossipManager.getLiveMembers(); - int sendTo = l.size() < 3 ? 1 : l.size() / 2; - for (int i = 0; i < sendTo; i++) { - threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java index 031d90e..99354d1 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java @@ -20,7 +20,7 @@ package org.apache.gossip.transport; import java.io.IOException; import java.net.URI; -/** interface for manage that sends and receives messages that have already been serialized. */ +/** interface for manager that sends and receives messages that have already been serialized. */ public interface TransportManager { /** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */
