Repository: incubator-gossip Updated Branches: refs/heads/master 851cd93e6 -> e3010c854
GOSSIP-81 Move Jackson and UDP to their own modules Part of what makes this work is the test implementation of TransportManager. This PR is pretty straightforward. A few gotchas though: * A message signing test was moved into `JacksonTest` because that is where the signing actually happens. * A CRDT serializing test was moved there as well. It's the best place for now. * No UDP tests at all. I plan to fix that in a bit. Reasoning is that it is difficult to test any TransportManager implementation without bringing up a full stack. I plan to address this in the future (GOSSIP-83). * Simple round trip Jackson serialization tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/e3010c85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/e3010c85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/e3010c85 Branch: refs/heads/master Commit: e3010c8542ab02fc20766d3eb63f75c1560b7bc3 Parents: 851cd93 Author: Gary Dusbabek <[email protected]> Authored: Wed Apr 19 13:46:04 2017 -0500 Committer: Gary Dusbabek <[email protected]> Committed: Fri Apr 21 08:47:31 2017 -0500 ---------------------------------------------------------------------- gossip-base/pom.xml | 47 ++--- .../java/org/apache/gossip/GossipSettings.java | 12 +- .../java/org/apache/gossip/StartupSettings.java | 18 +- .../gossip/manager/PassiveGossipThread.java | 5 +- .../gossip/protocol/JacksonProtocolManager.java | 131 ------------ .../transport/AbstractTransportManager.java | 3 +- .../gossip/transport/UdpTransportManager.java | 98 --------- .../test/java/org/apache/gossip/DataTest.java | 2 + .../org/apache/gossip/IdAndPropertyTest.java | 2 + .../org/apache/gossip/ShutdownDeadtimeTest.java | 6 +- .../org/apache/gossip/SignedMessageTest.java | 20 +- .../org/apache/gossip/StartupSettingsTest.java | 7 +- .../org/apache/gossip/TenNodeThreeSeedTest.java | 2 + 
.../java/org/apache/gossip/crdt/OrSetTest.java | 15 -- .../apache/gossip/manager/DataReaperTest.java | 4 + .../gossip/manager/UserDataPersistenceTest.java | 2 + .../protocol/UnitTestProtocolManager.java | 82 ++++++++ .../transport/UnitTestTransportManager.java | 76 +++++++ gossip-protocol-jackson/pom.xml | 51 +++++ .../protocol/json/JacksonProtocolManager.java | 132 ++++++++++++ .../gossip/protocol/json/JacksonTest.java | 120 +++++++++++ .../gossip/protocol/json/TestMessage.java | 199 +++++++++++++++++++ gossip-transport-udp/pom.xml | 43 ++++ .../transport/udp/UdpTransportManager.java | 99 +++++++++ .../udp/UdpTransportIntegrationTest.java | 65 ++++++ pom.xml | 35 ++++ 26 files changed, 975 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml index 3529bd1..b9739f6 100644 --- a/gossip-base/pom.xml +++ b/gossip-base/pom.xml @@ -1,4 +1,21 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + + See the License for the specific language governing permissions and + limitations under the License. 
+--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> @@ -36,36 +53,6 @@ <artifactId>metrics-core</artifactId> <version>${metrics.version}</version></dependency> <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-api</artifactId> - <version>${junit.jupiter.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-engine</artifactId> - <version>${junit.jupiter.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.junit.vintage</groupId> - <artifactId>junit-vintage-engine</artifactId> - <version>${junit.vintage.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.junit.platform</groupId> - <artifactId>junit-platform-runner</artifactId> - <version>${junit.platform.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>io.teknek</groupId> - <artifactId>tunit</artifactId> - <version>${tunit.version}</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index e4a95d3..2ceb453 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -45,8 +45,8 @@ public class GossipSettings { private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; - private String 
transportManagerClass = "org.apache.gossip.transport.UdpTransportManager"; - private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager"; + private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager"; + private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager"; private Map<String,String> activeGossipProperties = new HashMap<>(); @@ -230,7 +230,15 @@ public class GossipSettings { return transportManagerClass; } + public void setTransportManagerClass(String transportManagerClass) { + this.transportManagerClass = transportManagerClass; + } + public String getProtocolManagerClass() { return protocolManagerClass; } + + public void setProtocolManagerClass(String protocolManagerClass) { + this.protocolManagerClass = protocolManagerClass; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java index 17eaaf2..dd30e88 100644 --- a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java @@ -185,10 +185,22 @@ public class StartupSettings { if (cluster == null){ throw new IllegalArgumentException("cluster was null. It is required"); } + String transportClass = jsonObject.has("transport_manager_class") ? + jsonObject.get("transport_manager_class").textValue() : + null; + String protocolClass = jsonObject.has("protocol_manager_class") ? 
+ jsonObject.get("protocol_manager_class").textValue() : + null; URI uri2 = new URI(uri); - StartupSettings settings = new StartupSettings(id, uri2, - new GossipSettings(gossipInterval, cleanupInterval, windowSize, - minSamples, convictThreshold, distribution), cluster); + GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples, + convictThreshold, distribution); + if (transportClass != null) { + gossipSettings.setTransportManagerClass(transportClass); + } + if (protocolClass != null) { + gossipSettings.setProtocolManagerClass(protocolClass); + } + StartupSettings settings = new StartupSettings(id, uri2, gossipSettings, cluster); String configMembersDetails = "Config-members ["; JsonNode membersJSON = jsonObject.get("members"); Iterator<JsonNode> it = membersJSON.iterator(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 30e39d5..03a874c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -61,7 +61,10 @@ public class PassiveGossipThread implements Runnable { LOGGER.error("Unable to process message", ex); } } catch (IOException e) { - LOGGER.error(e); + // InterruptedException are completely normal here because of the blocking lifecycle. 
+ if (!(e.getCause() instanceof InterruptedException)) { + LOGGER.error(e); + } keepRunning.set(false); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java deleted file mode 100644 index 91ed7f9..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip.protocol; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.crdt.CrdtModule; -import org.apache.gossip.manager.PassiveGossipConstants; -import org.apache.gossip.model.Base; -import org.apache.gossip.model.SignedPayload; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.security.InvalidKeyException; -import java.security.KeyFactory; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.PrivateKey; -import java.security.Signature; -import java.security.SignatureException; -import java.security.spec.InvalidKeySpecException; -import java.security.spec.PKCS8EncodedKeySpec; - -// this class is constructed by reflection in GossipManager. -public class JacksonProtocolManager implements ProtocolManager { - - private final ObjectMapper objectMapper; - private final PrivateKey privKey; - private final Meter signed; - private final Meter unsigned; - - /** required for reflection to work! */ - public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) { - // set up object mapper. - objectMapper = buildObjectMapper(settings); - - // set up message signing. 
- if (settings.isSignMessages()){ - File privateKey = new File(settings.getPathToKeyStore(), id); - File publicKey = new File(settings.getPathToKeyStore(), id + ".pub"); - if (!privateKey.exists()){ - throw new IllegalArgumentException("private key not found " + privateKey); - } - if (!publicKey.exists()){ - throw new IllegalArgumentException("public key not found " + publicKey); - } - try (FileInputStream keyfis = new FileInputStream(privateKey)) { - byte[] encKey = new byte[keyfis.available()]; - keyfis.read(encKey); - keyfis.close(); - PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey); - KeyFactory keyFactory = KeyFactory.getInstance("DSA"); - privKey = keyFactory.generatePrivate(privKeySpec); - } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { - throw new RuntimeException("failed hard", e); - } - } else { - privKey = null; - } - - signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE); - unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE); - } - - @Override - public byte[] write(Base message) throws IOException { - byte[] json_bytes; - if (privKey == null){ - json_bytes = objectMapper.writeValueAsBytes(message); - } else { - SignedPayload p = new SignedPayload(); - p.setData(objectMapper.writeValueAsString(message).getBytes()); - p.setSignature(sign(p.getData(), privKey)); - json_bytes = objectMapper.writeValueAsBytes(p); - } - return json_bytes; - } - - @Override - public Base read(byte[] buf) throws IOException { - Base activeGossipMessage = objectMapper.readValue(buf, Base.class); - if (activeGossipMessage instanceof SignedPayload){ - SignedPayload s = (SignedPayload) activeGossipMessage; - signed.mark(); - return objectMapper.readValue(s.getData(), Base.class); - } else { - unsigned.mark(); - return activeGossipMessage; - } - } - - public static ObjectMapper buildObjectMapper(GossipSettings settings) { - ObjectMapper om = new ObjectMapper(); - om.enableDefaultTyping(); - // todo: should be 
specified in the configuration. - om.registerModule(new CrdtModule()); - om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); - return om; - } - - private static byte[] sign(byte [] bytes, PrivateKey pk){ - Signature dsa; - try { - dsa = Signature.getInstance("SHA1withDSA", "SUN"); - dsa.initSign(pk); - dsa.update(bytes); - return dsa.sign(); - } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java index 497e605..33db038 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java @@ -66,7 +66,8 @@ public abstract class AbstractTransportManager implements TransportManager { try { boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); if (!result) { - LOGGER.error("executor shutdown timed out"); + // common when blocking patterns are used to read data from a socket. 
+ LOGGER.warn("executor shutdown timed out"); } } catch (InterruptedException e) { LOGGER.error(e); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java deleted file mode 100644 index d80deec..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.transport; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.URI; - -/** - * This class is constructed by reflection in GossipManager. 
- * It manages transport (byte read/write) operations over UDP. - */ -public class UdpTransportManager extends AbstractTransportManager { - - public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class); - - /** The socket used for the passive thread of the gossip service. */ - private final DatagramSocket server; - - private final int soTimeout; - - /** required for reflection to work! */ - public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) { - super(gossipManager, gossipCore); - - soTimeout = gossipManager.getSettings().getGossipInterval() * 2; - - try { - SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), - gossipManager.getMyself().getUri().getPort()); - server = new DatagramSocket(socketAddress); - } catch (SocketException ex) { - LOGGER.warn(ex); - throw new RuntimeException(ex); - } - } - - @Override - public void shutdown() { - server.close(); - super.shutdown(); - } - - /** - * blocking read a message. - * @return buffer of message contents. - * @throws IOException - */ - public byte[] read() throws IOException { - byte[] buf = new byte[server.getReceiveBufferSize()]; - DatagramPacket p = new DatagramPacket(buf, buf.length); - server.receive(p); - debug(p.getData()); - return p.getData(); - } - - @Override - public void send(URI endpoint, byte[] buf) throws IOException { - DatagramSocket socket = new DatagramSocket(); - socket.setSoTimeout(soTimeout); - InetAddress dest = InetAddress.getByName(endpoint.getHost()); - DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort()); - socket.send(payload); - // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket. 
- socket.close(); - } - - private void debug(byte[] jsonBytes) { - if (LOGGER.isDebugEnabled()){ - String receivedMessage = new String(jsonBytes); - LOGGER.debug("Received message ( bytes): " + receivedMessage); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/DataTest.java b/gossip-base/src/test/java/org/apache/gossip/DataTest.java index f0c2186..bb33dc2 100644 --- a/gossip-base/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/DataTest.java @@ -47,6 +47,8 @@ public class DataTest extends AbstractIntegrationBase { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List<Member> startupMembers = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java index 7f550de..1b6a32a 100644 --- a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -43,6 +43,8 @@ public class IdAndPropertyTest extends AbstractIntegrationBase { public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException { GossipSettings settings = new GossipSettings(); 
settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); List<Member> startupMembers = new ArrayList<>(); Map<String, String> x = new HashMap<>(); x.put("a", "b"); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 54005c3..30c52bc 100644 --- a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -44,10 +44,14 @@ public class ShutdownDeadtimeTest { private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); + // Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the + // sleep that happens after startup. 
@Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); @@ -70,7 +74,7 @@ public class ShutdownDeadtimeTest { .build(); clients.add(gossipService); gossipService.init(); - + Thread.sleep(1000); } TUnit.assertThat(new Callable<Integer>() { public Integer call() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java index 5c3bb76..f669a23 100644 --- a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.util.ArrayList; @@ -41,28 +40,13 @@ import io.teknek.tunit.TUnit; public class SignedMessageTest extends AbstractIntegrationBase { - @Test(expected = IllegalArgumentException.class) - public void ifSignMustHaveKeys() - throws URISyntaxException, UnknownHostException, InterruptedException { - String cluster = UUID.randomUUID().toString(); - GossipSettings settings = gossiperThatSigns(); - List<Member> startupMembers = new ArrayList<>(); - URI uri = new 
URI("udp://" + "127.0.0.1" + ":" + (30000 + 1)); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster(cluster) - .uri(uri) - .id(1 + "") - .gossipMembers(startupMembers) - .gossipSettings(settings) - .build(); - gossipService.init(); - } - private GossipSettings gossiperThatSigns(){ GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); settings.setSignMessages(true); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); return settings; } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java index d6c4a1e..ea93a90 100644 --- a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -47,11 +47,14 @@ public class StartupSettingsTest { settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); + GossipSettings firstGossipSettings = new GossipSettings(); + firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); GossipManager firstService = GossipManagerBuilder.newBuilder() .cluster(CLUSTER) .uri(uri) .id("1") - .gossipSettings(new GossipSettings()).build(); + .gossipSettings(firstGossipSettings).build(); firstService.init(); GossipManager manager = GossipManagerBuilder.newBuilder() .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build(); @@ 
-72,6 +75,8 @@ public class StartupSettingsTest { " \"cleanup_interval\":10000,\n" + " \"convict_threshold\":2.6,\n" + " \"distribution\":\"exponential\",\n" + + " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" + + " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" + " \"properties\":{},\n" + " \"members\":[\n" + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 8ae783e..c6d7d46 100644 --- a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -50,6 +50,8 @@ public class TenNodeThreeSeedTest { GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; List<Member> startupMembers = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java index 4c6014a..70c0d51 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ 
b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -17,15 +17,10 @@ */ package org.apache.gossip.crdt; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.protocol.JacksonProtocolManager; import org.junit.Assert; import org.junit.Test; @@ -86,16 +81,6 @@ public class OrSetTest { } @Test - public void serialTest() throws InterruptedException, URISyntaxException, IOException { - ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings()); - OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1)); - String s = objectMapper.writeValueAsString(i); - @SuppressWarnings("unchecked") - OrSet<Integer> back = objectMapper.readValue(s, OrSet.class); - Assert.assertEquals(back, i); - } - - @Test public void mergeTestSame() { OrSet<Integer> i = new OrSet<>(19); OrSet<Integer> j = new OrSet<>(19); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java index e328c24..1a9d43b 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -40,6 +40,8 @@ public class DataReaperTest { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); 
GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings) .id(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); gm.init(); @@ -88,6 +90,8 @@ public class DataReaperTest { String key = "key"; String value = "a"; GossipSettings settings = new GossipSettings(); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings) .id(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build(); gm.init(); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java index dde4b74..e1e1127 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java @@ -35,6 +35,8 @@ public class UserDataPersistenceTest { private GossipManager sameService() throws URISyntaxException { GossipSettings settings = new GossipSettings(); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); return GossipManagerBuilder.newBuilder() .cluster("a") .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java ---------------------------------------------------------------------- diff --git 
a/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java b/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java new file mode 100644 index 0000000..3d52c4a --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.protocol; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.manager.PassiveGossipConstants; +import org.apache.gossip.model.Base; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +
/**
 * Test-only {@link ProtocolManager} that never serializes a message body.
 *
 * <p>{@link #write(Base)} parks the message in a static table keyed by its identity hash code and
 * returns that key as eight big-endian bytes; {@link #read(byte[])} decodes the key and removes
 * the parked message from the table.
 */
public class UnitTestProtocolManager implements ProtocolManager {

  // Static so every gossiper in the JVM resolves keys against the same table. Keys come from
  // System.identityHashCode, so this relies on in-flight messages not sharing an identity hash.
  private static final Map<Long, Base> lookup = new ConcurrentHashMap<>();
  private final Meter meter;

  /**
   * Picks the signed or unsigned message meter according to the settings.
   *
   * @param settings consulted only for {@code isSignMessages()}
   * @param id unused by this implementation
   * @param registry registry the meter is obtained from
   */
  public UnitTestProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
    if (settings.isSignMessages()) {
      meter = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
    } else {
      meter = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
    }
  }

  /** Encodes {@code val} as eight bytes, most significant byte first. */
  private static byte[] longToBytes(long val) {
    byte[] encoded = new byte[8];
    long remaining = val;
    for (int i = encoded.length - 1; i >= 0; i--) {
      encoded[i] = (byte) remaining;
      remaining >>>= 8;
    }
    return encoded;
  }

  /** Decodes the first eight bytes of {@code b}, most significant byte first. */
  static long bytesToLong(byte[] b) {
    long value = 0L;
    // walk from the least significant byte (index 7) towards the most significant (index 0)
    for (int i = 7, shift = 0; i >= 0; i--, shift += 8) {
      value += (b[i] & 0xFFL) << shift;
    }
    return value;
  }

  /**
   * Parks {@code message} in the shared table and marks the meter.
   *
   * @return the eight-byte key under which the message was stored
   */
  @Override
  public byte[] write(Base message) throws IOException {
    long key = System.identityHashCode(message);
    lookup.put(key, message);
    meter.mark();
    return longToBytes(key);
  }

  /**
   * Removes and returns the message parked under the key encoded in {@code buf}.
   *
   * @return the parked message, or {@code null} when no entry exists for the key
   */
  @Override
  public Base read(byte[] buf) throws IOException {
    return lookup.remove(bytesToLong(buf));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java new file mode 100644 index 0000000..a783b75 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.transport; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +
/**
 * Only use in unit tests!
 *
 * <p>In-memory transport: every started manager registers itself in a static map under its own
 * URI, and {@link #send(URI, byte[])} drops the buffer straight into the target manager's queue.
 */
public class UnitTestTransportManager extends AbstractTransportManager {

  /** Managers that are currently started, keyed by their local endpoint. */
  private static final Map<URI, UnitTestTransportManager> allManagers = new ConcurrentHashMap<>();

  private final URI localEndpoint;
  private final BlockingQueue<byte[]> buffers = new ArrayBlockingQueue<byte[]>(1000);

  public UnitTestTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    super(gossipManager, gossipCore);
    localEndpoint = gossipManager.getMyself().getUri();
  }

  /**
   * Queues {@code buf} on the manager registered for {@code endpoint}. The buffer is silently
   * dropped when no manager is registered there. Blocks while the target queue is full.
   *
   * @throws RuntimeException if the thread is interrupted while waiting for queue space; the
   *     interrupt flag is restored before throwing
   */
  @Override
  public void send(URI endpoint, byte[] buf) throws IOException {
    // One lookup instead of containsKey + get: the target may unregister in between, which
    // would otherwise surface as a NullPointerException.
    UnitTestTransportManager target = allManagers.get(endpoint);
    if (target == null) {
      return;
    }
    try {
      target.buffers.put(buf);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Blocks until a buffer has been queued for this manager and returns it.
   *
   * @throws IOException if the thread is interrupted while waiting; the interrupt flag is
   *     restored before throwing
   */
  @Override
  public byte[] read() throws IOException {
    try {
      return buffers.take();
    } catch (InterruptedException ex) {
      // probably not the right thing to do, but we'll see.
      Thread.currentThread().interrupt();
      throw new IOException(ex);
    }
  }

  /** Unregisters this manager so that later sends to its URI are dropped, then shuts down. */
  @Override
  public void shutdown() {
    allManagers.remove(localEndpoint);
    super.shutdown();
  }

  /** Registers this manager under its local URI before starting the endpoint. */
  @Override
  public void startEndpoint() {
    allManagers.put(localEndpoint, this);
    super.startEndpoint();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-protocol-jackson/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/pom.xml b/gossip-protocol-jackson/pom.xml new file mode 100644 index 0000000..067a27e --- /dev/null +++ b/gossip-protocol-jackson/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gossip</groupId> + <artifactId>gossip-parent</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <name>Gossip Jackson Protocol</name> + <artifactId>gossip-protocol-jackson</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.gossip</groupId> + <artifactId>gossip-base</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.gossip</groupId> + <artifactId>gossip-base</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java b/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java new file mode 100644 index 0000000..499c5ee --- /dev/null +++ 
b/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.protocol.json; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.crdt.CrdtModule; +import org.apache.gossip.manager.PassiveGossipConstants; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.SignedPayload; +import org.apache.gossip.protocol.ProtocolManager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.InvalidKeyException; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.SignatureException; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; + +// this class is constructed by reflection in GossipManager. 
+public class JacksonProtocolManager implements ProtocolManager { + + private final ObjectMapper objectMapper; + private final PrivateKey privKey; + private final Meter signed; + private final Meter unsigned; + + /** required for reflection to work! */ + public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) { + // set up object mapper. + objectMapper = buildObjectMapper(settings); + + // set up message signing. + if (settings.isSignMessages()){ + File privateKey = new File(settings.getPathToKeyStore(), id); + File publicKey = new File(settings.getPathToKeyStore(), id + ".pub"); + if (!privateKey.exists()){ + throw new IllegalArgumentException("private key not found " + privateKey); + } + if (!publicKey.exists()){ + throw new IllegalArgumentException("public key not found " + publicKey); + } + try (FileInputStream keyfis = new FileInputStream(privateKey)) { + byte[] encKey = new byte[keyfis.available()]; + keyfis.read(encKey); + keyfis.close(); + PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey); + KeyFactory keyFactory = KeyFactory.getInstance("DSA"); + privKey = keyFactory.generatePrivate(privKeySpec); + } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { + throw new RuntimeException("failed hard", e); + } + } else { + privKey = null; + } + + signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE); + unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE); + } + + @Override + public byte[] write(Base message) throws IOException { + byte[] json_bytes; + if (privKey == null){ + json_bytes = objectMapper.writeValueAsBytes(message); + } else { + SignedPayload p = new SignedPayload(); + p.setData(objectMapper.writeValueAsString(message).getBytes()); + p.setSignature(sign(p.getData(), privKey)); + json_bytes = objectMapper.writeValueAsBytes(p); + } + return json_bytes; + } + + @Override + public Base read(byte[] buf) throws IOException { + Base activeGossipMessage = 
objectMapper.readValue(buf, Base.class); + if (activeGossipMessage instanceof SignedPayload){ + SignedPayload s = (SignedPayload) activeGossipMessage; + signed.mark(); + return objectMapper.readValue(s.getData(), Base.class); + } else { + unsigned.mark(); + return activeGossipMessage; + } + } + + public static ObjectMapper buildObjectMapper(GossipSettings settings) { + ObjectMapper om = new ObjectMapper(); + om.enableDefaultTyping(); + // todo: should be specified in the configuration. + om.registerModule(new CrdtModule()); + om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); + return om; + } + + private static byte[] sign(byte [] bytes, PrivateKey pk){ + Signature dsa; + try { + dsa = Signature.getInstance("SHA1withDSA", "SUN"); + dsa.initSign(pk); + dsa.update(bytes); + return dsa.sign(); + } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java new file mode 100644 index 0000000..bd8a949 --- /dev/null +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.protocol.json; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.Member; +import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.Base; +import org.apache.gossip.protocol.ProtocolManager; +import org.apache.gossip.udp.Trackable; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +
/** Tests for {@link JacksonProtocolManager}: key checks, CRDT serialization, message round trips. */
public class JacksonTest {

  /** Disables persistence and selects the unit-test transport with the Jackson protocol. */
  private static GossipSettings simpleSettings(GossipSettings settings) {
    settings.setPersistRingState(false);
    settings.setPersistDataState(false);
    settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
    settings.setProtocolManagerClass("org.apache.gossip.protocol.json.JacksonProtocolManager");
    return settings;
  }

  /** Turns message signing on. */
  private static GossipSettings withSigning(GossipSettings settings) {
    settings.setSignMessages(true);
    return settings;
  }

  // formerly of SignedMessageTest.
  /** With signing enabled and no key files present, bringing the manager up must fail. */
  @Test(expected = IllegalArgumentException.class)
  public void ifSignMustHaveKeys()
          throws URISyntaxException, UnknownHostException, InterruptedException {
    String cluster = UUID.randomUUID().toString();
    GossipSettings settings = withSigning(simpleSettings(new GossipSettings()));
    List<Member> startupMembers = new ArrayList<>();
    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
    GossipManager gossipService = GossipManagerBuilder.newBuilder()
            .cluster(cluster)
            .uri(uri)
            .id(1 + "")
            .gossipMembers(startupMembers)
            .gossipSettings(settings)
            .build();
    gossipService.init();
  }

  /** An OrSet must survive a JSON round trip through the mapper the protocol manager builds. */
  @Test
  public void jacksonSerialTest() throws InterruptedException, URISyntaxException, IOException {
    ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings()));

    OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
    String s = objectMapper.writeValueAsString(i);
    @SuppressWarnings("unchecked")
    OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
    // expected value first, so a failure message reads the right way round
    Assert.assertEquals(i, back);
  }

  /** Same seed gives equal but distinct messages; different seeds give unequal messages. */
  @Test
  public void testMessageEqualityAssumptions() {
    long timeA = System.nanoTime();
    // Derive the second seed instead of sampling the clock again: two consecutive nanoTime()
    // calls may return the same value on a coarse timer, which made this test flaky.
    long timeB = timeA + 1;
    Assert.assertNotEquals(timeA, timeB);

    TestMessage messageA0 = new TestMessage(Long.toHexString(timeA));
    TestMessage messageA1 = new TestMessage(Long.toHexString(timeA));
    TestMessage messageB = new TestMessage(Long.toHexString(timeB));

    Assert.assertEquals(messageA0, messageA1);
    Assert.assertNotSame(messageA0, messageA1);
    Assert.assertNotEquals(messageA0, messageB);
    Assert.assertNotEquals(messageA1, messageB);
  }

  // ideally, we would test the serializability of every message type, but we just want to make sure this works in
  // basic cases.
  @Test
  public void testMessageSerializationRoundTrip() throws Exception {
    ProtocolManager mgr = new JacksonProtocolManager(simpleSettings(new GossipSettings()), "foo", new MetricRegistry());
    for (int i = 0; i < 100; i++) {
      TestMessage a = new TestMessage(Long.toHexString(System.nanoTime()));
      byte[] bytes = mgr.write(a);
      TestMessage b = (TestMessage) mgr.read(bytes);
      Assert.assertNotSame(a, b);
      Assert.assertEquals(a, b);
      Assert.assertEquals(a.getMapOfThings(), b.getMapOfThings()); // concerned about that one, so explicit check.
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java new file mode 100644 index 0000000..43032de --- /dev/null +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gossip.protocol.json; + +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.Trackable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/* + * Here is a test class for serialization. I've tried to include a lot of things in it including nested classes. + * Note that there are no Jackson annotations. + * getters and setters are the keys to making this work without the Jackson annotations. + */ +class TestMessage extends Base implements Trackable { + private String unique; + private String from; + private String uuid; + private String derivedField; + private Subclass otherThing; + private float floatValue; + private double doubleValue; + private Object[] arrayOfThings; + private Map<String, String> mapOfThings = new HashMap<>(); + + private TestMessage() { + } + + TestMessage(String unique) { + this.unique = unique; + from = Integer.toHexString(unique.hashCode()); + uuid = Integer.toHexString(from.hashCode()); + derivedField = Integer.toHexString(uuid.hashCode()); + otherThing = new Subclass(Integer.toHexString(derivedField.hashCode())); + floatValue = (float) unique.hashCode() / (float) from.hashCode(); + doubleValue = (double) uuid.hashCode() / (double) derivedField.hashCode(); + arrayOfThings = new Object[]{ + this.unique, from, uuid, derivedField, otherThing, floatValue, doubleValue + }; + + String curThing = unique; + for (int i = 0; i < 100; i++) { + String key = Integer.toHexString(curThing.hashCode()); + String value = Integer.toHexString(key.hashCode()); + curThing = value; + mapOfThings.put(key, value); + } + } + + @Override + public String getUriFrom() { + return from; + } + + @Override + public void setUriFrom(String uriFrom) { + this.from = uriFrom; + } + + @Override + public String getUuid() { + return uuid; + } + + @Override + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public boolean equals(Object o) { + if (this == o) 
return true; + if (!(o instanceof TestMessage)) return false; + TestMessage that = (TestMessage) o; + return Objects.equals(unique, that.unique) && + Objects.equals(from, that.from) && + Objects.equals(getUuid(), that.getUuid()) && + Objects.equals(derivedField, that.derivedField) && + Objects.equals(floatValue, that.floatValue) && + Objects.equals(doubleValue, that.doubleValue) && + Arrays.equals(arrayOfThings, that.arrayOfThings) && + Objects.equals(mapOfThings, that.mapOfThings); + } + + public String getUnique() { + return unique; + } + + public void setUnique(String unique) { + this.unique = unique; + } + + public String getFrom() { + return from; + } + + public void setFrom(String from) { + this.from = from; + } + + public String getDerivedField() { + return derivedField; + } + + public void setDerivedField(String derivedField) { + this.derivedField = derivedField; + } + + public Subclass getOtherThing() { + return otherThing; + } + + public void setOtherThing(Subclass otherThing) { + this.otherThing = otherThing; + } + + public float getFloatValue() { + return floatValue; + } + + public void setFloatValue(float floatValue) { + this.floatValue = floatValue; + } + + public double getDoubleValue() { + return doubleValue; + } + + public void setDoubleValue(double doubleValue) { + this.doubleValue = doubleValue; + } + + public Object[] getArrayOfThings() { + return arrayOfThings; + } + + public void setArrayOfThings(Object[] arrayOfThings) { + this.arrayOfThings = arrayOfThings; + } + + public Map<String, String> getMapOfThings() { + return mapOfThings; + } + + public void setMapOfThings(Map<String, String> mapOfThings) { + this.mapOfThings = mapOfThings; + } + + @Override + public int hashCode() { + return Objects.hash(unique, getUriFrom(), getUuid(), derivedField, floatValue, doubleValue, arrayOfThings, mapOfThings); + } + + static class Subclass { + private String thing; + + public Subclass() { + } + + public Subclass(String thing) { + this.thing = thing; + } 
+ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Subclass)) return false; + Subclass subclass = (Subclass) o; + return Objects.equals(thing, subclass.thing); + } + + @Override + public int hashCode() { + return Objects.hash(thing); + } + + public String getThing() { + return thing; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-transport-udp/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-transport-udp/pom.xml b/gossip-transport-udp/pom.xml new file mode 100644 index 0000000..2e79b1a --- /dev/null +++ b/gossip-transport-udp/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gossip</groupId> + <artifactId>gossip-parent</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <name>Gossip UDP Transport</name> + <artifactId>gossip-transport-udp</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.gossip</groupId> + <artifactId>gossip-base</artifactId> + <version>0.1.3-incubating-SNAPSHOT</version> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java new file mode 100644 index 0000000..3f509a6 --- /dev/null +++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.transport.udp; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.transport.AbstractTransportManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.URI; +
/**
 * This class is constructed by reflection in GossipManager.
 * It manages transport (byte read/write) operations over UDP.
 */
public class UdpTransportManager extends AbstractTransportManager {

  public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);

  /** The socket used for the passive thread of the gossip service. */
  private final DatagramSocket server;

  /** Socket timeout applied to sending sockets: twice the gossip interval. */
  private final int soTimeout;

  /**
   * required for reflection to work!
   *
   * <p>Binds the receiving socket to the host and port of this node's URI.
   *
   * @throws RuntimeException wrapping the {@link SocketException} if the socket cannot be bound
   */
  public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
    super(gossipManager, gossipCore);

    soTimeout = gossipManager.getSettings().getGossipInterval() * 2;

    try {
      SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
              gossipManager.getMyself().getUri().getPort());
      server = new DatagramSocket(socketAddress);
    } catch (SocketException ex) {
      LOGGER.warn(ex);
      throw new RuntimeException(ex);
    }
  }

  /** Closes the receiving socket, then shuts down the base transport. */
  @Override
  public void shutdown() {
    server.close();
    super.shutdown();
  }

  /**
   * blocking read a message.
   * @return buffer holding exactly the bytes of the received datagram.
   * @throws IOException if receiving on the socket fails
   */
  @Override
  public byte[] read() throws IOException {
    byte[] buf = new byte[server.getReceiveBufferSize()];
    DatagramPacket p = new DatagramPacket(buf, buf.length);
    server.receive(p);
    // Hand back only what was received; the backing buffer is as large as the socket's
    // receive buffer and everything past getLength() is unused padding.
    byte[] received = new byte[p.getLength()];
    System.arraycopy(p.getData(), p.getOffset(), received, 0, received.length);
    debug(received);
    return received;
  }

  /**
   * Sends {@code buf} as one datagram to the host and port of {@code endpoint}.
   *
   * @throws IOException if the host cannot be resolved or the datagram cannot be sent
   */
  @Override
  public void send(URI endpoint, byte[] buf) throws IOException {
    // Resolve before opening the socket, and let try-with-resources close the socket even
    // when the send fails.
    InetAddress dest = InetAddress.getByName(endpoint.getHost());
    // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
    try (DatagramSocket socket = new DatagramSocket()) {
      socket.setSoTimeout(soTimeout);
      DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
      socket.send(payload);
    }
  }

  /** Logs the received bytes as text when debug logging is enabled. */
  private void debug(byte[] jsonBytes) {
    if (LOGGER.isDebugEnabled()){
      String receivedMessage = new String(jsonBytes);
      LOGGER.debug("Received message (" + jsonBytes.length + " bytes): " + receivedMessage);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java new file mode 100644 index 0000000..5258374 --- /dev/null +++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.transport.udp; + +import org.apache.gossip.GossipSettings; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +
/**
 * Placeholder for a UDP transport round-trip test. The single test is ignored and its body is
 * commented out; it sketches the intended shape of the test.
 */
public class UdpTransportIntegrationTest {

  // It's currently impossible to create a UdpTransportManager without bringing up an entire stack.
  // This is because AbstractTransportManager creates a PassiveGossipThread (requires GossipManager,
  // GossipCore) and also requires those same things plus a MetricRegistry to create the
  // ActiveGossiper.
  // TODO: test UdpTransportManager semantics (read and write) in isolation.
  // I've written this test to indicate the direction I want things to go.
  // Uncomment/Fix it once the coupling issues are worked out.
  @Test @Ignore
  public void testRoundTrip() {
    /*
    GossipSettings settings0 = new GossipSettings();
    GossipSettings settings1 = new GossipSettings();
    UdpTransportManager mgr0 = new UdpTransportManager(settings0);
    UdpTransportManager mgr1 = new UdpTransportManager(settings1);

    mgr0.startEndpoint();
    mgr1.startEndpoint();
    mgr0.startActiveGossiper();
    mgr1.startActiveGossiper();

    // wait a little while for convergence
    // perhaps there is a Mockito Whitebox way to force members

    byte[] data = new byte[] {0,1,2,3,4,5};
    Future<byte[]> someData = asyncWaitForData(mgr1);
    mgr0.send(toURI(settings1), data);

    Assert.assertEquals(data, someData.get(1000, TimeUnit.MILLISECONDS));

    mgr0.shutdown();
    mgr1.shutdown();
    */
  }


}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/e3010c85/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f9c7814..97aa409 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,8 @@ <modules> <module>gossip-base</module> + <module>gossip-transport-udp</module> + <module>gossip-protocol-jackson</module> </modules> <description>A peer to peer cluster discovery service</description> @@ -81,6 +83,39 @@ <url>https://issues.apache.org/jira/browse/GOSSIP</url> </issueManagement> + <dependencies> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + <version>${junit.vintage.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-runner</artifactId> +
<version>${junit.platform.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.teknek</groupId> + <artifactId>tunit</artifactId> + <version>${tunit.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> <pluginManagement> <plugins>
