http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java new file mode 100644 index 0000000..b73550e --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.Map.Entry; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.LocalMember; +import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Member; +import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.model.ShutdownMessage; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpPerNodeDataMessage; +import org.apache.gossip.udp.UdpSharedDataMessage; +import org.apache.log4j.Logger; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner + */ +public abstract class AbstractActiveGossiper { + + protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class); + + protected final GossipManager gossipManager; + protected final GossipCore gossipCore; + private final Histogram sharedDataHistogram; + private final Histogram sendPerNodeDataHistogram; + private final Histogram sendMembershipHistorgram; + private final Random random; + + public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { + this.gossipManager = gossipManager; + this.gossipCore = gossipCore; + sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); + sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); + sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time")); + random = new Random(); + } + + public void init() { + + } + + public void shutdown() { + + } + + public final void sendShutdownMessage(LocalMember me, LocalMember target){ + if (target == null){ + return; + } + ShutdownMessage m = new ShutdownMessage(); + m.setNodeId(me.getId()); + m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); + gossipCore.sendOneWay(m, target.getUri()); + } + + public final void sendSharedData(LocalMember me, LocalMember member){ + if (member == null){ + return; + } + long startTime = System.currentTimeMillis(); + for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){ + UdpSharedDataMessage message = new UdpSharedDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + gossipCore.sendOneWay(message, member.getUri()); + } + sharedDataHistogram.update(System.currentTimeMillis() - startTime); + } + + public final void sendPerNodeData(LocalMember me, LocalMember member){ + if (member == null){ + return; + } + long startTime = System.currentTimeMillis(); + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){ + UdpPerNodeDataMessage message = new UdpPerNodeDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + gossipCore.sendOneWay(message, member.getUri()); + } + } + sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); + } + + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. + */ + protected void sendMembershipList(LocalMember me, LocalMember member) { + if (member == null){ + return; + } + long startTime = System.currentTimeMillis(); + me.setHeartbeat(System.nanoTime()); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); + message.getMembers().add(convert(me)); + for (LocalMember other : gossipManager.getMembers().keySet()) { + message.getMembers().add(convert(other)); + } + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.debug("Message " + message + " generated response " + r); + } + sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); + } + + protected final Member convert(LocalMember member){ + Member gm = new Member(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setUri(member.getUri().toASCIIString()); + gm.setId(member.getId()); + gm.setProperties(member.getProperties()); + return gm; + } + + /** + * + * @param memberList + * An immutable list + * @return The chosen LocalGossipMember to gossip with. + */ + protected LocalMember selectPartner(List<LocalMember> memberList) { + LocalMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } + return member; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java b/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java new file mode 100644 index 0000000..6629c62 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/Clock.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +public interface Clock { + + long currentTimeMillis(); + long nanoTime(); + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java b/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java new file mode 100644 index 0000000..8175a1b --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; + +/** + * We wish to periodically sweep user data and remove entries past their timestamp. This + * implementation periodically sweeps through the data and removes old entries. While it might make + * sense to use a more specific high performance data-structure to handle eviction, keep in mind + * that we are not looking to store a large quantity of data as we currently have to transmit this + * data cluster wide. + */ +public class DataReaper { + + private final GossipCore gossipCore; + private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + private final Clock clock; + + public DataReaper(GossipCore gossipCore, Clock clock){ + this.gossipCore = gossipCore; + this.clock = clock; + } + + public void init(){ + Runnable reapPerNodeData = () -> { + runPerNodeOnce(); + runSharedOnce(); + }; + scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS); + } + + void runSharedOnce(){ + for (Entry<String, SharedDataMessage> entry : gossipCore.getSharedData().entrySet()){ + if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ + gossipCore.getSharedData().remove(entry.getKey(), entry.getValue()); + } + } + } + + void runPerNodeOnce(){ + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : gossipCore.getPerNodeData().entrySet()){ + reapData(node.getValue()); + } + } + + void reapData(ConcurrentHashMap<String, PerNodeDataMessage> concurrentHashMap){ + for (Entry<String, PerNodeDataMessage> entry : concurrentHashMap.entrySet()){ + if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ + concurrentHashMap.remove(entry.getKey(), entry.getValue()); + } + } + } + + public void close(){ + scheduledExecutor.shutdown(); + try { + scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java new file mode 100644 index 0000000..2f489a2 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.LocalMember; + +import com.codahale.metrics.MetricRegistry; + +/** + * Sends gossip traffic at different rates to other racks and data-centers. + * This implementation controls the rate at which gossip traffic is shared. + * There are two constructs Datacenter and Rack. It is assumed that bandwidth and latency is higher + * in the rack than in the the datacenter. We can adjust the rate at which we send messages to each group. + * + */ +public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { + + public static final String DATACENTER = "datacenter"; + public static final String RACK = "rack"; + + private int sameRackGossipIntervalMs = 100; + private int sameDcGossipIntervalMs = 500; + private int differentDatacenterGossipIntervalMs = 1000; + private int randomDeadMemberSendIntervalMs = 250; + + private ScheduledExecutorService scheduledExecutorService; + private final BlockingQueue<Runnable> workQueue; + private ThreadPoolExecutor threadService; + + public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, + MetricRegistry registry) { + super(gossipManager, gossipCore, registry); + scheduledExecutorService = Executors.newScheduledThreadPool(2); + workQueue = new ArrayBlockingQueue<Runnable>(1024); + threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + try { + sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("sameRackGossipIntervalMs")); + } catch (RuntimeException ex) { } + try { + sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("sameDcGossipIntervalMs")); + } catch (RuntimeException ex) { } + try { + differentDatacenterGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("differentDatacenterGossipIntervalMs")); + } catch (RuntimeException ex) { } + try { + randomDeadMemberSendIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("randomDeadMemberSendIntervalMs")); + } catch (RuntimeException ex) { } + } + + @Override + public void init() { + super.init(); + //same rack + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToSameRackMember()), + 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToSameRackMemberPerNode()), + 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToSameRackShared()), + 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); + + //same dc different rack + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sameDcDiffernetRackMember()), + 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sameDcDiffernetRackPerNode()), + 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sameDcDiffernetRackShared()), + 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); + + //different dc + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> differentDcMember()), + 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> differentDcPerNode()), + 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> differentDcShared()), + 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); + + //the dead + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToDeadMember()), + 0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS); + + } + + private void sendToDeadMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers())); + } + + private List<LocalMember> differentDataCenter(){ + String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); + String rack = gossipManager.getMyself().getProperties().get(RACK); + if (myDc == null|| rack == null){ + return Collections.emptyList(); + } + List<LocalMember> notMyDc = new ArrayList<LocalMember>(10); + for (LocalMember i : gossipManager.getLiveMembers()){ + if (!myDc.equals(i.getProperties().get(DATACENTER))){ + notMyDc.add(i); + } + } + return notMyDc; + } + + private List<LocalMember> sameDatacenterDifferentRack(){ + String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); + String rack = gossipManager.getMyself().getProperties().get(RACK); + if (myDc == null|| rack == null){ + return Collections.emptyList(); + } + List<LocalMember> notMyDc = new ArrayList<LocalMember>(10); + for (LocalMember i : gossipManager.getLiveMembers()){ + if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){ + notMyDc.add(i); + } + } + return notMyDc; + } + + private List<LocalMember> sameRackNodes(){ + String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); + String rack = gossipManager.getMyself().getProperties().get(RACK); + if (myDc == null|| rack == null){ + return Collections.emptyList(); + } + List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10); + for (LocalMember i : gossipManager.getLiveMembers()){ + if (myDc.equals(i.getProperties().get(DATACENTER)) + && rack.equals(i.getProperties().get(RACK))){ + sameDcAndRack.add(i); + } + } + return sameDcAndRack; + } + + private void sendToSameRackMember() { + LocalMember i = selectPartner(sameRackNodes()); + sendMembershipList(gossipManager.getMyself(), i); + } + + private void sendToSameRackMemberPerNode() { + sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes())); + } + + private void sendToSameRackShared() { + sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes())); + } + + private void differentDcMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(differentDataCenter())); + } + + private void differentDcPerNode() { + sendPerNodeData(gossipManager.getMyself(), selectPartner(differentDataCenter())); + } + + private void differentDcShared() { + sendSharedData(gossipManager.getMyself(), selectPartner(differentDataCenter())); + } + + private void sameDcDiffernetRackMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); + } + + private void sameDcDiffernetRackPerNode() { + sendPerNodeData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); + } + + private void sameDcDiffernetRackShared() { + sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); + } + + @Override + public void shutdown() { + super.shutdown(); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + sendShutdownMessage(); + threadService.shutdown(); + try { + threadService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + } + + /** + * sends an optimistic shutdown message to several clusters nodes + */ + protected void sendShutdownMessage(){ + List<LocalMember> l = gossipManager.getLiveMembers(); + int sendTo = l.size() < 3 ? 1 : l.size() / 3; + for (int i = 0; i < sendTo; i++) { + threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java new file mode 100644 index 0000000..f53419d --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.Member; +import org.apache.gossip.LocalMember; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.crdt.Crdt; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.model.*; +import org.apache.gossip.udp.Trackable; +import org.apache.log4j.Logger; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.URI; +import java.security.*; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.*; + +public class GossipCore implements GossipCoreConstants { + + class LatchAndBase { + private final CountDownLatch latch; + private volatile Base base; + + LatchAndBase(){ + latch = new CountDownLatch(1); + } + + } + public static final Logger LOGGER = Logger.getLogger(GossipCore.class); + private final GossipManager gossipManager; + private ConcurrentHashMap<String, LatchAndBase> requests; + private ThreadPoolExecutor service; + private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; + private final ConcurrentHashMap<String, SharedDataMessage> sharedData; + private final BlockingQueue<Runnable> workQueue; + private final PKCS8EncodedKeySpec privKeySpec; + private final PrivateKey privKey; + private final Meter messageSerdeException; + private final Meter tranmissionException; + private final Meter tranmissionSuccess; + + public GossipCore(GossipManager manager, MetricRegistry metrics){ + this.gossipManager = manager; + requests = new ConcurrentHashMap<>(); + workQueue = new ArrayBlockingQueue<>(1024); + service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); + perNodeData = new ConcurrentHashMap<>(); + sharedData = new ConcurrentHashMap<>(); + metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size()); + metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size()); + metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size()); + metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size()); + metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount()); + metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize()); + messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); + tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); + tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); + + if (manager.getSettings().isSignMessages()){ + File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId()); + File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub"); + if (!privateKey.exists()){ + throw new IllegalArgumentException("private key not found " + privateKey); + } + if (!publicKey.exists()){ + throw new IllegalArgumentException("public key not found " + publicKey); + } + try (FileInputStream keyfis = new FileInputStream(privateKey)) { + byte[] encKey = new byte[keyfis.available()]; + keyfis.read(encKey); + keyfis.close(); + privKeySpec = new PKCS8EncodedKeySpec(encKey); + KeyFactory keyFactory = KeyFactory.getInstance("DSA"); + privKey = keyFactory.generatePrivate(privKeySpec); + } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) { + throw new RuntimeException("failed hard", e); + } + } else { + privKeySpec = null; + privKey = null; + } + } + + private byte [] sign(byte [] bytes){ + Signature dsa; + try { + dsa = Signature.getInstance("SHA1withDSA", "SUN"); + dsa.initSign(privKey); + dsa.update(bytes); + return dsa.sign(); + } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void addSharedData(SharedDataMessage message) { + while (true){ + SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + if (previous == null){ + return; + } + if (message.getPayload() instanceof Crdt){ + SharedDataMessage merged = new SharedDataMessage(); + merged.setExpireAt(message.getExpireAt()); + merged.setKey(message.getKey()); + merged.setNodeId(message.getNodeId()); + merged.setTimestamp(message.getTimestamp()); + Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); + merged.setPayload(mergedCrdt); + boolean replaced = sharedData.replace(message.getKey(), previous, merged); + if (replaced){ + return; + } + } else { + if (previous.getTimestamp() < message.getTimestamp()){ + boolean result = sharedData.replace(message.getKey(), previous, message); + if (result){ + return; + } + } else { + return; + } + } + } + } + + public void addPerNodeData(PerNodeDataMessage message){ + ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>(); + nodeMap.put(message.getKey(), message); + nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); + if (nodeMap != null){ + PerNodeDataMessage current = nodeMap.get(message.getKey()); + if (current == null){ + nodeMap.putIfAbsent(message.getKey(), message); + } else { + if (current.getTimestamp() < message.getTimestamp()){ + nodeMap.replace(message.getKey(), current, message); + } + } + } + } + + public ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> getPerNodeData(){ + return perNodeData; + } + + public ConcurrentHashMap<String, SharedDataMessage> getSharedData() { + return sharedData; + } + + public void shutdown(){ + service.shutdown(); + try { + service.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn(e); + } + service.shutdownNow(); + } + + public void receive(Base base) { + if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) { + LOGGER.warn("received message can not be handled"); + } + } + + /** + * Sends a blocking message. + * @param message + * @param uri + * @throws RuntimeException if data can not be serialized or in transmission error + */ + private void sendInternal(Base message, URI uri){ + byte[] json_bytes; + try { + if (privKey == null){ + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); + } else { + SignedPayload p = new SignedPayload(); + p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); + p.setSignature(sign(p.getData())); + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); + } + } catch (IOException e) { + messageSerdeException.mark(); + throw new RuntimeException(e); + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); + InetAddress dest = InetAddress.getByName(uri.getHost()); + DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort()); + socket.send(datagramPacket); + tranmissionSuccess.mark(); + } catch (IOException e) { + tranmissionException.mark(); + throw new RuntimeException(e); + } + } + + public Response send(Base message, URI uri){ + if (LOGGER.isDebugEnabled()){ + LOGGER.debug("Sending " + message); + LOGGER.debug("Current request queue " + requests); + } + + final Trackable t; + LatchAndBase latchAndBase = null; + if (message instanceof Trackable){ + t = (Trackable) message; + latchAndBase = new LatchAndBase(); + requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase); + } else { + t = null; + } + sendInternal(message, uri); + if (latchAndBase == null){ + return null; + } + + try { + boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS); + if (complete){ + return (Response) latchAndBase.base; + } else{ + return null; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + if (latchAndBase != null){ + requests.remove(t.getUuid() + "/" + t.getUriFrom()); + } + } + } + + /** + * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used + * when the protocol for the message is not to wait for a response + * @param message the message to send + * @param u the uri to send it to + */ + public void sendOneWay(Base message, URI u){ + byte[] json_bytes; + try { + if (privKey == null){ + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); + } else { + SignedPayload p = new SignedPayload(); + p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); + p.setSignature(sign(p.getData())); + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); + } + } catch (IOException e) { + messageSerdeException.mark(); + throw new RuntimeException(e); + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); + InetAddress dest = InetAddress.getByName(u.getHost()); + DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort()); + socket.send(datagramPacket); + tranmissionSuccess.mark(); + } catch (IOException ex) { + tranmissionException.mark(); + LOGGER.debug("Send one way failed", ex); + } + } + + public void handleResponse(String k, Base v) { + LatchAndBase latch = requests.get(k); + latch.base = v; + latch.latch.countDown(); + } + + /** + * Merge lists from remote members and update heartbeats + * + * @param gossipManager + * @param senderMember + * @param remoteList + * + */ + public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, + List<Member> remoteList) { + if (LOGGER.isDebugEnabled()){ + debugState(senderMember, remoteList); + } + for (LocalMember i : gossipManager.getDeadMembers()) { + if (i.getId().equals(senderMember.getId())) { + LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); + i.recordHeartbeat(senderMember.getHeartbeat()); + i.setHeartbeat(senderMember.getHeartbeat()); + //TODO consider forcing an UP here + } + } + for (Member remoteMember : remoteList) { + if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { + continue; + } + LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(), + remoteMember.getUri(), + remoteMember.getId(), + remoteMember.getHeartbeat(), + remoteMember.getProperties(), + gossipManager.getSettings().getWindowSize(), + gossipManager.getSettings().getMinimumSamples(), + gossipManager.getSettings().getDistribution()); + aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); + Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); + if (result != null){ + for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){ + if (localMember.getKey().getId().equals(remoteMember.getId())){ + localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); + localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); + localMember.getKey().setProperties(remoteMember.getProperties()); + } + } + } + } + if (LOGGER.isDebugEnabled()){ + debugState(senderMember, remoteList); + } + } + + private void debugState(RemoteMember senderMember, + List<Member> remoteList){ + LOGGER.warn( + "-----------------------\n" + + "Me " + gossipManager.getMyself() + "\n" + + "Sender " + senderMember + "\n" + + "RemoteList " + remoteList + "\n" + + "Live " + gossipManager.getLiveMembers()+ "\n" + + "Dead " + gossipManager.getDeadMembers()+ "\n" + + "======================="); + } + + @SuppressWarnings("rawtypes") + public Crdt merge(SharedDataMessage message) { + for (;;){ + SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + if (previous == null){ + return (Crdt) message.getPayload(); + } + SharedDataMessage copy = new SharedDataMessage(); + copy.setExpireAt(message.getExpireAt()); + copy.setKey(message.getKey()); + copy.setNodeId(message.getNodeId()); + copy.setTimestamp(message.getTimestamp()); + @SuppressWarnings("unchecked") + Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); + copy.setPayload(merged); + boolean replaced = sharedData.replace(message.getKey(), previous, copy); + if (replaced){ + return merged; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java new file mode 100644 index 0000000..6d3765a --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +public interface GossipCoreConstants { + String WORKQUEUE_SIZE = "gossip.core.workqueue.size"; + String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; + String SHARED_DATA_SIZE = "gossip.core.shareddata.size"; + String REQUEST_SIZE = "gossip.core.requests.size"; + String THREADPOOL_ACTIVE = "gossip.core.threadpool.active"; + String THREADPOOL_SIZE = "gossip.core.threadpool.size"; + String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception"; + String MESSAGE_TRANSMISSION_EXCEPTION = "gossip.core.message_transmission_exception"; + String MESSAGE_TRANSMISSION_SUCCESS = "gossip.core.message_transmission_success"; +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java new file mode 100644 index 0000000..c2b50ae --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.LocalMember; +import org.apache.gossip.Member; +import org.apache.gossip.crdt.Crdt; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.log4j.Logger; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +public abstract class GossipManager { + + public static final Logger LOGGER = Logger.getLogger(GossipManager.class); + + private final ConcurrentSkipListMap<LocalMember, GossipState> members; + private final LocalMember me; + private final GossipSettings settings; + private final AtomicBoolean gossipServiceRunning; + private AbstractActiveGossiper activeGossipThread; + private PassiveGossipThread passiveGossipThread; + private ExecutorService gossipThreadExecutor; + private final GossipCore gossipCore; + private final DataReaper dataReaper; + private final Clock clock; + private final ScheduledExecutorService scheduledServiced; + private final MetricRegistry registry; + private final RingStatePersister ringState; + private final UserDataPersister userDataState; + private final GossipMemberStateRefresher memberStateRefresher; + private final ObjectMapper objectMapper; + + private final MessageInvoker messageInvoker; + + public GossipManager(String cluster, + URI uri, String id, Map<String, String> properties, GossipSettings settings, + List<Member> gossipMembers, GossipListener listener, MetricRegistry registry, + ObjectMapper objectMapper, MessageInvoker messageInvoker) { + this.settings = settings; + this.messageInvoker = messageInvoker; + + clock = new SystemClock(); + me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, + settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); + gossipCore = new GossipCore(this, registry); + dataReaper = new DataReaper(gossipCore, clock); + members = new ConcurrentSkipListMap<>(); + for (Member startupMember : gossipMembers) { + if (!startupMember.equals(me)) { + LocalMember member = new LocalMember(startupMember.getClusterName(), + startupMember.getUri(), startupMember.getId(), + clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), + settings.getMinimumSamples(), settings.getDistribution()); + //TODO should members start in down state? + members.put(member, GossipState.DOWN); + } + } + gossipThreadExecutor = Executors.newCachedThreadPool(); + gossipServiceRunning = new AtomicBoolean(true); + this.scheduledServiced = Executors.newScheduledThreadPool(1); + this.registry = registry; + this.ringState = new RingStatePersister(this); + this.userDataState = new UserDataPersister(this, this.gossipCore); + this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData); + this.objectMapper = objectMapper; + readSavedRingState(); + readSavedDataState(); + } + + public MessageInvoker getMessageInvoker() { + return messageInvoker; + } + + public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() { + return members; + } + + public GossipSettings getSettings() { + return settings; + } + + /** + * @return a read only list of members found in the DOWN state. + */ + public List<LocalMember> getDeadMembers() { + return Collections.unmodifiableList( + members.entrySet() + .stream() + .filter(entry -> GossipState.DOWN.equals(entry.getValue())) + .map(Entry::getKey).collect(Collectors.toList())); + } + + /** + * + * @return a read only list of members found in the UP state + */ + public List<LocalMember> getLiveMembers() { + return Collections.unmodifiableList( + members.entrySet() + .stream() + .filter(entry -> GossipState.UP.equals(entry.getValue())) + .map(Entry::getKey).collect(Collectors.toList())); + } + + public LocalMember getMyself() { + return me; + } + + private AbstractActiveGossiper constructActiveGossiper(){ + try { + Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class); + return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry); + } catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + /** + * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip + * thread and start the receiver thread. + */ + public void init() { + passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); + gossipThreadExecutor.execute(passiveGossipThread); + activeGossipThread = constructActiveGossiper(); + activeGossipThread.init(); + dataReaper.init(); + scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS); + scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); + scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS); + LOGGER.debug("The GossipManager is started."); + } + + private void readSavedRingState() { + for (LocalMember l : ringState.readFromDisk()){ + LocalMember member = new LocalMember(l.getClusterName(), + l.getUri(), l.getId(), + clock.nanoTime(), l.getProperties(), settings.getWindowSize(), + settings.getMinimumSamples(), settings.getDistribution()); + members.putIfAbsent(member, GossipState.DOWN); + } + } + + private void readSavedDataState() { + for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){ + for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){ + gossipCore.addPerNodeData(j.getValue()); + } + } + for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){ + gossipCore.addSharedData(l.getValue()); + } + } + + /** + * Shutdown the gossip service. + */ + public void shutdown() { + gossipServiceRunning.set(false); + gossipThreadExecutor.shutdown(); + gossipCore.shutdown(); + dataReaper.close(); + if (passiveGossipThread != null) { + passiveGossipThread.shutdown(); + } + if (activeGossipThread != null) { + activeGossipThread.shutdown(); + } + try { + boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); + if (!result) { + LOGGER.error("executor shutdown timed out"); + } + } catch (InterruptedException e) { + LOGGER.error(e); + } + gossipThreadExecutor.shutdownNow(); + scheduledServiced.shutdown(); + try { + scheduledServiced.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error(e); + } + scheduledServiced.shutdownNow(); + } + + public void gossipPerNodeData(PerNodeDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); + message.setNodeId(me.getId()); + gossipCore.addPerNodeData(message); + } + + public void gossipSharedData(SharedDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); + message.setNodeId(me.getId()); + gossipCore.addSharedData(message); + } + + + @SuppressWarnings("rawtypes") + public Crdt findCrdt(String key){ + SharedDataMessage l = gossipCore.getSharedData().get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() < clock.currentTimeMillis()){ + return null; + } else { + return (Crdt) l.getPayload(); + } + } + + @SuppressWarnings("rawtypes") + public Crdt merge(SharedDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); + message.setNodeId(me.getId()); + if (! (message.getPayload() instanceof Crdt)){ + throw new IllegalArgumentException("Not a subclass of CRDT " + message.getPayload()); + } + return gossipCore.merge(message); + } + + public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){ + ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId); + if (j == null){ + return null; + } else { + PerNodeDataMessage l = j.get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() != null && l.getExpireAt() < clock.currentTimeMillis()) { + return null; + } + return l; + } + } + + public SharedDataMessage findSharedGossipData(String key){ + SharedDataMessage l = gossipCore.getSharedData().get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() < clock.currentTimeMillis()){ + return null; + } else { + return l; + } + } + + public DataReaper getDataReaper() { + return dataReaper; + } + + public RingStatePersister getRingState() { + return ringState; + } + + public UserDataPersister getUserDataState() { + return userDataState; + } + + public GossipMemberStateRefresher getMemberStateRefresher() { + return memberStateRefresher; + } + + public Clock getClock() { + return clock; + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + + public MetricRegistry getRegistry() { + return registry; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java new file mode 100644 index 0000000..b87045b --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonGenerator.Feature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.Member; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.StartupSettings; +import org.apache.gossip.crdt.CrdtModule; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.manager.handlers.DefaultMessageInvoker; +import org.apache.gossip.manager.handlers.MessageInvoker; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GossipManagerBuilder { + + public static ManagerBuilder newBuilder() { + return new ManagerBuilder(); + } + + public static final class ManagerBuilder { + private String cluster; + private URI uri; + private String id; + private GossipSettings settings; + private List<Member> gossipMembers; + private GossipListener listener; + private MetricRegistry registry; + private Map<String,String> properties; + private ObjectMapper objectMapper; + private MessageInvoker messageInvoker; + + private ManagerBuilder() {} + + private void checkArgument(boolean check, String msg) { + if (!check) { + throw new IllegalArgumentException(msg); + } + } + + public ManagerBuilder cluster(String cluster) { + this.cluster = cluster; + return this; + } + + public ManagerBuilder properties(Map<String,String> properties) { + this.properties = properties; + return this; + } + + public ManagerBuilder id(String id) { + this.id = id; + return this; + } + + public ManagerBuilder gossipSettings(GossipSettings settings) { + this.settings = settings; + return this; + } + + public ManagerBuilder startupSettings(StartupSettings startupSettings) { + this.cluster = startupSettings.getCluster(); + this.id = startupSettings.getId(); + this.settings = startupSettings.getGossipSettings(); + this.gossipMembers = startupSettings.getGossipMembers(); + this.uri = startupSettings.getUri(); + return this; + } + + public ManagerBuilder gossipMembers(List<Member> members) { + this.gossipMembers = members; + return this; + } + + public ManagerBuilder listener(GossipListener listener) { + this.listener = listener; + return this; + } + + public ManagerBuilder registry(MetricRegistry registry) { + this.registry = registry; + return this; + } + + public ManagerBuilder uri(URI uri){ + this.uri = uri; + return this; + } + + public ManagerBuilder mapper(ObjectMapper objectMapper){ + this.objectMapper = objectMapper; + return this; + } + + public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) { + this.messageInvoker = messageInvoker; + return this; + } + + public GossipManager build() { + checkArgument(id != null, "You must specify an id"); + checkArgument(cluster != null, "You must specify a cluster name"); + checkArgument(settings != null, "You must specify gossip settings"); + checkArgument(uri != null, "You must specify a uri"); + if (registry == null){ + registry = new MetricRegistry(); + } + if (properties == null){ + properties = new HashMap<String,String>(); + } + if (listener == null){ + listener((a,b) -> {}); + } + if (gossipMembers == null) { + gossipMembers = new ArrayList<>(); + } + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + objectMapper.enableDefaultTyping(); + objectMapper.registerModule(new CrdtModule()); + objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false); + } + if (messageInvoker == null) { + messageInvoker = new DefaultMessageInvoker(); + } + return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java new file mode 100644 index 0000000..1836309 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.manager; + +import org.apache.gossip.GossipSettings; +import org.apache.gossip.LocalMember; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.ShutdownMessage; +import org.apache.log4j.Logger; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +public class GossipMemberStateRefresher implements Runnable { + public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class); + + private final Map<LocalMember, GossipState> members; + private final GossipSettings settings; + private final GossipListener listener; + private final Clock clock; + private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData; + + public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings, + GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) { + this.members = members; + this.settings = settings; + this.listener = listener; + this.findPerNodeGossipData = findPerNodeGossipData; + clock = new SystemClock(); + } + + public void run() { + try { + runOnce(); + } catch (RuntimeException ex) { + LOGGER.warn("scheduled state had exception", ex); + } + } + + public void runOnce() { + for (Entry<LocalMember, GossipState> entry : members.entrySet()) { + boolean userDown = processOptimisticShutdown(entry); + if (userDown) + continue; + + Double phiMeasure = entry.getKey().detect(clock.nanoTime()); + GossipState requiredState; + + if (phiMeasure != null) { + requiredState = calcRequiredState(phiMeasure); + } else { + requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue()); + } + + if (entry.getValue() != requiredState) { + members.put(entry.getKey(), requiredState); + listener.gossipEvent(entry.getKey(), requiredState); + } + } + } + + public GossipState calcRequiredState(Double phiMeasure) { + if (phiMeasure > settings.getConvictThreshold()) + return GossipState.DOWN; + else + return GossipState.UP; + } + + public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) { + long now = clock.nanoTime(); + long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); + if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) { + return GossipState.DOWN; + } else { + return state; + } + } + + /** + * If we have a special key the per-node data that means that the node has sent us + * a pre-emptive shutdown message. We process this so node is seen down sooner + * + * @param l member to consider + * @return true if node forced down + */ + public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) { + PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); + if (m == null) { + return false; + } + ShutdownMessage s = (ShutdownMessage) m.getPayload(); + if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) { + members.put(l.getKey(), GossipState.DOWN); + if (l.getValue() == GossipState.UP) { + listener.gossipEvent(l.getKey(), GossipState.DOWN); + } + return true; + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java new file mode 100644 index 0000000..3bcc344 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +public interface PassiveGossipConstants { + String SIGNED_MESSAGE = "gossip.passive.signed_message"; + String UNSIGNED_MESSAGE = "gossip.passive.unsigned_message"; +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java new file mode 100644 index 0000000..ae28bf7 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.gossip.model.Base; +import org.apache.gossip.model.SignedPayload; +import org.apache.log4j.Logger; + +import com.codahale.metrics.Meter; + +/** + * This class handles the passive cycle, + * where this client has received an incoming message. + */ +abstract public class PassiveGossipThread implements Runnable { + + public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); + + /** The socket used for the passive thread of the gossip service. */ + private final DatagramSocket server; + private final AtomicBoolean keepRunning; + private final GossipCore gossipCore; + private final GossipManager gossipManager; + private final Meter signed; + private final Meter unsigned; + + public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { + this.gossipManager = gossipManager; + this.gossipCore = gossipCore; + if (gossipManager.getMyself().getClusterName() == null){ + throw new IllegalArgumentException("Cluster was null"); + } + try { + SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), + gossipManager.getMyself().getUri().getPort()); + server = new DatagramSocket(socketAddress); + } catch (SocketException ex) { + LOGGER.warn(ex); + throw new RuntimeException(ex); + } + keepRunning = new AtomicBoolean(true); + signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE); + unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE); + } + + @Override + public void run() { + while (keepRunning.get()) { + try { + byte[] buf = new byte[server.getReceiveBufferSize()]; + DatagramPacket p = new DatagramPacket(buf, buf.length); + server.receive(p); + debug(p.getData()); + try { + Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class); + if (activeGossipMessage instanceof SignedPayload){ + SignedPayload s = (SignedPayload) activeGossipMessage; + Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class); + gossipCore.receive(nested); + signed.mark(); + } else { + gossipCore.receive(activeGossipMessage); + unsigned.mark(); + } + gossipManager.getMemberStateRefresher().run(); + } catch (RuntimeException ex) {//TODO trap json exception + LOGGER.error("Unable to process message", ex); + } + } catch (IOException e) { + LOGGER.error(e); + keepRunning.set(false); + } + } + shutdown(); + } + + private void debug(byte[] jsonBytes) { + if (LOGGER.isDebugEnabled()){ + String receivedMessage = new String(jsonBytes); + LOGGER.debug("Received message ( bytes): " + receivedMessage); + } + } + + public void shutdown() { + try { + server.close(); + } catch (RuntimeException ex) { + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java new file mode 100644 index 0000000..7e42562 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.gossip.LocalMember; +import org.apache.log4j.Logger; + +public class RingStatePersister implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class); + private GossipManager parent; + + public RingStatePersister(GossipManager parent){ + this.parent = parent; + } + + @Override + public void run() { + writeToDisk(); + } + + File computeTarget(){ + return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "." + + parent.getMyself().getId() + ".json"); + } + + void writeToDisk(){ + if (!parent.getSettings().isPersistRingState()){ + return; + } + NavigableSet<LocalMember> i = parent.getMembers().keySet(); + try (FileOutputStream fos = new FileOutputStream(computeTarget())){ + parent.getObjectMapper().writeValue(fos, i); + } catch (IOException e) { + LOGGER.debug(e); + } + } + + @SuppressWarnings("unchecked") + List<LocalMember> readFromDisk(){ + if (!parent.getSettings().isPersistRingState()){ + return Collections.emptyList(); + } + try (FileInputStream fos = new FileInputStream(computeTarget())){ + return parent.getObjectMapper().readValue(fos, ArrayList.class); + } catch (IOException e) { + LOGGER.debug(e); + } + return Collections.emptyList(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java new file mode 100644 index 0000000..e47fe2a --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.LocalMember; + +import com.codahale.metrics.MetricRegistry; + +/** + * Base implementation gossips randomly to live nodes periodically gossips to dead ones + * + */ +public class SimpleActiveGossipper extends AbstractActiveGossiper { + + private ScheduledExecutorService scheduledExecutorService; + private final BlockingQueue<Runnable> workQueue; + private ThreadPoolExecutor threadService; + + public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore, + MetricRegistry registry) { + super(gossipManager, gossipCore, registry); + scheduledExecutorService = Executors.newScheduledThreadPool(2); + workQueue = new ArrayBlockingQueue<Runnable>(1024); + threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + } + + @Override + public void init() { + super.init(); + scheduledExecutorService.scheduleAtFixedRate(() -> { + threadService.execute(() -> { + sendToALiveMember(); + }); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate(() -> { + sendToDeadMember(); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendPerNodeData(gossipManager.getMyself(), + selectPartner(gossipManager.getLiveMembers())), + 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendSharedData(gossipManager.getMyself(), + selectPartner(gossipManager.getLiveMembers())), + 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + super.shutdown(); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + sendShutdownMessage(); + threadService.shutdown(); + try { + threadService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + } + + protected void sendToALiveMember(){ + LocalMember member = selectPartner(gossipManager.getLiveMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + protected void sendToDeadMember(){ + LocalMember member = selectPartner(gossipManager.getDeadMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + /** + * sends an optimistic shutdown message to several clusters nodes + */ + protected void sendShutdownMessage(){ + List<LocalMember> l = gossipManager.getLiveMembers(); + int sendTo = l.size() < 3 ? 1 : l.size() / 2; + for (int i = 0; i < sendTo; i++) { + threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java b/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java new file mode 100644 index 0000000..04a7080 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/SystemClock.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +public class SystemClock implements Clock { + + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + + @Override + public long nanoTime() { + return System.nanoTime(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java new file mode 100644 index 0000000..3b9eafa --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.log4j.Logger; + +public class UserDataPersister implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class); + private final GossipManager parent; + private final GossipCore gossipCore; + + UserDataPersister(GossipManager parent, GossipCore gossipCore){ + this.parent = parent; + this.gossipCore = gossipCore; + } + + File computeSharedTarget(){ + return new File(parent.getSettings().getPathToDataState(), "shareddata." + + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json"); + } + + File computePerNodeTarget() { + return new File(parent.getSettings().getPathToDataState(), "pernodedata." + + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json"); + } + + @SuppressWarnings("unchecked") + ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){ + if (!parent.getSettings().isPersistDataState()){ + return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>(); + } + try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ + return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); + } catch (IOException e) { + LOGGER.debug(e); + } + return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>(); + } + + void writePerNodeToDisk(){ + if (!parent.getSettings().isPersistDataState()){ + return; + } + try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){ + parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData()); + } catch (IOException e) { + LOGGER.warn(e); + } + } + + void writeSharedToDisk(){ + if (!parent.getSettings().isPersistDataState()){ + return; + } + try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){ + parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData()); + } catch (IOException e) { + LOGGER.warn(e); + } + } + + @SuppressWarnings("unchecked") + ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){ + if (!parent.getSettings().isPersistRingState()){ + return new ConcurrentHashMap<String, SharedDataMessage>(); + } + try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ + return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); + } catch (IOException e) { + LOGGER.debug(e); + } + return new ConcurrentHashMap<String, SharedDataMessage>(); + } + + /** + * Writes all pernode and shared data to disk + */ + @Override + public void run() { + writePerNodeToDisk(); + writeSharedToDisk(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java new file mode 100644 index 0000000..f5e568e --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.Member; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpNotAMemberFault; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +public class ActiveGossipMessageHandler implements MessageHandler { + @Override + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + List<Member> remoteGossipMembers = new ArrayList<>(); + RemoteMember senderMember = null; + UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; + for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { + URI u; + try { + u = new URI(activeGossipMessage.getMembers().get(i).getUri()); + } catch (URISyntaxException e) { + GossipCore.LOGGER.debug("Gossip message with faulty URI", e); + continue; + } + RemoteMember member = new RemoteMember( + activeGossipMessage.getMembers().get(i).getCluster(), + u, + activeGossipMessage.getMembers().get(i).getId(), + activeGossipMessage.getMembers().get(i).getHeartbeat(), + activeGossipMessage.getMembers().get(i).getProperties()); + if (i == 0) { + senderMember = member; + } + if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) { + UdpNotAMemberFault f = new UdpNotAMemberFault(); + f.setException("Not a member of this cluster " + i); + f.setUriFrom(activeGossipMessage.getUriFrom()); + f.setUuid(activeGossipMessage.getUuid()); + GossipCore.LOGGER.warn(f); + gossipCore.sendOneWay(f, member.getUri()); + continue; + } + remoteGossipMembers.add(member); + } + UdpActiveGossipOk o = new UdpActiveGossipOk(); + o.setUriFrom(activeGossipMessage.getUriFrom()); + o.setUuid(activeGossipMessage.getUuid()); + gossipCore.sendOneWay(o, senderMember.getUri()); + gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/298b1ae3/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java new file mode 100644 index 0000000..5b78ce3 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.*; + +public class DefaultMessageInvoker implements MessageInvoker { + private final MessageInvokerCombiner mic; + + public DefaultMessageInvoker() { + mic = new MessageInvokerCombiner(); + mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler())); + mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler())); + mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler())); + mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler())); + mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler())); + } + + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + return mic.invoke(gossipCore, gossipManager, base); + } +}
