Repository: incubator-gossip Updated Branches: refs/heads/master 298b1ae3a -> c544b8bf1
GOSSIP-80 Sundry cleanups * remove redundant parameter from method call. * remove unnecessary threadpool. * Simplify `GossipCore.sendOneWay()` * Clean up usage of `MessageInvoker` * `DefaultMessageInvoker` replaced by a factory * `MessageInvokerCombiner` replaced by same factory * Alter `MessageInvokerTest` to not rely on specific types * Remove type assertion from `GossipManagerBuilderTest` * Merge `MessageInvoker` with `MessageHandler` * This required changing method signature return type from `void` to `boolean`. Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/c544b8bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/c544b8bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/c544b8bf Branch: refs/heads/master Commit: c544b8bf167a099ec4cac6a94b059fa71ce8a8cc Parents: 298b1ae Author: Gary Dusbabek <[email protected]> Authored: Thu Apr 13 13:02:48 2017 -0500 Committer: Gary Dusbabek <[email protected]> Committed: Mon Apr 17 10:54:24 2017 -0500 ---------------------------------------------------------------------- .../org/apache/gossip/manager/GossipCore.java | 42 +---- .../apache/gossip/manager/GossipManager.java | 12 +- .../gossip/manager/GossipManagerBuilder.java | 16 +- .../handlers/ActiveGossipMessageHandler.java | 12 +- .../manager/handlers/DefaultMessageInvoker.java | 40 ---- .../gossip/manager/handlers/MessageHandler.java | 8 +- .../manager/handlers/MessageHandlerFactory.java | 58 ++++++ .../gossip/manager/handlers/MessageInvoker.java | 33 ---- .../handlers/MessageInvokerCombiner.java | 48 ----- .../handlers/PerNodeDataMessageHandler.java | 10 +- .../manager/handlers/ResponseHandler.java | 11 +- .../handlers/SharedDataMessageHandler.java | 10 +- .../handlers/ShutdownMessageHandler.java | 10 +- .../manager/handlers/SimpleMessageInvoker.java | 45 ----- .../manager/handlers/TypedMessageHandler.java | 51 ++++++ 
.../manager/GossipManagerBuilderTest.java | 22 +-- .../manager/handlers/MessageHandlerTest.java | 182 +++++++++++++++++++ .../manager/handlers/MessageInvokerTest.java | 178 ------------------ 18 files changed, 374 insertions(+), 414 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java index f53419d..d01a84c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -57,7 +57,6 @@ public class GossipCore implements GossipCoreConstants { public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private final GossipManager gossipManager; private ConcurrentHashMap<String, LatchAndBase> requests; - private ThreadPoolExecutor service; private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final BlockingQueue<Runnable> workQueue; @@ -71,15 +70,12 @@ public class GossipCore implements GossipCoreConstants { this.gossipManager = manager; requests = new ConcurrentHashMap<>(); workQueue = new ArrayBlockingQueue<>(1024); - service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size()); metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size()); 
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size()); - metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount()); - metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize()); messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); @@ -178,17 +174,10 @@ public class GossipCore implements GossipCoreConstants { } public void shutdown(){ - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn(e); - } - service.shutdownNow(); } public void receive(Base base) { - if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) { + if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) { LOGGER.warn("received message can not be handled"); } } @@ -268,29 +257,10 @@ public class GossipCore implements GossipCoreConstants { * @param message the message to send * @param u the uri to send it to */ - public void sendOneWay(Base message, URI u){ - byte[] json_bytes; + public void sendOneWay(Base message, URI u) { try { - if (privKey == null){ - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); - } else { - SignedPayload p = new SignedPayload(); - p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); - p.setSignature(sign(p.getData())); - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); - } - } catch (IOException e) { - messageSerdeException.mark(); - throw new RuntimeException(e); - } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(u.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort()); - socket.send(datagramPacket); - 
tranmissionSuccess.mark(); - } catch (IOException ex) { - tranmissionException.mark(); + sendInternal(message, u); + } catch (RuntimeException ex) { LOGGER.debug("Send one way failed", ex); } } @@ -304,13 +274,11 @@ public class GossipCore implements GossipCoreConstants { /** * Merge lists from remote members and update heartbeats * - * @param gossipManager * @param senderMember * @param remoteList * */ - public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, - List<Member> remoteList) { + public void mergeLists(RemoteMember senderMember, List<Member> remoteList) { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index c2b50ae..ff70ccc 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -25,7 +25,7 @@ import org.apache.gossip.Member; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; -import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; @@ -64,14 +64,14 @@ public abstract class GossipManager { private final GossipMemberStateRefresher memberStateRefresher; private final ObjectMapper objectMapper; - private final MessageInvoker messageInvoker; + private final MessageHandler messageHandler; public GossipManager(String cluster, URI uri, 
String id, Map<String, String> properties, GossipSettings settings, List<Member> gossipMembers, GossipListener listener, MetricRegistry registry, - ObjectMapper objectMapper, MessageInvoker messageInvoker) { + ObjectMapper objectMapper, MessageHandler messageHandler) { this.settings = settings; - this.messageInvoker = messageInvoker; + this.messageHandler = messageHandler; clock = new SystemClock(); me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, @@ -101,8 +101,8 @@ public abstract class GossipManager { readSavedDataState(); } - public MessageInvoker getMessageInvoker() { - return messageInvoker; + public MessageHandler getMessageHandler() { + return messageHandler; } public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java index b87045b..bb73177 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -25,8 +25,8 @@ import org.apache.gossip.GossipSettings; import org.apache.gossip.StartupSettings; import org.apache.gossip.crdt.CrdtModule; import org.apache.gossip.event.GossipListener; -import org.apache.gossip.manager.handlers.DefaultMessageInvoker; -import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.MessageHandler; +import org.apache.gossip.manager.handlers.MessageHandlerFactory; import java.net.URI; import java.util.ArrayList; @@ -50,7 +50,7 @@ public class GossipManagerBuilder { private MetricRegistry registry; private Map<String,String> properties; private ObjectMapper 
objectMapper; - private MessageInvoker messageInvoker; + private MessageHandler messageHandler; private ManagerBuilder() {} @@ -114,8 +114,8 @@ public class GossipManagerBuilder { return this; } - public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) { - this.messageInvoker = messageInvoker; + public ManagerBuilder messageHandler(MessageHandler messageHandler) { + this.messageHandler = messageHandler; return this; } @@ -142,10 +142,10 @@ public class GossipManagerBuilder { objectMapper.registerModule(new CrdtModule()); objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false); } - if (messageInvoker == null) { - messageInvoker = new DefaultMessageInvoker(); + if (messageHandler == null) { + messageHandler = MessageHandlerFactory.defaultHandler(); } - return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ; + return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java index f5e568e..e89179b 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java @@ -32,8 +32,15 @@ import java.util.ArrayList; import java.util.List; public class ActiveGossipMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. 
+ * @return boolean indicating success. + */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { List<Member> remoteGossipMembers = new ArrayList<>(); RemoteMember senderMember = null; UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; @@ -69,6 +76,7 @@ public class ActiveGossipMessageHandler implements MessageHandler { o.setUriFrom(activeGossipMessage.getUriFrom()); o.setUuid(activeGossipMessage.getUuid()); gossipCore.sendOneWay(o, senderMember.getUri()); - gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers); + gossipCore.mergeLists(senderMember, remoteGossipMembers); + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java deleted file mode 100644 index 5b78ce3..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.*; - -public class DefaultMessageInvoker implements MessageInvoker { - private final MessageInvokerCombiner mic; - - public DefaultMessageInvoker() { - mic = new MessageInvokerCombiner(); - mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler())); - mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler())); - mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler())); - mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler())); - mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler())); - } - - public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - return mic.invoke(gossipCore, gossipManager, base); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java index 4b5d49d..5af9b14 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java +++ 
b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java @@ -22,5 +22,11 @@ import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; public interface MessageHandler { - void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ + boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java new file mode 100644 index 0000000..fff9430 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.model.ShutdownMessage; + +import java.util.Arrays; + +public class MessageHandlerFactory { + + public static MessageHandler defaultHandler() { + return concurrentHandler( + new TypedMessageHandler(Response.class, new ResponseHandler()), + new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()), + new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()), + new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()), + new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()) + ); + } + + public static MessageHandler concurrentHandler(MessageHandler... handlers) { + if (handlers == null) throw new NullPointerException("handlers cannot be null"); + if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) { + throw new NullPointerException("found at least one null handler"); + } + return new MessageHandler() { + @Override + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + // return true if at least one of the component handlers return true. 
+ return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java deleted file mode 100644 index 70be408..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.Base; - -public interface MessageInvoker { - /** - * - * @param gossipCore - * @param gossipManager - * @param base - * @return true if the invoker processed the message type - */ - boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java deleted file mode 100644 index 5faf6a5..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.Base; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public class MessageInvokerCombiner implements MessageInvoker { - private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>(); - - public MessageInvokerCombiner() { - } - - public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; - } - - public void clear() { - invokers.clear(); - } - - public void add(MessageInvoker mi) { - if (mi == null) { - throw new NullPointerException(); - } - invokers.add(mi); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java index b3a785e..0ad0d91 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java @@ -23,9 +23,17 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.UdpPerNodeDataMessage; public class PerNodeDataMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. 
+ */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base; gossipCore.addPerNodeData(message); + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java index 2f33b01..1070ff7 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java @@ -23,11 +23,20 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.Trackable; public class ResponseHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. 
+ */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { if (base instanceof Trackable) { Trackable t = (Trackable) base; gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t); + return true; } + return false; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java index 89ca4a0..3fe3033 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java @@ -23,9 +23,17 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.UdpSharedDataMessage; public class SharedDataMessageHandler implements MessageHandler{ + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. 
+ */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { UdpSharedDataMessage message = (UdpSharedDataMessage) base; gossipCore.addSharedData(message); + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java index a40c7a1..40e4c07 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java @@ -24,8 +24,15 @@ import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.ShutdownMessage; public class ShutdownMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. 
+ */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { ShutdownMessage s = (ShutdownMessage) base; PerNodeDataMessage m = new PerNodeDataMessage(); m.setKey(ShutdownMessage.PER_NODE_KEY); @@ -34,5 +41,6 @@ public class ShutdownMessageHandler implements MessageHandler { m.setTimestamp(System.currentTimeMillis()); m.setExpireAt(System.currentTimeMillis() + 30L * 1000L); gossipCore.addPerNodeData(m); + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java deleted file mode 100644 index 0f410d2..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.Base; - -public class SimpleMessageInvoker implements MessageInvoker { - final private Class<?> messageClass; - final private MessageHandler messageHandler; - - public SimpleMessageInvoker(Class<?> messageClass, MessageHandler messageHandler) { - if (messageClass == null || messageHandler == null) { - throw new NullPointerException(); - } - this.messageClass = messageClass; - this.messageHandler = messageHandler; - } - - @Override - public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - if (messageClass.isAssignableFrom(base.getClass())) { - messageHandler.invoke(gossipCore, gossipManager, base); - return true; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java new file mode 100644 index 0000000..b40461d --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; + +public class TypedMessageHandler implements MessageHandler { + final private Class<?> messageClass; + final private MessageHandler messageHandler; + + public TypedMessageHandler(Class<?> messageClass, MessageHandler messageHandler) { + if (messageClass == null || messageHandler == null) { + throw new NullPointerException(); + } + this.messageClass = messageClass; + this.messageHandler = messageHandler; + } + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return true if types match, false otherwise. 
+ */ + @Override + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + if (messageClass.isAssignableFrom(base.getClass())) { + messageHandler.invoke(gossipCore, gossipManager, base); + return true; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java index 8842643..959f818 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java @@ -21,10 +21,9 @@ import com.codahale.metrics.MetricRegistry; import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalMember; -import org.apache.gossip.manager.handlers.DefaultMessageInvoker; -import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.handlers.ResponseHandler; -import org.apache.gossip.manager.handlers.SimpleMessageInvoker; +import org.apache.gossip.manager.handlers.TypedMessageHandler; import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; @@ -77,28 +76,27 @@ public class GossipManagerBuilderTest { } @Test - public void createDefaultMessageInvokerIfNull() throws URISyntaxException { + public void createDefaultMessageHandlerIfNull() throws URISyntaxException { GossipManager gossipManager = GossipManagerBuilder.newBuilder() .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) .gossipSettings(new GossipSettings()) - .messageInvoker(null).registry(new MetricRegistry()).build(); - 
assertNotNull(gossipManager.getMessageInvoker()); - Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass()); + .messageHandler(null).registry(new MetricRegistry()).build(); + assertNotNull(gossipManager.getMessageHandler()); } @Test - public void testMessageInvokerKeeping() throws URISyntaxException { - MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler()); + public void testMessageHandlerKeeping() throws URISyntaxException { + MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler()); GossipManager gossipManager = GossipManagerBuilder.newBuilder() .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) .gossipSettings(new GossipSettings()) - .messageInvoker(mi).registry(new MetricRegistry()).build(); - assertNotNull(gossipManager.getMessageInvoker()); - Assert.assertEquals(gossipManager.getMessageInvoker(), mi); + .messageHandler(mi).registry(new MetricRegistry()).build(); + assertNotNull(gossipManager.getMessageHandler()); + Assert.assertEquals(gossipManager.getMessageHandler(), mi); } @Test http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java new file mode 100644 index 0000000..c035d21 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.UdpSharedDataMessage; +import org.junit.Assert; +import org.junit.Test; + +public class MessageHandlerTest { + private class FakeMessage extends Base { + public FakeMessage() { + } + } + + private class FakeMessageData extends Base { + public int data; + + public FakeMessageData(int data) { + this.data = data; + } + } + + private class FakeMessageDataHandler implements MessageHandler { + public int data; + + public FakeMessageDataHandler() { + data = 0; + } + + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + data = ((FakeMessageData) base).data; + return true; + } + } + + private class FakeMessageHandler implements MessageHandler { + public int counter; + + public FakeMessageHandler() { + counter = 0; + } + + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + counter++; + return true; + } + } + + @Test + public void testSimpleHandler() { + MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()); + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertFalse(mi.invoke(null, null, new 
ActiveGossipMessage())); + } + + @Test(expected = NullPointerException.class) + public void testSimpleHandlerNullClassConstructor() { + new TypedMessageHandler(null, new FakeMessageHandler()); + } + + @Test(expected = NullPointerException.class) + public void testSimpleHandlerNullHandlerConstructor() { + new TypedMessageHandler(FakeMessage.class, null); + } + + @Test + public void testCallCountSimpleHandler() { + FakeMessageHandler h = new FakeMessageHandler(); + MessageHandler mi = new TypedMessageHandler(FakeMessage.class, h); + mi.invoke(null, null, new FakeMessage()); + Assert.assertEquals(1, h.counter); + mi.invoke(null, null, new ActiveGossipMessage()); + Assert.assertEquals(1, h.counter); + mi.invoke(null, null, new FakeMessage()); + Assert.assertEquals(2, h.counter); + } + + @Test(expected = NullPointerException.class) + @SuppressWarnings("all") + public void cantAddNullHandler() { + MessageHandler handler = MessageHandlerFactory.concurrentHandler(null); + } + + @Test(expected = NullPointerException.class) + public void cantAddNullHandler2() { + MessageHandler handler = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()), + null, + new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()) + ); + } + + @Test + public void testMessageHandlerCombiner() { + //Empty combiner - false result + MessageHandler mi = MessageHandlerFactory.concurrentHandler(); + Assert.assertFalse(mi.invoke(null, null, new Base())); + + FakeMessageHandler h = new FakeMessageHandler(); + mi = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, h), + new TypedMessageHandler(FakeMessage.class, h) + ); + + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); + Assert.assertEquals(2, h.counter); + + //Increase size in runtime. 
Should be 3 calls: 2+3 = 5 + mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessage.class, h)); + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertEquals(5, h.counter); + } + + @Test + public void testMessageHandlerCombiner2levels() { + FakeMessageHandler h = new FakeMessageHandler(); + + MessageHandler mi1 = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, h), + new TypedMessageHandler(FakeMessage.class, h) + ); + + MessageHandler mi2 = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, h), + new TypedMessageHandler(FakeMessage.class, h) + ); + + MessageHandler mi = MessageHandlerFactory.concurrentHandler(mi1, mi2); + + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertEquals(4, h.counter); + } + + @Test + public void testMessageHandlerCombinerDataShipping() { + MessageHandler mi = MessageHandlerFactory.concurrentHandler(); + FakeMessageDataHandler h = new FakeMessageDataHandler(); + mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessageData.class, h)); + + Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101))); + Assert.assertEquals(101, h.data); + } + + @Test + public void testCombiningDefaultHandler() { + MessageHandler mi = MessageHandlerFactory.concurrentHandler( + MessageHandlerFactory.defaultHandler(), + new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()) + ); + //UdpSharedGossipDataMessage with null gossipCore -> exception + boolean thrown = false; + try { + mi.invoke(null, null, new UdpSharedDataMessage()); + } catch (NullPointerException e) { + thrown = true; + } + Assert.assertTrue(thrown); + //skips FakeMessage and FakeHandler works ok + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java deleted file mode 100644 index 571d7ba..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipMessage; -import org.apache.gossip.model.Base; -import org.apache.gossip.udp.UdpSharedDataMessage; -import org.junit.Assert; -import org.junit.Test; - -public class MessageInvokerTest { - private class FakeMessage extends Base { - public FakeMessage() { - } - } - - private class FakeMessageData extends Base { - public int data; - - public FakeMessageData(int data) { - this.data = data; - } - } - - private class FakeMessageDataHandler implements MessageHandler { - public int data; - - public FakeMessageDataHandler() { - data = 0; - } - - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - data = ((FakeMessageData) base).data; - } - } - - private class FakeMessageHandler implements MessageHandler { - public int counter; - - public FakeMessageHandler() { - counter = 0; - } - - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - counter++; - } - } - - @Test - public void testSimpleInvoker() { - MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()); - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); - } - - @Test(expected = NullPointerException.class) - public void testSimpleInvokerNullClassConstructor() { - new SimpleMessageInvoker(null, new FakeMessageHandler()); - } - - @Test(expected = NullPointerException.class) - public void testSimpleInvokerNullHandlerConstructor() { - new SimpleMessageInvoker(FakeMessage.class, null); - } - - @Test - public void testCallCountSimpleInvoker() { - FakeMessageHandler h = new FakeMessageHandler(); - MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h); - mi.invoke(null, null, new FakeMessage()); - Assert.assertEquals(1, h.counter); - 
mi.invoke(null, null, new ActiveGossipMessage()); - Assert.assertEquals(1, h.counter); - mi.invoke(null, null, new FakeMessage()); - Assert.assertEquals(2, h.counter); - } - - @Test(expected = NullPointerException.class) - public void cantAddNullInvoker() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - mi.add(null); - } - - @Test - public void testCombinerClear() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - - mi.clear(); - Assert.assertFalse(mi.invoke(null, null, new FakeMessage())); - } - - @Test - public void testMessageInvokerCombiner() { - //Empty combiner - false result - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - Assert.assertFalse(mi.invoke(null, null, new Base())); - - FakeMessageHandler h = new FakeMessageHandler(); - mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); - mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); - - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); - Assert.assertEquals(2, h.counter); - - //Increase size in runtime. 
Should be 3 calls: 2+3 = 5 - mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - Assert.assertEquals(5, h.counter); - } - - @Test - public void testMessageInvokerCombiner2levels() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - FakeMessageHandler h = new FakeMessageHandler(); - - MessageInvokerCombiner mi1 = new MessageInvokerCombiner(); - mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); - mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); - - MessageInvokerCombiner mi2 = new MessageInvokerCombiner(); - mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); - mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); - - mi.add(mi1); - mi.add(mi2); - - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - Assert.assertEquals(4, h.counter); - } - - @Test - public void testMessageInvokerCombinerDataShipping() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - FakeMessageDataHandler h = new FakeMessageDataHandler(); - mi.add(new SimpleMessageInvoker(FakeMessageData.class, h)); - - Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101))); - Assert.assertEquals(101, h.data); - } - - @Test - public void testCombiningDefaultInvoker() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - mi.add(new DefaultMessageInvoker()); - mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); - //UdpSharedGossipDataMessage with null gossipCore -> exception - boolean thrown = false; - try { - mi.invoke(null, null, new UdpSharedDataMessage()); - } catch (NullPointerException e) { - thrown = true; - } - Assert.assertTrue(thrown); - //DefaultInvoker skips FakeMessage and FakeHandler works ok - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - } - -}
