Repository: incubator-gossip Updated Branches: refs/heads/master 95cce48a8 -> 94da0bb64
GOSSIP-88 Data Replication Control Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/94da0bb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/94da0bb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/94da0bb6 Branch: refs/heads/master Commit: 94da0bb64303fdb4de352395f878835ed792a1e1 Parents: 95cce48 Author: Mirage Abeysekara <mirage...@cse.mrt.ac.lk> Authored: Wed Jun 21 23:49:39 2017 +0530 Committer: Mirage Abeysekara <mirage...@cse.mrt.ac.lk> Committed: Tue Jul 18 23:25:52 2017 +0530 ---------------------------------------------------------------------- .../java/org/apache/gossip/crdt/CrdtModule.java | 29 +++ .../gossip/manager/AbstractActiveGossiper.java | 10 + .../apache/gossip/model/PerNodeDataMessage.java | 17 +- .../apache/gossip/model/SharedDataMessage.java | 18 +- .../gossip/replication/AllReplicable.java | 36 ++++ .../gossip/replication/BlackListReplicable.java | 52 +++++ .../replication/DataCenterReplicable.java | 46 ++++ .../gossip/replication/NotReplicable.java | 35 +++ .../apache/gossip/replication/Replicable.java | 38 ++++ .../gossip/replication/WhiteListReplicable.java | 52 +++++ .../gossip/udp/UdpPerNodeDataMessage.java | 3 +- .../apache/gossip/udp/UdpSharedDataMessage.java | 3 +- .../apache/gossip/AbstractIntegrationBase.java | 34 ++- .../gossip/replication/DataReplicationTest.java | 210 ++++++++++++++++++ .../PerNodeDataReplicationControlTest.java | 207 ++++++++++++++++++ .../SharedDataReplicationControlTest.java | 212 +++++++++++++++++++ 16 files changed, 993 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index ab5cefa..396ec03 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -20,9 +20,15 @@ package org.apache.gossip.crdt; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.gossip.LocalMember; +import org.apache.gossip.replication.BlackListReplicable; +import org.apache.gossip.replication.Replicable; +import org.apache.gossip.replication.WhiteListReplicable; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -81,6 +87,26 @@ abstract class PNCounterMixin { @JsonProperty("n-counters") abstract Map<String, Long> getNCounters(); } +@JsonTypeInfo( + use = JsonTypeInfo.Id.CLASS, + include = JsonTypeInfo.As.PROPERTY, + property = "type") +abstract class ReplicableMixin { + +} + +abstract class WhiteListReplicableMixin { + @JsonCreator + WhiteListReplicableMixin(@JsonProperty("whiteListMembers") List<LocalMember> whiteListMembers) { } + @JsonProperty("whiteListMembers") abstract List<LocalMember> getWhiteListMembers(); +} + +abstract class BlackListReplicableMixin { + @JsonCreator + BlackListReplicableMixin(@JsonProperty("blackListMembers") List<LocalMember> blackListMembers) { } + @JsonProperty("blackListMembers") abstract List<LocalMember> getBlackListMembers(); +} + //If anyone wants to take a stab at this. please have at it //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java public class CrdtModule extends SimpleModule { @@ -101,6 +127,9 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class); context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class); context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class); + context.setMixInAnnotations(Replicable.class, ReplicableMixin.class); + context.setMixInAnnotations(WhiteListReplicable.class, WhiteListReplicableMixin.class); + context.setMixInAnnotations(BlackListReplicable.class, BlackListReplicableMixin.class); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java index b73550e..adf2530 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -85,6 +85,10 @@ public abstract class AbstractActiveGossiper { } long startTime = System.currentTimeMillis(); for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){ + if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable() + .shouldReplicate(me, member, innerEntry.getValue())) { + continue; + } UdpSharedDataMessage message = new UdpSharedDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); @@ -93,6 +97,7 @@ public abstract class AbstractActiveGossiper { message.setNodeId(innerEntry.getValue().getNodeId()); message.setTimestamp(innerEntry.getValue().getTimestamp()); message.setPayload(innerEntry.getValue().getPayload()); + message.setReplicable(innerEntry.getValue().getReplicable()); gossipCore.sendOneWay(message, member.getUri()); } sharedDataHistogram.update(System.currentTimeMillis() - startTime); @@ -105,6 +110,10 @@ public abstract class AbstractActiveGossiper { long startTime = System.currentTimeMillis(); for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){ for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){ + if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable() + .shouldReplicate(me, member, innerEntry.getValue())) { + continue; + } UdpPerNodeDataMessage message = new UdpPerNodeDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); @@ -113,6 +122,7 @@ public abstract class AbstractActiveGossiper { message.setNodeId(innerEntry.getValue().getNodeId()); message.setTimestamp(innerEntry.getValue().getTimestamp()); message.setPayload(innerEntry.getValue().getPayload()); + message.setReplicable(innerEntry.getValue().getReplicable()); gossipCore.sendOneWay(message, member.getUri()); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java index 2d1cdef..2394e76 100644 --- a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java +++ b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java @@ -17,6 +17,8 @@ */ package org.apache.gossip.model; +import org.apache.gossip.replication.Replicable; + public class PerNodeDataMessage extends Base { private String nodeId; @@ -24,7 +26,8 @@ public class PerNodeDataMessage extends Base { private Object payload; private Long timestamp; private Long expireAt; - + private Replicable<PerNodeDataMessage> replicable; + public String getNodeId() { return nodeId; } @@ -55,10 +58,20 @@ public class PerNodeDataMessage extends Base { public void setExpireAt(Long expireAt) { this.expireAt = expireAt; } + + public Replicable<PerNodeDataMessage> getReplicable() { + return replicable; + } + + public void setReplicable(Replicable<PerNodeDataMessage> replicable) { + this.replicable = replicable; + } + @Override public String toString() { return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload - + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + + ", replicable=" + replicable + "]"; } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java index e423be8..4b1a1ea 100644 --- a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java +++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java @@ -17,6 +17,9 @@ */ package org.apache.gossip.model; +import org.apache.gossip.replication.AllReplicable; +import org.apache.gossip.replication.Replicable; + public class SharedDataMessage extends Base { private String nodeId; @@ -24,6 +27,7 @@ public class SharedDataMessage extends Base { private Object payload; private Long timestamp; private Long expireAt; + private Replicable<SharedDataMessage> replicable; public String getNodeId() { return nodeId; @@ -55,10 +59,20 @@ public class SharedDataMessage extends Base { public void setExpireAt(Long expireAt) { this.expireAt = expireAt; } + + public Replicable<SharedDataMessage> getReplicable() { + return replicable; + } + + public void setReplicable(Replicable<SharedDataMessage> replicable) { + this.replicable = replicable; + } + @Override public String toString() { return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload - + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; - } + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + + ", replicable=" + replicable + "]"; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java new file mode 100644 index 0000000..573fd25 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/replication/AllReplicable.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.model.Base; + +/** + * Replicable implementation which replicates data to any node. This is the default replication + * strategy if a data item not specified its replication behaviour. + * + * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface + * @see Replicable + */ +public class AllReplicable<T extends Base> implements Replicable<T> { + + @Override + public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java new file mode 100644 index 0000000..33e1706 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/replication/BlackListReplicable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.model.Base; + +import java.util.ArrayList; +import java.util.List; + +/** + * Replicable implementation which does not replicate data to given set of nodes. + * + * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface + * @see Replicable + */ +public class BlackListReplicable<T extends Base> implements Replicable<T> { + + private final List<LocalMember> blackListMembers; + + public BlackListReplicable(List<LocalMember> blackListMembers) { + if (blackListMembers == null) { + this.blackListMembers = new ArrayList<>(); + } else { + this.blackListMembers = blackListMembers; + } + } + + public List<LocalMember> getBlackListMembers() { + return blackListMembers; + } + + @Override + public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) { + return !blackListMembers.contains(destination); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java new file mode 100644 index 0000000..1067c49 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/replication/DataCenterReplicable.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.model.Base; + +/** + * Replicable implementation which does replicate data only in the same data center. + * + * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface + * @see Replicable + */ +public class DataCenterReplicable<T extends Base> implements Replicable<T> { + + @Override + public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) { + if (!me.getProperties().containsKey(DatacenterRackAwareActiveGossiper.DATACENTER)) { + // replicate to others if I am not belong to any data center + return true; + } else if (!destination.getProperties() + .containsKey(DatacenterRackAwareActiveGossiper.DATACENTER)) { + // Do not replicate if the destination data center is not defined + return false; + } else { + return me.getProperties().get(DatacenterRackAwareActiveGossiper.DATACENTER) + .equals(destination.getProperties().get(DatacenterRackAwareActiveGossiper.DATACENTER)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java new file mode 100644 index 0000000..c3fa538 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/replication/NotReplicable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.model.Base; + +/** + * Replicable implementation which never replicates data on any node + * + * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface + * @see Replicable + */ +public class NotReplicable<T extends Base> implements Replicable<T> { + + @Override + public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java new file mode 100644 index 0000000..68098df --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/replication/Replicable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.model.Base; + +/** + * This interface is used to determine whether a data item needs to be replicated to + * another gossip member. + * + * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface + */ +public interface Replicable<T extends Base> { + /** + * Test for a given data item needs to be replicated. + * @param me node that the data item is going to transmit from. + * @param destination target node to replicate. + * @param message this parameter is currently ignored + * @return true if the data item needs to be replicated to the destination. Otherwise false. + */ + boolean shouldReplicate(LocalMember me, LocalMember destination, T message); +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java b/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java new file mode 100644 index 0000000..299d929 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/replication/WhiteListReplicable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.model.Base; + +import java.util.ArrayList; +import java.util.List; + +/** + * Replicable implementation which replicates data to given set of nodes. + * + * @param <T> A subtype of the class {@link org.apache.gossip.model.Base} which uses this interface + * @see Replicable + */ +public class WhiteListReplicable<T extends Base> implements Replicable<T> { + + private final List<LocalMember> whiteListMembers; + + public WhiteListReplicable(List<LocalMember> whiteListMembers) { + if (whiteListMembers == null) { + this.whiteListMembers = new ArrayList<>(); + } else { + this.whiteListMembers = whiteListMembers; + } + } + + public List<LocalMember> getWhiteListMembers() { + return whiteListMembers; + } + + @Override + public boolean shouldReplicate(LocalMember me, LocalMember destination, T message) { + return whiteListMembers.contains(destination); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java index 6eb170a..9ba1e85 100644 --- a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java +++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java @@ -42,7 +42,8 @@ public class UdpPerNodeDataMessage extends PerNodeDataMessage implements Trackab @Override public String toString() { - return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + + ", getReplicable()=" + getReplicable() + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java index 1658503..0059bdb 100644 --- a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java +++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java @@ -44,7 +44,8 @@ public class UdpSharedDataMessage extends SharedDataMessage implements Trackable public String toString() { return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()=" + getNodeId() + ", getKey()=" + getKey() + ", getPayload()=" + getPayload() - + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt() + "]"; + + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt() + + ", getReplicable()=" + getReplicable() + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java b/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java index 896157f..d074706 100644 --- a/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java +++ b/gossip-base/src/test/java/org/apache/gossip/AbstractIntegrationBase.java @@ -18,24 +18,52 @@ package org.apache.gossip; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.junit.After; import org.junit.Before; public abstract class AbstractIntegrationBase { - List <GossipManager> nodes = new ArrayList<GossipManager>(); + List <GossipManager> nodes = new ArrayList<>(); public void register(GossipManager manager){ nodes.add(manager); } - + + public void generateStandardNodes(final int memberCount) throws URISyntaxException { + if(nodes.size() > 0){ + after(); + nodes.clear(); + } + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List<Member> startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + + for (int i = 1; i < memberCount + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + gossipService.init(); + register(gossipService); + } + } @Before public void before(){ - nodes = new ArrayList<GossipManager>(); + nodes = new ArrayList<>(); } @After http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java b/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java new file mode 100644 index 0000000..dd073a8 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/replication/DataReplicationTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.replication; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.model.SharedDataMessage; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@RunWith(JUnitPlatform.class) +public class DataReplicationTest { + + @Test + public void dataReplicateAllTest() throws URISyntaxException { + SharedDataMessage message = getSharedNodeData("public","public", new AllReplicable<>()); + LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8001"),"1"); + LocalMember member = getLocalMember(new URI("udp://127.0.0.1:8002"),"2"); + Assert.assertEquals(true, message.getReplicable().shouldReplicate(me, member, message)); + } + + @Test + public void dataReplicateNoneTest() throws URISyntaxException { + SharedDataMessage message = getSharedNodeData("private","private", new NotReplicable<>()); + LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8001"),"1"); + LocalMember member = getLocalMember(new URI("udp://127.0.0.1:8002"),"2"); + Assert.assertEquals(false, message.getReplicable().shouldReplicate(me, member, message)); + } + + @Test + public void dataReplicateWhiteListTest() throws URISyntaxException { + List<LocalMember> memberList = new ArrayList<>(); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1")); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2")); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8003"),"3")); + // add node 1 and 2 to the white list + List<LocalMember> whiteList = new ArrayList<>(); + whiteList.add(memberList.get(0)); + whiteList.add(memberList.get(1)); + + SharedDataMessage message = getSharedNodeData("whiteList", "Only allow some nodes", + new WhiteListReplicable<>(whiteList)); + LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8004"),"4"); + + // data should replicate to node 1 and 2 but not 3 + Assert.assertEquals(true, + message.getReplicable().shouldReplicate(me, memberList.get(0), message)); + Assert.assertEquals(true, + message.getReplicable().shouldReplicate(me, memberList.get(1), message)); + Assert.assertEquals(false, + message.getReplicable().shouldReplicate(me, memberList.get(2), message)); + } + + @Test + public void dataReplicateWhiteListNullTest() throws URISyntaxException { + List<LocalMember> memberList = new ArrayList<>(); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1")); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2")); + + SharedDataMessage message = getSharedNodeData("whiteList", "Only allow some nodes", + new WhiteListReplicable<>(null)); + + // data should not replicate if no whitelist specified + Assert.assertEquals(false, + message.getReplicable().shouldReplicate(memberList.get(0), memberList.get(1), message)); + Assert.assertEquals(false, + message.getReplicable().shouldReplicate(memberList.get(1), memberList.get(0), message)); + + } + + @Test + public void dataReplicateBlackListTest() throws URISyntaxException { + List<LocalMember> memberList = new ArrayList<>(); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1")); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2")); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8003"),"3")); + // add node 1 and 2 to the black list + List<LocalMember> blackList = new ArrayList<>(); + blackList.add(memberList.get(0)); + blackList.add(memberList.get(1)); + + SharedDataMessage message = getSharedNodeData("blackList", "Disallow some nodes", + new BlackListReplicable<>(blackList)); + LocalMember me = getLocalMember(new URI("udp://127.0.0.1:8004"),"4"); + + // data should not replicate to node 1 and 2 + Assert.assertEquals(false, + message.getReplicable().shouldReplicate(me, memberList.get(0), message)); + Assert.assertEquals(false, + message.getReplicable().shouldReplicate(me, memberList.get(1), message)); + Assert.assertEquals(true, + message.getReplicable().shouldReplicate(me, memberList.get(2), message)); + } + + @Test + public void dataReplicateBlackListNullTest() throws URISyntaxException { + + List<LocalMember> memberList = new ArrayList<>(); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8001"),"1")); + memberList.add(getLocalMember(new URI("udp://127.0.0.1:8002"),"2")); + + SharedDataMessage message = getSharedNodeData("blackList", "Disallow some nodes", + new BlackListReplicable<>(null)); + + // data should replicate if no blacklist specified + Assert.assertEquals(true, + message.getReplicable().shouldReplicate(memberList.get(0), memberList.get(1), message)); + Assert.assertEquals(true, + message.getReplicable().shouldReplicate(memberList.get(1), memberList.get(0), message)); + } + + @Test + public void dataReplicateDataCenterTest() throws URISyntaxException { + + List<LocalMember> memberListDc1 = new ArrayList<>(); + List<LocalMember> memberListDc2 = new ArrayList<>(); + + memberListDc1 + .add(getLocalMemberDc(new URI("udp://10.0.0.1:8000"), "1", "DataCenter1", "Rack1")); + memberListDc1 + .add(getLocalMemberDc(new URI("udp://10.0.0.2:8000"), "2", "DataCenter1", "Rack2")); + memberListDc2 + .add(getLocalMemberDc(new URI("udp://10.0.1.1:8000"), "11", "DataCenter2", "Rack1")); + memberListDc2 + .add(getLocalMemberDc(new URI("udp://10.0.1.2:8000"), "12", "DataCenter2", "Rack2")); + + SharedDataMessage message = getSharedNodeData("datacenter1", "I am in data center 1 rack 1", + new DataCenterReplicable<>()); + + // data should replicate in data center 1 + Assert.assertEquals(true, message.getReplicable() + .shouldReplicate(memberListDc1.get(0), memberListDc1.get(1), message)); + Assert.assertEquals(true, message.getReplicable() + .shouldReplicate(memberListDc2.get(0), memberListDc2.get(1), message)); + + // data should not replicate to data center 2 + Assert.assertEquals(false, message.getReplicable() + .shouldReplicate(memberListDc1.get(0), memberListDc2.get(0), message)); + Assert.assertEquals(false, message.getReplicable() + .shouldReplicate(memberListDc1.get(1), memberListDc2.get(1), message)); + } + + @Test + public void dataReplicateDataCenterUnknownDataCenterTest() throws URISyntaxException { + + List<LocalMember> memberListDc1 = new ArrayList<>(); + memberListDc1 + .add(getLocalMemberDc(new URI("udp://10.0.0.1:8000"), "1", "DataCenter1", "Rack1")); + + Map<String, String> properties = new HashMap<>(); + LocalMember unknownDc = new LocalMember("cluster1", new URI("udp://10.0.1.2:8000"), "12", 0, + properties, 1, 0, ""); + + SharedDataMessage message = getSharedNodeData("datacenter1","I am in data center 1 rack 1", new DataCenterReplicable<>()); + + // data should not replicate from dc1 to unknown node + Assert.assertEquals(false, message.getReplicable() + .shouldReplicate(memberListDc1.get(0), unknownDc, message)); + // data can replicate from unknown node to dc + Assert.assertEquals(true, message.getReplicable() + .shouldReplicate(unknownDc, memberListDc1.get(0), message)); + + } + + private static SharedDataMessage getSharedNodeData(String key, String value, + Replicable<SharedDataMessage> replicable) { + SharedDataMessage g = new SharedDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + g.setReplicable(replicable); + return g; + } + + private static LocalMember getLocalMember(URI uri, String id){ + return new LocalMember("cluster1", uri, id, 0, null, 1, 0, ""); + } + + private static LocalMember getLocalMemberDc(URI uri, String id, String dataCenter, String rack){ + Map<String, String> props = new HashMap<>(); + props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter); + props.put(DatacenterRackAwareActiveGossiper.RACK, rack); + return new LocalMember("cluster1", uri, id, 0, props, 1, 0, ""); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java new file mode 100644 index 0000000..e715410 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataReplicationControlTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.replication.*; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class PerNodeDataReplicationControlTest extends AbstractIntegrationBase { + + @Test + public void perNodeDataReplicationTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + + generateStandardNodes(3); + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (GossipManager node : nodes) { + total += node.getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + // Adding new per node data to Node 1 with default replication (replicate all) + nodes.get(0).gossipPerNodeData(getPerNodeData("public", "I am visible to all", + new AllReplicable<>())); + // Adding new per node data to Node 1 with no replication (replicate none) + nodes.get(0).gossipPerNodeData(getPerNodeData("private", "I am private", + new NotReplicable<>())); + + List<LocalMember> whiteList = new ArrayList<>(); + whiteList.add(nodes.get(1).getMyself()); + // Adding new per node data to Node 1 with white list Node 2 + nodes.get(0).gossipPerNodeData(getPerNodeData("wl", "white list", + new WhiteListReplicable<>(whiteList))); + + List<LocalMember> blackList = new ArrayList<>(); + blackList.add(nodes.get(1).getMyself()); + // Adding new per node data to Node 1 with black list Node 2 + nodes.get(0).gossipPerNodeData(getPerNodeData("bl", "black list", + new BlackListReplicable<>(blackList))); + + // Node 2 and 3 must have the shared data with key 'public' + TUnit.assertThat(() -> { + PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "public"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all"); + + TUnit.assertThat(() -> { + PerNodeDataMessage message = nodes.get(2).findPerNodeGossipData("1", "public"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all"); + + // Node 2 must have shared data with key wl + TUnit.assertThat(() -> { + PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "wl"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("white list"); + + // Node 3 must have shared data with key bl + TUnit.assertThat(() -> { + PerNodeDataMessage message = nodes.get(2).findPerNodeGossipData("1", "bl"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("black list"); + + } + + @Test + public void perNodeDataDcReplicationTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + + Map<String, String> gossipProps = new HashMap<>(); + gossipProps.put("sameRackGossipIntervalMs", "500"); + gossipProps.put("differentDatacenterGossipIntervalMs", "1000"); + settings.setActiveGossipProperties(gossipProps); + + RemoteMember seeder = new RemoteMember(cluster, URI.create("udp://127.0.0.1:5001"), "1"); + + // initialize 2 data centers with each having two racks + createDcNode(URI.create("udp://127.0.0.1:5001"), "1", settings, seeder, cluster, + "DataCenter1", "Rack1"); + createDcNode(URI.create("udp://127.0.0.1:5002"), "2", settings, seeder, cluster, + "DataCenter1", "Rack2"); + + createDcNode(URI.create("udp://127.0.0.1:5006"), "6", settings, seeder, cluster, + "DataCenter2", "Rack1"); + createDcNode(URI.create("udp://127.0.0.1:5007"), "7", settings, seeder, cluster, + "DataCenter2", "Rack1"); + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < 4; ++i) { + total += nodes.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(12); + + // Node 1 has a shared key with 'Dc1Rack1' + nodes.get(0).gossipPerNodeData(getPerNodeData("Dc1Rack1", "I am belong to Dc1", + new DataCenterReplicable<>())); + // Node 6 has a shared key with 'Dc2Rack1' + nodes.get(2).gossipPerNodeData(getPerNodeData("Dc2Rack1", "I am belong to Dc2", + new DataCenterReplicable<>())); + + // Node 2 must have the shared data with key 'Dc1Rack1' + TUnit.assertThat(() -> { + PerNodeDataMessage message = nodes.get(1).findPerNodeGossipData("1", "Dc1Rack1"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc1"); + + + // Node 7 must have the shared data with key 'Dc2Rack1' + TUnit.assertThat(() -> { + PerNodeDataMessage message = nodes.get(3).findPerNodeGossipData("6", "Dc2Rack1"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc2"); + + } + + private PerNodeDataMessage getPerNodeData(String key, String value, + Replicable<PerNodeDataMessage> replicable) { + PerNodeDataMessage g = new PerNodeDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + g.setReplicable(replicable); + return g; + } + + private void createDcNode(URI uri, String id, GossipSettings settings, RemoteMember seeder, + String cluster, String dataCenter, String rack){ + Map<String, String> props = new HashMap<>(); + props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter); + props.put(DatacenterRackAwareActiveGossiper.RACK, rack); + + GossipManager dcNode = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri).id(id) + .gossipSettings(settings).gossipMembers(Arrays.asList(seeder)).properties(props) + .build(); + dcNode.init(); + register(dcNode); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/94da0bb6/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java new file mode 100644 index 0000000..8ce063d --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataReplicationControlTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.replication.AllReplicable; +import org.apache.gossip.replication.BlackListReplicable; +import org.apache.gossip.replication.DataCenterReplicable; +import org.apache.gossip.replication.NotReplicable; +import org.apache.gossip.replication.Replicable; +import org.apache.gossip.replication.WhiteListReplicable; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class SharedDataReplicationControlTest extends AbstractIntegrationBase { + + @Test + public void sharedDataReplicationTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + generateStandardNodes(3); + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (GossipManager node : nodes) { + total += node.getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + // Adding new shared data to Node 1 with default replication (replicate all) + nodes.get(0).gossipSharedData(sharedNodeData("public", "I am visible to all", + new AllReplicable<>())); + // Adding new shared data to Node 1 with no replication (replicate none) + nodes.get(0).gossipSharedData(sharedNodeData("private", "I am private", + new NotReplicable<>())); + + List<LocalMember> whiteList = new ArrayList<>(); + whiteList.add(nodes.get(1).getMyself()); + // Adding new shared data to Node 1 with white list Node 2 + nodes.get(0).gossipSharedData(sharedNodeData("wl", "white list", + new WhiteListReplicable<>(whiteList))); + + List<LocalMember> blackList = new ArrayList<>(); + blackList.add(nodes.get(1).getMyself()); + // Adding new shared data to Node 1 with black list Node 2 + nodes.get(0).gossipSharedData(sharedNodeData("bl", "black list", + new BlackListReplicable<>(blackList))); + + TUnit.assertThat(() -> { + SharedDataMessage message = nodes.get(1).findSharedGossipData("public"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all"); + + TUnit.assertThat(() -> { + SharedDataMessage message = nodes.get(2).findSharedGossipData("public"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am visible to all"); + + // Node 2 must have shared data with key wl + TUnit.assertThat(() -> { + SharedDataMessage message = nodes.get(1).findSharedGossipData("wl"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("white list"); + + // Node 3 must have shared data with key bl + TUnit.assertThat(() -> { + SharedDataMessage message = nodes.get(2).findSharedGossipData("bl"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("black list"); + } + + @Test + public void sharedDataDcReplicationTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + + Map<String, String> gossipProps = new HashMap<>(); + gossipProps.put("sameRackGossipIntervalMs", "500"); + gossipProps.put("differentDatacenterGossipIntervalMs", "1000"); + settings.setActiveGossipProperties(gossipProps); + + RemoteMember seeder = new RemoteMember(cluster, URI.create("udp://127.0.0.1:5001"), "1"); + + // initialize 2 data centers with each having two racks + createDcNode(URI.create("udp://127.0.0.1:5001"), "1", settings, seeder, cluster, + "DataCenter1", "Rack1"); + createDcNode(URI.create("udp://127.0.0.1:5002"), "2", settings, seeder, cluster, + "DataCenter1", "Rack2"); + + createDcNode(URI.create("udp://127.0.0.1:5006"), "6", settings, seeder, cluster, + "DataCenter2", "Rack1"); + createDcNode(URI.create("udp://127.0.0.1:5007"), "7", settings, seeder, cluster, + "DataCenter2", "Rack1"); + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < 4; ++i) { + total += nodes.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(12); + + // Node 1 has a shared key with 'Dc1Rack1' + nodes.get(0).gossipSharedData(sharedNodeData("Dc1Rack1", "I am belong to Dc1", + new DataCenterReplicable<>())); + // Node 6 has a shared key with 'Dc2Rack1' + nodes.get(2).gossipSharedData(sharedNodeData("Dc2Rack1", "I am belong to Dc2", + new DataCenterReplicable<>())); + + // Node 2 must have the shared data with key 'Dc1Rack1' + TUnit.assertThat(() -> { + SharedDataMessage message = nodes.get(1).findSharedGossipData("Dc1Rack1"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc1"); + + // Node 7 must have the shared data with key 'Dc2Rack1' + TUnit.assertThat(() -> { + SharedDataMessage message = nodes.get(3).findSharedGossipData("Dc2Rack1"); + if(message == null){ + return ""; + }else { + return message.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("I am belong to Dc2"); + + } + + private SharedDataMessage sharedNodeData(String key, String value, + Replicable<SharedDataMessage> replicable) { + SharedDataMessage g = new SharedDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + g.setReplicable(replicable); + return g; + } + + private void createDcNode(URI uri, String id, GossipSettings settings, RemoteMember seeder, + String cluster, String dataCenter, String rack){ + Map<String, String> props = new HashMap<>(); + props.put(DatacenterRackAwareActiveGossiper.DATACENTER, dataCenter); + props.put(DatacenterRackAwareActiveGossiper.RACK, rack); + + GossipManager dcNode = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri).id(id) + .gossipSettings(settings).gossipMembers(Arrays.asList(seeder)).properties(props) + .build(); + dcNode.init(); + register(dcNode); + } + +}