Repository: incubator-gossip Updated Branches: refs/heads/master 2c1dc4375 -> 4f1993a7f
GOSSIP-10 simplify code Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/4f1993a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/4f1993a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/4f1993a7 Branch: refs/heads/master Commit: 4f1993a7fca3769569d6cea98c8e3a6be10eb7b5 Parents: 2c1dc43 Author: Edward Capriolo <edlinuxg...@gmail.com> Authored: Tue Jun 14 09:54:45 2016 -0400 Committer: Edward Capriolo <edlinuxg...@gmail.com> Committed: Tue Jun 14 09:54:45 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipService.java | 1 - .../gossip/manager/PassiveGossipThread.java | 10 +- .../OnlyProcessReceivedPassiveGossipThread.java | 1 - .../impl/SendMembersActiveGossipThread.java | 98 -------------------- .../random/RandomActiveGossipThread.java | 66 ++++++++++++- .../manager/random/RandomGossipManager.java | 1 - 6 files changed, 65 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/4f1993a7/src/main/java/org/apache/gossip/GossipService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index b149719..ce15992 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -17,7 +17,6 @@ */ package org.apache.gossip; -import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/4f1993a7/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index a057e7d..0b12ee4 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -165,12 +165,4 @@ abstract public class PassiveGossipThread implements Runnable { */ abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, List<GossipMember> remoteList); -} - -/* - * random comments // Check whether the package is smaller than the maximal packet length. // A - * package larger than this would not be possible to be send from a GossipService, // since this is - * check before sending the message. // This could normally only occur when the list of members is - * very big, // or when the packet is malformed, and the first 4 bytes is not the right in anymore. - * // For this reason we regards the message. - */ +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/4f1993a7/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index d0acfc1..bde497f 100644 --- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -20,7 +20,6 @@ package org.apache.gossip.manager.impl; import java.util.List; import org.apache.gossip.GossipMember; -import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.manager.GossipManager; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/4f1993a7/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java deleted file mode 100644 index c296156..0000000 --- a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager.impl; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.gossip.GossipService; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.manager.ActiveGossipThread; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipMessage; -import org.apache.gossip.model.GossipMember; -import org.codehaus.jackson.map.ObjectMapper; - -abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { - - protected ObjectMapper om = new ObjectMapper(); - - public SendMembersActiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - } - - private GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); - gm.setCluster(member.getClusterName()); - gm.setHeartbeat(member.getHeartbeat()); - gm.setUri(member.getUri().toASCIIString()); - gm.setId(member.getId()); - return gm; - } - - /** - * Performs the sending of the membership list, after we have incremented our own heartbeat. - */ - protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { - GossipService.LOGGER.debug("Send sendMembershipList() is called."); - me.setHeartbeat(System.currentTimeMillis()); - LocalGossipMember member = selectPartner(memberList); - if (member == null) { - return; - } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); - InetAddress dest = InetAddress.getByName(member.getUri().getHost()); - ActiveGossipMessage message = new ActiveGossipMessage(); - message.getMembers().add(convert(me)); - for (LocalGossipMember other : memberList) { - message.getMembers().add(convert(other)); - } - byte[] json_bytes = om.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - byte[] buf = createBuffer(packet_length, json_bytes); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort()); - socket.send(datagramPacket); - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } catch (IOException e1) { - GossipService.LOGGER.warn(e1); - } - } - - private byte[] createBuffer(int packetLength, byte[] jsonBytes) { - byte[] lengthBytes = new byte[4]; - lengthBytes[0] = (byte) (packetLength >> 24); - lengthBytes[1] = (byte) ((packetLength << 8) >> 24); - lengthBytes[2] = (byte) ((packetLength << 16) >> 24); - lengthBytes[3] = (byte) ((packetLength << 24) >> 24); - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); - byteBuffer.put(lengthBytes); - byteBuffer.put(jsonBytes); - byte[] buf = byteBuffer.array(); - return buf; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/4f1993a7/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java index 23a41f5..53885b6 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java @@ -17,16 +17,26 @@ */ package org.apache.gossip.manager.random; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.nio.ByteBuffer; import java.util.List; import java.util.Random; import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.manager.ActiveGossipThread; import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.impl.SendMembersActiveGossipThread; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.GossipMember; +import org.codehaus.jackson.map.ObjectMapper; -public class RandomActiveGossipThread extends SendMembersActiveGossipThread { +public class RandomActiveGossipThread extends ActiveGossipThread { + protected ObjectMapper om = new ObjectMapper(); + /** The Random used for choosing a member to gossip with. */ private final Random random; @@ -52,4 +62,56 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread { return member; } + protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { + GossipService.LOGGER.debug("Send sendMembershipList() is called."); + me.setHeartbeat(System.currentTimeMillis()); + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + InetAddress dest = InetAddress.getByName(member.getUri().getHost()); + ActiveGossipMessage message = new ActiveGossipMessage(); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : memberList) { + message.getMembers().add(convert(other)); + } + byte[] json_bytes = om.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + byte[] buf = createBuffer(packet_length, json_bytes); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort()); + socket.send(datagramPacket); + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); + } + } + + private byte[] createBuffer(int packetLength, byte[] jsonBytes) { + byte[] lengthBytes = new byte[4]; + lengthBytes[0] = (byte) (packetLength >> 24); + lengthBytes[1] = (byte) ((packetLength << 8) >> 24); + lengthBytes[2] = (byte) ((packetLength << 16) >> 24); + lengthBytes[3] = (byte) ((packetLength << 24) >> 24); + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); + byteBuffer.put(lengthBytes); + byteBuffer.put(jsonBytes); + byte[] buf = byteBuffer.array(); + return buf; + } + + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setUri(member.getUri().toASCIIString()); + gm.setId(member.getId()); + return gm; + } + } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/4f1993a7/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index 1d2075e..fa2b1c5 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -27,7 +27,6 @@ import java.net.URI; import java.util.List; import java.util.ArrayList; -import java.util.List; public class RandomGossipManager extends GossipManager {