Repository: incubator-gossip Updated Branches: refs/heads/master c544b8bf1 -> 851cd93e6
GOSSIP-79 Isolate UDP and JSON code With these changes, it should now be possible to create alternate serialization (e.g. Gson or native) and transports (like HTTP). To make this PR reviewable I decided against creating new modules right now. That can be done subsequently in another PR that doesn't modify any code. * Creates two new interfaces: `TransportManager` and `ProtocolManager` * Implementation classes must honor a common constructor interface * Includes UDP and Jackson implementations of those. * `AbstractTransportManager` has a lot of boilerplate that includes: * starting the active gossiper, and * starting the passive gossiper. I spent some time trying to polish the implementations to become less dependent on references to `GossipManager`. I still feel there is a lot of room for improvement. Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/851cd93e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/851cd93e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/851cd93e Branch: refs/heads/master Commit: 851cd93e67674133407dd180e0d340b41e0fa4f9 Parents: c544b8b Author: Gary Dusbabek <gdusba...@gmail.com> Authored: Mon Apr 17 12:37:42 2017 -0500 Committer: Gary Dusbabek <gdusba...@gmail.com> Committed: Wed Apr 19 09:32:58 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipSettings.java | 12 +- .../org/apache/gossip/manager/GossipCore.java | 65 +------- .../apache/gossip/manager/GossipManager.java | 154 +++++++++++-------- .../gossip/manager/GossipManagerBuilder.java | 20 +-- .../gossip/manager/PassiveGossipThread.java | 61 ++------ .../gossip/manager/RingStatePersister.java | 43 +++--- .../gossip/manager/UserDataPersister.java | 50 +++--- .../OnlyProcessReceivedPassiveGossipThread.java | 33 ---- .../gossip/protocol/JacksonProtocolManager.java | 131 ++++++++++++++++ .../apache/gossip/protocol/ProtocolManager.java | 41 +++++ .../transport/AbstractTransportManager.java | 86 +++++++++++ .../gossip/transport/TransportManager.java | 40 +++++ .../gossip/transport/UdpTransportManager.java | 98 ++++++++++++ .../apache/gossip/utils/ReflectionUtils.java | 53 +++++++ .../java/org/apache/gossip/crdt/OrSetTest.java | 16 +- .../manager/GossipManagerBuilderTest.java | 38 ++--- .../gossip/manager/RingPersistenceTest.java | 2 +- .../gossip/manager/UserDataPersistenceTest.java | 4 +- .../manager/handlers/MessageHandlerTest.java | 2 +- 19 files changed, 636 insertions(+), 313 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 6b2bf8b..e4a95d3 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -45,6 +45,9 @@ public class GossipSettings { private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; + private String transportManagerClass = "org.apache.gossip.transport.UdpTransportManager"; + private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager"; + private Map<String,String> activeGossipProperties = new HashMap<>(); private String pathToRingState = "./"; @@ -222,5 +225,12 @@ public class GossipSettings { public void setSignMessages(boolean signMessages) { this.signMessages = signMessages; } - + + public String getTransportManagerClass() { + return transportManagerClass; + } + + public String getProtocolManagerClass() { + return protocolManagerClass; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java index d01a84c..e034432 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -29,16 +29,8 @@ import org.apache.gossip.model.*; import org.apache.gossip.udp.Trackable; import org.apache.log4j.Logger; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; import java.net.URI; -import java.security.*; -import java.security.spec.InvalidKeySpecException; -import java.security.spec.PKCS8EncodedKeySpec; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.*; @@ -60,8 +52,6 @@ public class GossipCore implements GossipCoreConstants { private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final BlockingQueue<Runnable> workQueue; - private final PKCS8EncodedKeySpec privKeySpec; - private final PrivateKey privKey; private final Meter messageSerdeException; private final Meter tranmissionException; private final Meter tranmissionSuccess; @@ -79,42 +69,6 @@ public class GossipCore implements GossipCoreConstants { messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); - - if (manager.getSettings().isSignMessages()){ - File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId()); - File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub"); - if (!privateKey.exists()){ - throw new IllegalArgumentException("private key not found " + privateKey); - } - if (!publicKey.exists()){ - throw new IllegalArgumentException("public key not found " + publicKey); - } - try (FileInputStream keyfis = new FileInputStream(privateKey)) { - byte[] encKey = new byte[keyfis.available()]; - keyfis.read(encKey); - keyfis.close(); - privKeySpec = new PKCS8EncodedKeySpec(encKey); - KeyFactory keyFactory = KeyFactory.getInstance("DSA"); - privKey = keyFactory.generatePrivate(privKeySpec); - } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { - throw new RuntimeException("failed hard", e); - } - } else { - privKeySpec = null; - privKey = null; - } - } - - private byte [] sign(byte [] bytes){ - Signature dsa; - try { - dsa = Signature.getInstance("SHA1withDSA", "SUN"); - dsa.initSign(privKey); - dsa.update(bytes); - return dsa.sign(); - } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { - throw new RuntimeException(e); - } } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -184,30 +138,21 @@ public class GossipCore implements GossipCoreConstants { /** * Sends a blocking message. + * todo: move functionality to TransportManager layer. * @param message * @param uri * @throws RuntimeException if data can not be serialized or in transmission error */ - private void sendInternal(Base message, URI uri){ + private void sendInternal(Base message, URI uri) { byte[] json_bytes; try { - if (privKey == null){ - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); - } else { - SignedPayload p = new SignedPayload(); - p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); - p.setSignature(sign(p.getData())); - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); - } + json_bytes = gossipManager.getProtocolManager().write(message); } catch (IOException e) { messageSerdeException.mark(); throw new RuntimeException(e); } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(uri.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort()); - socket.send(datagramPacket); + try { + gossipManager.getTransportManager().send(uri, json_bytes); tranmissionSuccess.mark(); } catch (IOException e) { tranmissionException.mark(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index ff70ccc..b1752cd 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -18,6 +18,7 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalMember; @@ -26,13 +27,14 @@ import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.handlers.MessageHandler; -import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.protocol.ProtocolManager; +import org.apache.gossip.transport.TransportManager; +import org.apache.gossip.utils.ReflectionUtils; import org.apache.log4j.Logger; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; +import java.io.File; import java.net.URI; import java.util.Collections; import java.util.List; @@ -46,14 +48,21 @@ import java.util.stream.Collectors; public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); + + // this mapper is used for ring and user-data persistence only. NOT messages. + public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{ + enableDefaultTyping(); + configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); + }}; private final ConcurrentSkipListMap<LocalMember, GossipState> members; private final LocalMember me; private final GossipSettings settings; private final AtomicBoolean gossipServiceRunning; - private AbstractActiveGossiper activeGossipThread; - private PassiveGossipThread passiveGossipThread; - private ExecutorService gossipThreadExecutor; + + private TransportManager transportManager; + private ProtocolManager protocolManager; + private final GossipCore gossipCore; private final DataReaper dataReaper; private final Clock clock; @@ -62,14 +71,13 @@ public abstract class GossipManager { private final RingStatePersister ringState; private final UserDataPersister userDataState; private final GossipMemberStateRefresher memberStateRefresher; - private final ObjectMapper objectMapper; - + private final MessageHandler messageHandler; - + public GossipManager(String cluster, URI uri, String id, Map<String, String> properties, GossipSettings settings, List<Member> gossipMembers, GossipListener listener, MetricRegistry registry, - ObjectMapper objectMapper, MessageHandler messageHandler) { + MessageHandler messageHandler) { this.settings = settings; this.messageHandler = messageHandler; @@ -89,14 +97,15 @@ public abstract class GossipManager { members.put(member, GossipState.DOWN); } } - gossipThreadExecutor = Executors.newCachedThreadPool(); gossipServiceRunning = new AtomicBoolean(true); this.scheduledServiced = Executors.newScheduledThreadPool(1); this.registry = registry; - this.ringState = new RingStatePersister(this); - this.userDataState = new UserDataPersister(this, this.gossipCore); + this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this); + this.userDataState = new UserDataPersister( + gossipCore, + GossipManager.buildPerNodeDataPath(this), + GossipManager.buildSharedDataPath(this)); this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData); - this.objectMapper = objectMapper; readSavedRingState(); readSavedDataState(); } @@ -140,49 +149,66 @@ public abstract class GossipManager { return me; } - private AbstractActiveGossiper constructActiveGossiper(){ - try { - Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class); - return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry); - } catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - /** * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. */ public void init() { - passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); - gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = constructActiveGossiper(); - activeGossipThread.init(); + + // protocol manager and transport managers are specified in settings. + // construct them here via reflection. + + protocolManager = ReflectionUtils.constructWithReflection( + settings.getProtocolManagerClass(), + new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class }, + new Object[] { settings, me.getId(), this.getRegistry() } + ); + + transportManager = ReflectionUtils.constructWithReflection( + settings.getTransportManagerClass(), + new Class<?>[] { GossipManager.class, GossipCore.class}, + new Object[] { this, gossipCore } + ); + + // start processing gossip messages. + transportManager.startEndpoint(); + transportManager.startActiveGossiper(); + dataReaper.init(); - scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS); - scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); + if (settings.isPersistRingState()) { + scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS); + } + if (settings.isPersistDataState()) { + scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); + } scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS); LOGGER.debug("The GossipManager is started."); } - + private void readSavedRingState() { - for (LocalMember l : ringState.readFromDisk()){ - LocalMember member = new LocalMember(l.getClusterName(), - l.getUri(), l.getId(), - clock.nanoTime(), l.getProperties(), settings.getWindowSize(), - settings.getMinimumSamples(), settings.getDistribution()); - members.putIfAbsent(member, GossipState.DOWN); + if (settings.isPersistRingState()) { + for (LocalMember l : ringState.readFromDisk()) { + LocalMember member = new LocalMember(l.getClusterName(), + l.getUri(), l.getId(), + clock.nanoTime(), l.getProperties(), settings.getWindowSize(), + settings.getMinimumSamples(), settings.getDistribution()); + members.putIfAbsent(member, GossipState.DOWN); + } } } private void readSavedDataState() { - for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){ - for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){ - gossipCore.addPerNodeData(j.getValue()); + if (settings.isPersistDataState()) { + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()) { + for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()) { + gossipCore.addPerNodeData(j.getValue()); + } } } - for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){ - gossipCore.addSharedData(l.getValue()); + if (settings.isPersistRingState()) { + for (Entry<String, SharedDataMessage> l : userDataState.readSharedDataFromDisk().entrySet()) { + gossipCore.addSharedData(l.getValue()); + } } } @@ -191,24 +217,9 @@ public abstract class GossipManager { */ public void shutdown() { gossipServiceRunning.set(false); - gossipThreadExecutor.shutdown(); gossipCore.shutdown(); + transportManager.shutdown(); dataReaper.close(); - if (passiveGossipThread != null) { - passiveGossipThread.shutdown(); - } - if (activeGossipThread != null) { - activeGossipThread.shutdown(); - } - try { - boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); - if (!result) { - LOGGER.error("executor shutdown timed out"); - } - } catch (InterruptedException e) { - LOGGER.error(e); - } - gossipThreadExecutor.shutdownNow(); scheduledServiced.shutdown(); try { scheduledServiced.awaitTermination(1, TimeUnit.SECONDS); @@ -234,7 +245,6 @@ public abstract class GossipManager { gossipCore.addSharedData(message); } - @SuppressWarnings("rawtypes") public Crdt findCrdt(String key){ SharedDataMessage l = gossipCore.getSharedData().get(key); @@ -308,12 +318,32 @@ public abstract class GossipManager { return clock; } - public ObjectMapper getObjectMapper() { - return objectMapper; - } - public MetricRegistry getRegistry() { return registry; } + public ProtocolManager getProtocolManager() { + return protocolManager; + } + + public TransportManager getTransportManager() { + return transportManager; + } + + // todo: consider making these path methods part of GossipSettings + + public static File buildRingStatePath(GossipManager manager) { + return new File(manager.getSettings().getPathToRingState(), "ringstate." + manager.getMyself().getClusterName() + "." + + manager.getMyself().getId() + ".json"); + } + + public static File buildSharedDataPath(GossipManager manager){ + return new File(manager.getSettings().getPathToDataState(), "shareddata." + + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json"); + } + + public static File buildPerNodeDataPath(GossipManager manager) { + return new File(manager.getSettings().getPathToDataState(), "pernodedata." + + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json"); + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java index bb73177..86dca57 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -18,12 +18,9 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.core.JsonGenerator.Feature; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; import org.apache.gossip.StartupSettings; -import org.apache.gossip.crdt.CrdtModule; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.handlers.MessageHandlerFactory; @@ -49,7 +46,6 @@ public class GossipManagerBuilder { private GossipListener listener; private MetricRegistry registry; private Map<String,String> properties; - private ObjectMapper objectMapper; private MessageHandler messageHandler; private ManagerBuilder() {} @@ -108,11 +104,6 @@ public class GossipManagerBuilder { this.uri = uri; return this; } - - public ManagerBuilder mapper(ObjectMapper objectMapper){ - this.objectMapper = objectMapper; - return this; - } public ManagerBuilder messageHandler(MessageHandler messageHandler) { this.messageHandler = messageHandler; @@ -136,16 +127,11 @@ public class GossipManagerBuilder { if (gossipMembers == null) { gossipMembers = new ArrayList<>(); } - if (objectMapper == null) { - objectMapper = new ObjectMapper(); - objectMapper.enableDefaultTyping(); - objectMapper.registerModule(new CrdtModule()); - objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false); - } + if (messageHandler == null) { messageHandler = MessageHandlerFactory.defaultHandler(); - } - return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ; + } + return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, messageHandler) {} ; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index ae28bf7..30e39d5 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -18,34 +18,25 @@ package org.apache.gossip.manager; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; + import java.util.concurrent.atomic.AtomicBoolean; import org.apache.gossip.model.Base; -import org.apache.gossip.model.SignedPayload; import org.apache.log4j.Logger; -import com.codahale.metrics.Meter; /** * This class handles the passive cycle, * where this client has received an incoming message. */ -abstract public class PassiveGossipThread implements Runnable { +public class PassiveGossipThread implements Runnable { public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); - /** The socket used for the passive thread of the gossip service. */ - private final DatagramSocket server; + private final AtomicBoolean keepRunning; private final GossipCore gossipCore; private final GossipManager gossipManager; - private final Meter signed; - private final Meter unsigned; public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; @@ -53,38 +44,18 @@ abstract public class PassiveGossipThread implements Runnable { if (gossipManager.getMyself().getClusterName() == null){ throw new IllegalArgumentException("Cluster was null"); } - try { - SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), - gossipManager.getMyself().getUri().getPort()); - server = new DatagramSocket(socketAddress); - } catch (SocketException ex) { - LOGGER.warn(ex); - throw new RuntimeException(ex); - } + keepRunning = new AtomicBoolean(true); - signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE); - unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE); } @Override public void run() { while (keepRunning.get()) { try { - byte[] buf = new byte[server.getReceiveBufferSize()]; - DatagramPacket p = new DatagramPacket(buf, buf.length); - server.receive(p); - debug(p.getData()); + byte[] buf = gossipManager.getTransportManager().read(); try { - Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class); - if (activeGossipMessage instanceof SignedPayload){ - SignedPayload s = (SignedPayload) activeGossipMessage; - Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class); - gossipCore.receive(nested); - signed.mark(); - } else { - gossipCore.receive(activeGossipMessage); - unsigned.mark(); - } + Base message = gossipManager.getProtocolManager().read(buf); + gossipCore.receive(message); gossipManager.getMemberStateRefresher().run(); } catch (RuntimeException ex) {//TODO trap json exception LOGGER.error("Unable to process message", ex); @@ -94,21 +65,9 @@ abstract public class PassiveGossipThread implements Runnable { keepRunning.set(false); } } - shutdown(); - } - - private void debug(byte[] jsonBytes) { - if (LOGGER.isDebugEnabled()){ - String receivedMessage = new String(jsonBytes); - LOGGER.debug("Received message ( bytes): " + receivedMessage); - } } - - public void shutdown() { - try { - server.close(); - } catch (RuntimeException ex) { - } + + public void requestStop() { + keepRunning.set(false); } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java index 7e42562..0af9f12 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -26,16 +26,24 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.LocalMember; +import org.apache.gossip.crdt.CrdtModule; import org.apache.log4j.Logger; public class RingStatePersister implements Runnable { private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class); - private GossipManager parent; + private final File path; + // NOTE: this is a different instance than what gets used for message marshalling. + private final ObjectMapper objectMapper; + private final GossipManager manager; - public RingStatePersister(GossipManager parent){ - this.parent = parent; + public RingStatePersister(File path, GossipManager manager){ + this.path = path; + this.objectMapper = GossipManager.metdataObjectMapper; + this.manager = manager; } @Override @@ -43,34 +51,25 @@ public class RingStatePersister implements Runnable { writeToDisk(); } - File computeTarget(){ - return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "." - + parent.getMyself().getId() + ".json"); - } - - void writeToDisk(){ - if (!parent.getSettings().isPersistRingState()){ - return; - } - NavigableSet<LocalMember> i = parent.getMembers().keySet(); - try (FileOutputStream fos = new FileOutputStream(computeTarget())){ - parent.getObjectMapper().writeValue(fos, i); + void writeToDisk() { + NavigableSet<LocalMember> i = manager.getMembers().keySet(); + try (FileOutputStream fos = new FileOutputStream(path)){ + objectMapper.writeValue(fos, i); } catch (IOException e) { LOGGER.debug(e); } } @SuppressWarnings("unchecked") - List<LocalMember> readFromDisk(){ - if (!parent.getSettings().isPersistRingState()){ - return Collections.emptyList(); + List<LocalMember> readFromDisk() { + if (!path.exists()) { + return new ArrayList<>(); } - try (FileInputStream fos = new FileInputStream(computeTarget())){ - return parent.getObjectMapper().readValue(fos, ArrayList.class); + try (FileInputStream fos = new FileInputStream(path)){ + return objectMapper.readValue(fos, ArrayList.class); } catch (IOException e) { LOGGER.debug(e); } - return Collections.emptyList(); + return new ArrayList<>(); } - } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java index 3b9eafa..28c3151 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java @@ -23,6 +23,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; import org.apache.log4j.Logger; @@ -30,31 +31,26 @@ import org.apache.log4j.Logger; public class UserDataPersister implements Runnable { private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class); - private final GossipManager parent; private final GossipCore gossipCore; - UserDataPersister(GossipManager parent, GossipCore gossipCore){ - this.parent = parent; - this.gossipCore = gossipCore; - } - - File computeSharedTarget(){ - return new File(parent.getSettings().getPathToDataState(), "shareddata." - + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json"); - } + private final File perNodePath; + private final File sharedPath; + private final ObjectMapper objectMapper; - File computePerNodeTarget() { - return new File(parent.getSettings().getPathToDataState(), "pernodedata." - + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json"); + UserDataPersister(GossipCore gossipCore, File perNodePath, File sharedPath) { + this.gossipCore = gossipCore; + this.objectMapper = GossipManager.metdataObjectMapper; + this.perNodePath = perNodePath; + this.sharedPath = sharedPath; } @SuppressWarnings("unchecked") ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){ - if (!parent.getSettings().isPersistDataState()){ + if (!perNodePath.exists()) { return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>(); } - try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ - return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); + try (FileInputStream fos = new FileInputStream(perNodePath)){ + return objectMapper.readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } @@ -62,22 +58,16 @@ public class UserDataPersister implements Runnable { } void writePerNodeToDisk(){ - if (!parent.getSettings().isPersistDataState()){ - return; - } - try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){ - parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData()); + try (FileOutputStream fos = new FileOutputStream(perNodePath)){ + objectMapper.writeValue(fos, gossipCore.getPerNodeData()); } catch (IOException e) { LOGGER.warn(e); } } void writeSharedToDisk(){ - if (!parent.getSettings().isPersistDataState()){ - return; - } - try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){ - parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData()); + try (FileOutputStream fos = new FileOutputStream(sharedPath)){ + objectMapper.writeValue(fos, gossipCore.getSharedData()); } catch (IOException e) { LOGGER.warn(e); } @@ -85,11 +75,11 @@ public class UserDataPersister implements Runnable { @SuppressWarnings("unchecked") ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){ - if (!parent.getSettings().isPersistRingState()){ - return new ConcurrentHashMap<String, SharedDataMessage>(); + if (!sharedPath.exists()) { + return new ConcurrentHashMap<>(); } - try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ - return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); + try (FileInputStream fos = new FileInputStream(sharedPath)){ + return objectMapper.readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java deleted file mode 100644 index dff5056..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager.impl; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.PassiveGossipThread; -import org.apache.log4j.Logger; - -public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread { - - public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class); - - public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { - super(gossipManager, gossipCore); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java new file mode 100644 index 0000000..91ed7f9 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.protocol; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.crdt.CrdtModule; +import org.apache.gossip.manager.PassiveGossipConstants; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.SignedPayload; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.InvalidKeyException; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.SignatureException; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; + +// this class is constructed by reflection in GossipManager. +public class JacksonProtocolManager implements ProtocolManager { + + private final ObjectMapper objectMapper; + private final PrivateKey privKey; + private final Meter signed; + private final Meter unsigned; + + /** required for reflection to work! */ + public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) { + // set up object mapper. + objectMapper = buildObjectMapper(settings); + + // set up message signing. + if (settings.isSignMessages()){ + File privateKey = new File(settings.getPathToKeyStore(), id); + File publicKey = new File(settings.getPathToKeyStore(), id + ".pub"); + if (!privateKey.exists()){ + throw new IllegalArgumentException("private key not found " + privateKey); + } + if (!publicKey.exists()){ + throw new IllegalArgumentException("public key not found " + publicKey); + } + try (FileInputStream keyfis = new FileInputStream(privateKey)) { + byte[] encKey = new byte[keyfis.available()]; + keyfis.read(encKey); + keyfis.close(); + PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey); + KeyFactory keyFactory = KeyFactory.getInstance("DSA"); + privKey = keyFactory.generatePrivate(privKeySpec); + } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { + throw new RuntimeException("failed hard", e); + } + } else { + privKey = null; + } + + signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE); + unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE); + } + + @Override + public byte[] write(Base message) throws IOException { + byte[] json_bytes; + if (privKey == null){ + json_bytes = objectMapper.writeValueAsBytes(message); + } else { + SignedPayload p = new SignedPayload(); + p.setData(objectMapper.writeValueAsString(message).getBytes()); + p.setSignature(sign(p.getData(), privKey)); + json_bytes = objectMapper.writeValueAsBytes(p); + } + return json_bytes; + } + + @Override + public Base read(byte[] buf) throws IOException { + Base activeGossipMessage = objectMapper.readValue(buf, Base.class); + if (activeGossipMessage instanceof SignedPayload){ + SignedPayload s = (SignedPayload) activeGossipMessage; + signed.mark(); + return objectMapper.readValue(s.getData(), Base.class); + } else { + unsigned.mark(); + return activeGossipMessage; + } + } + + public static ObjectMapper buildObjectMapper(GossipSettings settings) { + ObjectMapper om = new ObjectMapper(); + om.enableDefaultTyping(); + // todo: should be specified in the configuration. + om.registerModule(new CrdtModule()); + om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); + return om; + } + + private static byte[] sign(byte [] bytes, PrivateKey pk){ + Signature dsa; + try { + dsa = Signature.getInstance("SHA1withDSA", "SUN"); + dsa.initSign(pk); + dsa.update(bytes); + return dsa.sign(); + } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java b/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java new file mode 100644 index 0000000..0f553c7 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.protocol; + +import org.apache.gossip.model.Base; + +import java.io.IOException; + +/** interface for managing message marshaling. */ +public interface ProtocolManager { + + /** serialize a message + * @param message + * @return serialized message. + * @throws IOException + */ + byte[] write(Base message) throws IOException; + + /** + * Reads the next message from a byte source. + * @param buf + * @return a gossip message. + * @throws IOException + */ + Base read(byte[] buf) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java new file mode 100644 index 0000000..497e605 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.transport; + +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.manager.AbstractActiveGossiper; +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.PassiveGossipThread; +import org.apache.gossip.utils.ReflectionUtils; +import org.apache.log4j.Logger; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Manage the protcol threads (active and passive gossipers). + */ +public abstract class AbstractTransportManager implements TransportManager { + + public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class); + + private final PassiveGossipThread passiveGossipThread; + private final ExecutorService gossipThreadExecutor; + + private final AbstractActiveGossiper activeGossipThread; + + public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) { + + passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore); + gossipThreadExecutor = Executors.newCachedThreadPool(); + activeGossipThread = ReflectionUtils.constructWithReflection( + gossipManager.getSettings().getActiveGossipClass(), + new Class<?>[]{ + GossipManager.class, GossipCore.class, MetricRegistry.class + }, + new Object[]{ + gossipManager, gossipCore, gossipManager.getRegistry() + }); + } + + // shut down threads etc. + @Override + public void shutdown() { + passiveGossipThread.requestStop(); + gossipThreadExecutor.shutdown(); + if (activeGossipThread != null) { + activeGossipThread.shutdown(); + } + try { + boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); + if (!result) { + LOGGER.error("executor shutdown timed out"); + } + } catch (InterruptedException e) { + LOGGER.error(e); + } + gossipThreadExecutor.shutdownNow(); + } + + @Override + public void startActiveGossiper() { + activeGossipThread.init(); + } + + @Override + public void startEndpoint() { + gossipThreadExecutor.execute(passiveGossipThread); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java new file mode 100644 index 0000000..031d90e --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.transport; + +import java.io.IOException; +import java.net.URI; + +/** interface for manage that sends and receives messages that have already been serialized. */ +public interface TransportManager { + + /** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */ + void startActiveGossiper(); + + /** starts the passive gossip thread that receives messages from remote nodes. Not related to `startActiveGossiper()` */ + void startEndpoint(); + + /** attempts to shutdown all threads. */ + void shutdown(); + + /** sends a payload to an endpoint. */ + void send(URI endpoint, byte[] buf) throws IOException; + + /** gets the next payload being sent to this node */ + byte[] read() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java new file mode 100644 index 0000000..d80deec --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.transport; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.URI; + +/** + * This class is constructed by reflection in GossipManager. + * It manages transport (byte read/write) operations over UDP. + */ +public class UdpTransportManager extends AbstractTransportManager { + + public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class); + + /** The socket used for the passive thread of the gossip service. */ + private final DatagramSocket server; + + private final int soTimeout; + + /** required for reflection to work! */ + public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) { + super(gossipManager, gossipCore); + + soTimeout = gossipManager.getSettings().getGossipInterval() * 2; + + try { + SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), + gossipManager.getMyself().getUri().getPort()); + server = new DatagramSocket(socketAddress); + } catch (SocketException ex) { + LOGGER.warn(ex); + throw new RuntimeException(ex); + } + } + + @Override + public void shutdown() { + server.close(); + super.shutdown(); + } + + /** + * blocking read a message. + * @return buffer of message contents. + * @throws IOException + */ + public byte[] read() throws IOException { + byte[] buf = new byte[server.getReceiveBufferSize()]; + DatagramPacket p = new DatagramPacket(buf, buf.length); + server.receive(p); + debug(p.getData()); + return p.getData(); + } + + @Override + public void send(URI endpoint, byte[] buf) throws IOException { + DatagramSocket socket = new DatagramSocket(); + socket.setSoTimeout(soTimeout); + InetAddress dest = InetAddress.getByName(endpoint.getHost()); + DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort()); + socket.send(payload); + // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket. + socket.close(); + } + + private void debug(byte[] jsonBytes) { + if (LOGGER.isDebugEnabled()){ + String receivedMessage = new String(jsonBytes); + LOGGER.debug("Received message ( bytes): " + receivedMessage); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java b/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java new file mode 100644 index 0000000..2ae4eb1 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.utils; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +public class ReflectionUtils { + + /** + * Create an instance of a thing. This method essentially makes code more readable by handing the various exception + * trapping. + * @param className + * @param constructorTypes + * @param constructorArgs + * @param <T> + * @return constructed instance of a thing. + */ + @SuppressWarnings("unchecked") + public static <T> T constructWithReflection(String className, Class<?>[] constructorTypes, Object[] constructorArgs) { + try { + Constructor<?> c = Class.forName(className).getConstructor(constructorTypes); + c.setAccessible(true); + return (T) c.newInstance(constructorArgs); + } catch (InvocationTargetException e) { + // catch ITE and throw the target if it is a RTE. + if (e.getTargetException() != null && RuntimeException.class.isAssignableFrom(e.getTargetException().getClass())) { + throw (RuntimeException) e.getTargetException(); + } else { + throw new RuntimeException(e); + } + } catch (ReflectiveOperationException others) { + // Note: No class in the above list should be a descendent of RuntimeException. Otherwise, we're just wrapping + // and making stack traces confusing. + throw new RuntimeException(others); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java index b19f221..4c6014a 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -18,15 +18,14 @@ package org.apache.gossip.crdt; import java.io.IOException; -import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.GossipSettings; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.protocol.JacksonProtocolManager; import org.junit.Assert; import org.junit.Test; @@ -88,16 +87,11 @@ public class OrSetTest { @Test public void serialTest() throws InterruptedException, URISyntaxException, IOException { - GossipManager gossipService2 = GossipManagerBuilder.newBuilder() - .cluster("a") - .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) - .id("1") - .gossipSettings(new GossipSettings()) - .build(); + ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings()); OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1)); - String s = gossipService2.getObjectMapper().writeValueAsString(i); + String s = objectMapper.writeValueAsString(i); @SuppressWarnings("unchecked") - OrSet<Integer> back = gossipService2.getObjectMapper().readValue(s, OrSet.class); + OrSet<Integer> back = objectMapper.readValue(s, OrSet.class); Assert.assertEquals(back, i); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java index 959f818..bc0b46a 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java @@ -25,6 +25,7 @@ import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.handlers.ResponseHandler; import org.apache.gossip.manager.handlers.TypedMessageHandler; import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; @@ -43,6 +44,17 @@ import static org.junit.jupiter.api.Assertions.expectThrows; @RunWith(JUnitPlatform.class) public class GossipManagerBuilderTest { + private GossipManagerBuilder.ManagerBuilder builder; + + @BeforeEach + public void setup() throws Exception { + builder = GossipManagerBuilder.newBuilder() + .id("id") + .cluster("aCluster") + .uri(new URI("udp://localhost:2000")) + .gossipSettings(new GossipSettings()); + } + @Test public void idShouldNotBeNull() { expectThrows(IllegalArgumentException.class,() -> { @@ -66,35 +78,20 @@ public class GossipManagerBuilderTest { @Test public void createMembersListIfNull() throws URISyntaxException { - GossipManager gossipManager = GossipManagerBuilder.newBuilder() - .id("id") - .cluster("aCluster") - .uri(new URI("udp://localhost:2000")) - .gossipSettings(new GossipSettings()) - .gossipMembers(null).registry(new MetricRegistry()).build(); + GossipManager gossipManager = builder.gossipMembers(null).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getLiveMembers()); } @Test public void createDefaultMessageHandlerIfNull() throws URISyntaxException { - GossipManager gossipManager = GossipManagerBuilder.newBuilder() - .id("id") - .cluster("aCluster") - .uri(new URI("udp://localhost:2000")) - .gossipSettings(new GossipSettings()) - .messageHandler(null).registry(new MetricRegistry()).build(); + GossipManager gossipManager = builder.messageHandler(null).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getMessageHandler()); } @Test public void testMessageHandlerKeeping() throws URISyntaxException { MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler()); - GossipManager gossipManager = GossipManagerBuilder.newBuilder() - .id("id") - .cluster("aCluster") - .uri(new URI("udp://localhost:2000")) - .gossipSettings(new GossipSettings()) - .messageHandler(mi).registry(new MetricRegistry()).build(); + GossipManager gossipManager = builder.messageHandler(mi).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getMessageHandler()); Assert.assertEquals(gossipManager.getMessageHandler(), mi); } @@ -106,10 +103,7 @@ public class GossipManagerBuilderTest { System.nanoTime(), new HashMap<String, String>(), 1000, 1, "exponential"); List<Member> memberList = new ArrayList<>(); memberList.add(member); - GossipManager gossipManager = GossipManagerBuilder.newBuilder() - .id("id") - .cluster("aCluster") - .gossipSettings(new GossipSettings()) + GossipManager gossipManager = builder .uri(new URI("udp://localhost:8000")) .gossipMembers(memberList).registry(new MetricRegistry()).build(); assertEquals(1, gossipManager.getDeadMembers().size()); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java index d448b98..ebe0e2c 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java @@ -49,7 +49,7 @@ public class RingPersistenceTest { new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"), new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2"))).build(); gossipService.getRingState().writeToDisk(); - return gossipService.getRingState().computeTarget(); + return GossipManager.buildRingStatePath(gossipService); } private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java index 7b17e41..dde4b74 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java @@ -68,8 +68,8 @@ public class UserDataPersistenceTest { gossipService.init(); Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeGossipData(nodeId, "a").getPayload()).getColor()); Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedGossipData("a").getPayload()).getColor()); - File f = gossipService.getUserDataState().computeSharedTarget(); - File g = gossipService.getUserDataState().computePerNodeTarget(); + File f = GossipManager.buildSharedDataPath(gossipService); + File g = GossipManager.buildPerNodeDataPath(gossipService); gossipService.shutdown(); f.delete(); g.delete(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/851cd93e/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java index c035d21..ec91d67 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java @@ -64,7 +64,7 @@ public class MessageHandlerTest { return true; } } - + @Test public void testSimpleHandler() { MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler());