Repository: incubator-gossip Updated Branches: refs/heads/master 21a263b07 -> 7106cc400
GOSSIP-47 sign data Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/7106cc40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/7106cc40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/7106cc40 Branch: refs/heads/master Commit: 7106cc400ab7750fb3a503a1490a2bd1d3c9f741 Parents: 21a263b Author: Edward Capriolo <edlinuxg...@gmail.com> Authored: Sat Feb 11 00:03:56 2017 -0500 Committer: Edward Capriolo <edlinuxg...@gmail.com> Committed: Sat Feb 11 21:11:19 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipSettings.java | 21 ++++ .../org/apache/gossip/manager/GossipCore.java | 67 +++++++++- .../apache/gossip/manager/GossipManager.java | 10 +- .../gossip/manager/PassiveGossipConstants.java | 23 ++++ .../gossip/manager/PassiveGossipThread.java | 20 ++- .../org/apache/gossip/model/SignedPayload.java | 36 ++++++ .../java/org/apache/gossip/secure/KeyTool.java | 57 +++++++++ src/test/java/org/apache/gossip/DataTest.java | 1 - .../org/apache/gossip/SignedMessageTest.java | 121 +++++++++++++++++++ 9 files changed, 346 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java index 60a443f..bcea75c 100644 --- a/src/main/java/org/apache/gossip/GossipSettings.java +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -56,6 +56,11 @@ public class GossipSettings { private boolean persistDataState = true; + private String pathToKeyStore = "./keys"; + + private boolean signMessages = false; + + /** * Construct GossipSettings with default settings. */ @@ -202,5 +207,21 @@ public class GossipSettings { public void setPersistDataState(boolean persistDataState) { this.persistDataState = persistDataState; } + + public String getPathToKeyStore() { + return pathToKeyStore; + } + + public void setPathToKeyStore(String pathToKeyStore) { + this.pathToKeyStore = pathToKeyStore; + } + + public boolean isSignMessages() { + return signMessages; + } + + public void setSignMessages(boolean signMessages) { + this.signMessages = signMessages; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index e23ee54..6f97a74 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -17,12 +17,23 @@ */ package org.apache.gossip.manager; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.SignatureException; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; @@ -45,6 +56,7 @@ import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.Response; import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.model.ShutdownMessage; +import org.apache.gossip.model.SignedPayload; import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; @@ -66,6 +78,8 @@ public class GossipCore implements GossipCoreConstants { private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData; private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData; private final BlockingQueue<Runnable> workQueue; + private final PKCS8EncodedKeySpec privKeySpec; + private final PrivateKey privKey; private final Meter messageSerdeException; private final Meter tranmissionException; private final Meter tranmissionSuccess; @@ -86,6 +100,41 @@ public class GossipCore implements GossipCoreConstants { messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); + if (manager.getSettings().isSignMessages()){ + File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId()); + File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub"); + if (!privateKey.exists()){ + throw new IllegalArgumentException("private key not found " + privateKey); + } + if (!publicKey.exists()){ + throw new IllegalArgumentException("public key not found " + publicKey); + } + try (FileInputStream keyfis = new FileInputStream(privateKey)) { + byte[] encKey = new byte[keyfis.available()]; + keyfis.read(encKey); + keyfis.close(); + privKeySpec = new PKCS8EncodedKeySpec(encKey); + KeyFactory keyFactory = KeyFactory.getInstance("DSA"); + privKey = keyFactory.generatePrivate(privKeySpec); + } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { + throw new RuntimeException("failed hard", e); + } + } else { + privKeySpec = null; + privKey = null; + } + } + + private byte [] sign(byte [] bytes){ + Signature dsa; + try { + dsa = Signature.getInstance("SHA1withDSA", "SUN"); + dsa.initSign(privKey); + dsa.update(bytes); + return dsa.sign(); + } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { + throw new RuntimeException(e); + } } public void addSharedData(SharedGossipDataMessage message){ @@ -207,7 +256,14 @@ public class GossipCore implements GossipCoreConstants { private void sendInternal(Base message, URI uri){ byte[] json_bytes; try { - json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes(); + if (privKey == null){ + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); + } else { + SignedPayload p = new SignedPayload(); + p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); + p.setSignature(sign(p.getData())); + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); + } } catch (IOException e) { messageSerdeException.mark(); throw new RuntimeException(e); @@ -285,7 +341,14 @@ public class GossipCore implements GossipCoreConstants { public void sendOneWay(Base message, URI u){ byte[] json_bytes; try { - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); + if (privKey == null){ + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); + } else { + SignedPayload p = new SignedPayload(); + p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); + p.setSignature(sign(p.getData())); + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); + } } catch (IOException e) { messageSerdeException.mark(); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 67cb06b..9221aa6 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -76,11 +76,11 @@ public abstract class GossipManager { URI uri, String id, Map<String,String> properties, GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { this.settings = settings; - gossipCore = new GossipCore(this, registry); - clock = new SystemClock(); - dataReaper = new DataReaper(gossipCore, clock); + clock = new SystemClock(); me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); + gossipCore = new GossipCore(this, registry); + dataReaper = new DataReaper(gossipCore, clock); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(me)) { @@ -337,5 +337,9 @@ public abstract class GossipManager { public ObjectMapper getObjectMapper() { return objectMapper; } + + public MetricRegistry getRegistry() { + return registry; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java b/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java new file mode 100644 index 0000000..3bcc344 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +public interface PassiveGossipConstants { + String SIGNED_MESSAGE = "gossip.passive.signed_message"; + String UNSIGNED_MESSAGE = "gossip.passive.unsigned_message"; +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 51cf264..bfce2dd 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -26,8 +26,11 @@ import java.net.SocketException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.gossip.model.Base; +import org.apache.gossip.model.SignedPayload; import org.apache.log4j.Logger; +import com.codahale.metrics.Meter; + /** * This class handles the passive cycle, * where this client has received an incoming message. @@ -41,6 +44,8 @@ abstract public class PassiveGossipThread implements Runnable { private final AtomicBoolean keepRunning; private final GossipCore gossipCore; private final GossipManager gossipManager; + private final Meter signed; + private final Meter unsigned; public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; @@ -52,14 +57,13 @@ abstract public class PassiveGossipThread implements Runnable { SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), gossipManager.getMyself().getUri().getPort()); server = new DatagramSocket(socketAddress); - LOGGER.debug("Gossip service successfully initialized on port " - + gossipManager.getMyself().getUri().getPort()); - LOGGER.debug("I am " + gossipManager.getMyself()); } catch (SocketException ex) { LOGGER.warn(ex); throw new RuntimeException(ex); } keepRunning = new AtomicBoolean(true); + signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE); + unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE); } @Override @@ -72,7 +76,15 @@ abstract public class PassiveGossipThread implements Runnable { debug(p.getData()); try { Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class); - gossipCore.receive(activeGossipMessage); + if (activeGossipMessage instanceof SignedPayload){ + SignedPayload s = (SignedPayload) activeGossipMessage; + Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class); + gossipCore.receive(nested); + signed.mark(); + } else { + gossipCore.receive(activeGossipMessage); + unsigned.mark(); + } } catch (RuntimeException ex) {//TODO trap json exception LOGGER.error("Unable to process message", ex); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/model/SignedPayload.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/model/SignedPayload.java b/src/main/java/org/apache/gossip/model/SignedPayload.java new file mode 100644 index 0000000..9ffbcf1 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/SignedPayload.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.model; + +public class SignedPayload extends Base{ + private byte [] data; + private byte [] signature; + public byte[] getData() { + return data; + } + public void setData(byte[] data) { + this.data = data; + } + public byte[] getSignature() { + return signature; + } + public void setSignature(byte[] signature) { + this.signature = signature; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/main/java/org/apache/gossip/secure/KeyTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/secure/KeyTool.java b/src/main/java/org/apache/gossip/secure/KeyTool.java new file mode 100644 index 0000000..69f4e72 --- /dev/null +++ b/src/main/java/org/apache/gossip/secure/KeyTool.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.secure; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.SecureRandom; + +public class KeyTool { + + public static void generatePubandPrivateKeyFiles(String path, String id) + throws NoSuchAlgorithmException, NoSuchProviderException, IOException{ + SecureRandom r = new SecureRandom(); + KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA", "SUN"); + keyGen.initialize(1024, r); + KeyPair pair = keyGen.generateKeyPair(); + PrivateKey priv = pair.getPrivate(); + PublicKey pub = pair.getPublic(); + { + FileOutputStream sigfos = new FileOutputStream(new File(path, id)); + sigfos.write(priv.getEncoded()); + sigfos.close(); + } + { + FileOutputStream sigfos = new FileOutputStream(new File(path, id + ".pub")); + sigfos.write(pub.getEncoded()); + sigfos.close(); + } + } + + public static void main (String [] args) throws + NoSuchAlgorithmException, NoSuchProviderException, IOException{ + generatePubandPrivateKeyFiles(args[0], args[1]); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 98c7ee0..b5fa705 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -79,7 +79,6 @@ public class DataTest { } }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - TUnit.assertThat(new Callable<Object>() { public Object call() throws Exception { SharedGossipDataMessage x = clients.get(1).findSharedData("a"); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7106cc40/src/test/java/org/apache/gossip/SignedMessageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/SignedMessageTest.java b/src/test/java/org/apache/gossip/SignedMessageTest.java new file mode 100644 index 0000000..6bea974 --- /dev/null +++ b/src/test/java/org/apache/gossip/SignedMessageTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.manager.PassiveGossipConstants; +import org.apache.gossip.secure.KeyTool; +import org.junit.Assert; +import org.junit.Test; + +import com.codahale.metrics.MetricRegistry; + +import io.teknek.tunit.TUnit; + +public class SignedMessageTest { + + @Test(expected=IllegalArgumentException.class) + public void ifSignMustHaveKeys() + throws URISyntaxException, UnknownHostException, InterruptedException { + String cluster = UUID.randomUUID().toString(); + GossipSettings settings = gossiperThatSigns(); + List<GossipMember> startupMembers = new ArrayList<>(); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1)); + GossipService gossipService = new GossipService(cluster, uri, 1 + "", + new HashMap<String, String>(), startupMembers, settings, (a, b) -> { }, + new MetricRegistry()); + gossipService.start(); + } + + private GossipSettings gossiperThatSigns(){ + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + settings.setSignMessages(true); + return settings; + } + + @Test + public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException{ + String keys = "./keys"; + GossipSettings settings = gossiperThatSigns(); + setup(keys); + String cluster = UUID.randomUUID().toString(); + List<GossipMember> startupMembers = new ArrayList<>(); + for (int i = 1; i < 2; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + } + final List<GossipService> clients = new ArrayList<>(); + for (int i = 1; i < 3; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", + new HashMap<String,String>(), startupMembers, settings, + (a,b) -> {}, new MetricRegistry()); + clients.add(gossipService); + gossipService.start(); + } + assertTwoAlive(clients); + assertOnlySignedMessages(clients); + cleanup(keys, clients); + } + + private void assertTwoAlive(List<GossipService> clients){ + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clients.size(); ++i) { + total += clients.get(i).getGossipManager().getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + } + + private void assertOnlySignedMessages(List<GossipService> clients){ + Assert.assertEquals(0, clients.get(0).getGossipManager().getRegistry() + .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount()); + Assert.assertTrue(clients.get(0).getGossipManager().getRegistry() + .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0); + } + + private void cleanup(String keys, List<GossipService> clients){ + new File(keys, "1").delete(); + new File(keys, "2").delete(); + new File(keys).delete(); + for (int i = 0; i < clients.size(); ++i) { + clients.get(i).shutdown(); + } + } + + private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException { + new File(keys).mkdir(); + KeyTool.generatePubandPrivateKeyFiles(keys, "1"); + KeyTool.generatePubandPrivateKeyFiles(keys, "2"); + } +}