http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/exceptions/ReadFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java index 82885e3..72242fd 100644 --- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java @@ -17,16 +17,16 @@ */ package org.apache.cassandra.exceptions; -import java.net.InetAddress; import java.util.Map; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.locator.InetAddressAndPort; public class ReadFailureException extends RequestFailureException { public final boolean dataPresent; - public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddress, RequestFailureReason> failureReasonByEndpoint) + public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) { super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, failureReasonByEndpoint); this.dataPresent = dataPresent;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/exceptions/RequestFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java index 1a5289c..2b57a75 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java @@ -17,20 +17,20 @@ */ package org.apache.cassandra.exceptions; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.locator.InetAddressAndPort; public class RequestFailureException extends RequestExecutionException { public final ConsistencyLevel consistency; public final int received; public final int blockFor; - public final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; + public final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint; - protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddress, RequestFailureReason> failureReasonByEndpoint) + protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) { super(code, String.format("Operation failed - received %d responses and %d failures", received, failureReasonByEndpoint.size())); this.consistency = consistency; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/exceptions/WriteFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java index 1a857fe..a7dc66a 100644 --- a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java @@ -17,17 +17,17 @@ */ package org.apache.cassandra.exceptions; -import java.net.InetAddress; import java.util.Map; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.locator.InetAddressAndPort; public class WriteFailureException extends RequestFailureException { public final WriteType writeType; - public WriteFailureException(ConsistencyLevel consistency, int received, int blockFor, WriteType writeType, Map<InetAddress, RequestFailureReason> failureReasonByEndpoint) + public WriteFailureException(ConsistencyLevel consistency, int received, int blockFor, WriteType writeType, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) { super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, failureReasonByEndpoint); this.writeType = writeType; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/ApplicationState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index ade9208..211387d 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -19,15 +19,15 @@ package org.apache.cassandra.gms; public enum ApplicationState { - STATUS, + @Deprecated STATUS, //Deprecated and unsued in 4.0, stop publishing in 5.0, reclaim in 6.0 LOAD, SCHEMA, DC, RACK, RELEASE_VERSION, REMOVAL_COORDINATOR, - INTERNAL_IP, - RPC_ADDRESS, + @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0 + @Deprecated RPC_ADDRESS, // ^ Same X_11_PADDING, // padding specifically for 1.1 SEVERITY, NET_VERSION, @@ -35,8 +35,9 @@ public enum ApplicationState TOKENS, RPC_READY, // pad to allow adding new states to existing cluster - X1, - X2, + INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports + NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS + STATUS_WITH_PORT, //Replacement for STATUS X3, X4, X5, http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 847041f..1085447 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -146,9 +146,15 @@ public class EndpointState public String getStatus() { - VersionedValue status = getApplicationState(ApplicationState.STATUS); + VersionedValue status = getApplicationState(ApplicationState.STATUS_WITH_PORT); if (status == null) + { + status = getApplicationState(ApplicationState.STATUS); + } + if (status == null) + { return ""; + } String[] pieces = status.value.split(VersionedValue.DELIMITER_STR, -1); assert (pieces.length > 0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index b895082..e567b7b 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -22,7 +22,6 @@ import java.nio.file.StandardOpenOption; import java.nio.file.Path; import java.io.*; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; @@ -79,7 +78,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean // change. private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434... - private final ConcurrentHashMap<InetAddress, ArrivalWindow> arrivalSamples = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<InetAddressAndPort, ArrivalWindow> arrivalSamples = new ConcurrentHashMap<>(); private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<>(); public FailureDetector() @@ -112,10 +111,20 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public String getAllEndpointStates() { + return getAllEndpointStates(false); + } + + public String getAllEndpointStatesWithPort() + { + return getAllEndpointStates(true); + } + + public String getAllEndpointStates(boolean withPort) + { StringBuilder sb = new StringBuilder(); - for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) { - sb.append(entry.getKey()).append("\n"); + sb.append(entry.getKey().toString(withPort)).append("\n"); appendEndpointState(sb, entry.getValue()); } return sb.toString(); @@ -123,13 +132,23 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public Map<String, String> getSimpleStates() { + return getSimpleStates(false); + } + + public Map<String, String> getSimpleStatesWithPort() + { + return getSimpleStates(true); + } + + private Map<String, String> getSimpleStates(boolean withPort) + { Map<String, String> nodesStatus = new HashMap<String, String>(Gossiper.instance.endpointStateMap.size()); - for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) { if (entry.getValue().isAlive()) - nodesStatus.put(entry.getKey().toString(), "UP"); + nodesStatus.put(entry.getKey().toString(withPort), "UP"); else - nodesStatus.put(entry.getKey().toString(), "DOWN"); + nodesStatus.put(entry.getKey().toString(withPort), "DOWN"); } return nodesStatus; } @@ -137,7 +156,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public int getDownEndpointCount() { int count = 0; - for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) { if (!entry.getValue().isAlive()) count++; @@ -148,7 +167,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public int getUpEndpointCount() { int count = 0; - for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) { if (entry.getValue().isAlive()) count++; @@ -159,13 +178,24 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean @Override public TabularData getPhiValues() throws OpenDataException { + return getPhiValues(false); + } + + @Override + public TabularData getPhiValuesWithPort() throws OpenDataException + { + return getPhiValues(true); + } + + private TabularData getPhiValues(boolean withPort) throws OpenDataException + { final CompositeType ct = new CompositeType("Node", "Node", new String[]{"Endpoint", "PHI"}, new String[]{"IP of the endpoint", "PHI value"}, new OpenType[]{SimpleType.STRING, SimpleType.DOUBLE}); final TabularDataSupport results = new TabularDataSupport(new TabularType("PhiList", "PhiList", ct, new String[]{"Endpoint"})); - for (final Map.Entry<InetAddress, ArrivalWindow> entry : arrivalSamples.entrySet()) + for (final Map.Entry<InetAddressAndPort, ArrivalWindow> entry : arrivalSamples.entrySet()) { final ArrivalWindow window = entry.getValue(); if (window.mean() > 0) @@ -176,7 +206,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean // returned values are scaled by PHI_FACTOR so that the are on the same scale as PhiConvictThreshold final CompositeData data = new CompositeDataSupport(ct, new String[]{"Endpoint", "PHI"}, - new Object[]{entry.getKey().toString(), phi * PHI_FACTOR}); + new Object[]{entry.getKey().toString(withPort), phi * PHI_FACTOR}); results.put(data); } } @@ -187,7 +217,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public String getEndpointState(String address) throws UnknownHostException { StringBuilder sb = new StringBuilder(); - EndpointState endpointState = Gossiper.instance.getEndpointStateForEndpoint(InetAddress.getByName(address)); + EndpointState endpointState = Gossiper.instance.getEndpointStateForEndpoint(InetAddressAndPort.getByName(address)); appendEndpointState(sb, endpointState); return sb.toString(); } @@ -243,9 +273,9 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean return DatabaseDescriptor.getPhiConvictThreshold(); } - public boolean isAlive(InetAddress ep) + public boolean isAlive(InetAddressAndPort ep) { - if (ep.equals(FBUtilities.getBroadcastAddress())) + if (ep.equals(FBUtilities.getBroadcastAddressAndPort())) return true; EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep); @@ -257,7 +287,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean return epState != null && epState.isAlive(); } - public void report(InetAddress ep) + public void report(InetAddressAndPort ep) { long now = Clock.instance.nanoTime(); ArrivalWindow heartbeatWindow = arrivalSamples.get(ep); @@ -279,7 +309,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean logger.trace("Average for {} is {}ns", ep, heartbeatWindow.mean()); } - public void interpret(InetAddress ep) + public void interpret(InetAddressAndPort ep) { ArrivalWindow hbWnd = arrivalSamples.get(ep); if (hbWnd == null) @@ -324,7 +354,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean } } - public void forceConviction(InetAddress ep) + public void forceConviction(InetAddressAndPort ep) { logger.debug("Forcing conviction of {}", ep); for (IFailureDetectionEventListener listener : fdEvntListeners) @@ -333,7 +363,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean } } - public void remove(InetAddress ep) + public void remove(InetAddressAndPort ep) { arrivalSamples.remove(ep); } @@ -351,10 +381,10 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public String toString() { StringBuilder sb = new StringBuilder(); - Set<InetAddress> eps = arrivalSamples.keySet(); + Set<InetAddressAndPort> eps = arrivalSamples.keySet(); sb.append("-----------------------------------------------------------------------"); - for (InetAddress ep : eps) + for (InetAddressAndPort ep : eps) { ArrivalWindow hWnd = arrivalSamples.get(ep); sb.append(ep).append(" : "); @@ -447,7 +477,7 @@ class ArrivalWindow } } - synchronized void add(long value, InetAddress ep) + synchronized void add(long value, InetAddressAndPort ep) { assert tLast >= 0; if (tLast > 0L) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java index 23fae3a..6be31b0 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java +++ b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java @@ -31,15 +31,18 @@ public interface FailureDetectorMBean public double getPhiConvictThreshold(); - public String getAllEndpointStates(); + @Deprecated public String getAllEndpointStates(); + public String getAllEndpointStatesWithPort(); public String getEndpointState(String address) throws UnknownHostException; - public Map<String, String> getSimpleStates(); + @Deprecated public Map<String, String> getSimpleStates(); + public Map<String, String> getSimpleStatesWithPort(); public int getDownEndpointCount(); public int getUpEndpointCount(); - public TabularData getPhiValues() throws OpenDataException; + @Deprecated public TabularData getPhiValues() throws OpenDataException; + public TabularData getPhiValuesWithPort() throws OpenDataException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigest.java b/src/java/org/apache/cassandra/gms/GossipDigest.java index 9dfd486..c7e60c4 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigest.java +++ b/src/java/org/apache/cassandra/gms/GossipDigest.java @@ -18,12 +18,12 @@ package org.apache.cassandra.gms; import java.io.*; -import java.net.InetAddress; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; /** @@ -34,18 +34,18 @@ public class GossipDigest implements Comparable<GossipDigest> { public static final IVersionedSerializer<GossipDigest> serializer = new GossipDigestSerializer(); - final InetAddress endpoint; + final InetAddressAndPort endpoint; final int generation; final int maxVersion; - GossipDigest(InetAddress ep, int gen, int version) + GossipDigest(InetAddressAndPort ep, int gen, int version) { endpoint = ep; generation = gen; maxVersion = version; } - InetAddress getEndpoint() + InetAddressAndPort getEndpoint() { return endpoint; } @@ -83,14 +83,14 @@ class GossipDigestSerializer implements IVersionedSerializer<GossipDigest> { public void serialize(GossipDigest gDigest, DataOutputPlus out, int version) throws IOException { - CompactEndpointSerializationHelper.serialize(gDigest.endpoint, out); + CompactEndpointSerializationHelper.instance.serialize(gDigest.endpoint, out, version); out.writeInt(gDigest.generation); out.writeInt(gDigest.maxVersion); } public GossipDigest deserialize(DataInputPlus in, int version) throws IOException { - InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort endpoint = CompactEndpointSerializationHelper.instance.deserialize(in, version); int generation = in.readInt(); int maxVersion = in.readInt(); return new GossipDigest(endpoint, generation, maxVersion); @@ -98,7 +98,7 @@ class GossipDigestSerializer implements IVersionedSerializer<GossipDigest> public long serializedSize(GossipDigest gDigest, int version) { - long size = CompactEndpointSerializationHelper.serializedSize(gDigest.endpoint); + long size = CompactEndpointSerializationHelper.instance.serializedSize(gDigest.endpoint, version); size += TypeSizes.sizeof(gDigest.generation); size += TypeSizes.sizeof(gDigest.maxVersion); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAck.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java b/src/java/org/apache/cassandra/gms/GossipDigestAck.java index cf71ae6..a7d5b92 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java @@ -18,7 +18,6 @@ package org.apache.cassandra.gms; import java.io.IOException; -import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,6 +26,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; /** @@ -38,9 +38,9 @@ public class GossipDigestAck public static final IVersionedSerializer<GossipDigestAck> serializer = new GossipDigestAckSerializer(); final List<GossipDigest> gDigestList; - final Map<InetAddress, EndpointState> epStateMap; + final Map<InetAddressAndPort, EndpointState> epStateMap; - GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddress, EndpointState> epStateMap) + GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddressAndPort, EndpointState> epStateMap) { this.gDigestList = gDigestList; this.epStateMap = epStateMap; @@ -51,7 +51,7 @@ public class GossipDigestAck return gDigestList; } - Map<InetAddress, EndpointState> getEndpointStateMap() + Map<InetAddressAndPort, EndpointState> getEndpointStateMap() { return epStateMap; } @@ -63,10 +63,10 @@ class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck> { GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, out, version); out.writeInt(gDigestAckMessage.epStateMap.size()); - for (Map.Entry<InetAddress, EndpointState> entry : gDigestAckMessage.epStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : gDigestAckMessage.epStateMap.entrySet()) { - InetAddress ep = entry.getKey(); - CompactEndpointSerializationHelper.serialize(ep, out); + InetAddressAndPort ep = entry.getKey(); + CompactEndpointSerializationHelper.instance.serialize(ep, out, version); EndpointState.serializer.serialize(entry.getValue(), out, version); } } @@ -75,11 +75,11 @@ class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck> { List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(in, version); int size = in.readInt(); - Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size); + Map<InetAddressAndPort, EndpointState> epStateMap = new HashMap<InetAddressAndPort, EndpointState>(size); for (int i = 0; i < size; ++i) { - InetAddress ep = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort ep = CompactEndpointSerializationHelper.instance.deserialize(in, version); EndpointState epState = EndpointState.serializer.deserialize(in, version); epStateMap.put(ep, epState); } @@ -90,8 +90,8 @@ class GossipDigestAckSerializer implements IVersionedSerializer<GossipDigestAck> { int size = GossipDigestSerializationHelper.serializedSize(ack.gDigestList, version); size += TypeSizes.sizeof(ack.epStateMap.size()); - for (Map.Entry<InetAddress, EndpointState> entry : ack.epStateMap.entrySet()) - size += CompactEndpointSerializationHelper.serializedSize(entry.getKey()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : ack.epStateMap.entrySet()) + size += CompactEndpointSerializationHelper.instance.serializedSize(entry.getKey(), version) + EndpointState.serializer.serializedSize(entry.getValue(), version); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAck2.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java index 9d779fe..a6d1d2b 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java @@ -18,7 +18,6 @@ package org.apache.cassandra.gms; import java.io.*; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map; @@ -26,6 +25,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; /** @@ -36,14 +36,14 @@ public class GossipDigestAck2 { public static final IVersionedSerializer<GossipDigestAck2> serializer = new GossipDigestAck2Serializer(); - final Map<InetAddress, EndpointState> epStateMap; + final Map<InetAddressAndPort, EndpointState> epStateMap; - GossipDigestAck2(Map<InetAddress, EndpointState> epStateMap) + GossipDigestAck2(Map<InetAddressAndPort, EndpointState> epStateMap) { this.epStateMap = epStateMap; } - Map<InetAddress, EndpointState> getEndpointStateMap() + Map<InetAddressAndPort, EndpointState> getEndpointStateMap() { return epStateMap; } @@ -54,10 +54,10 @@ class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck public void serialize(GossipDigestAck2 ack2, DataOutputPlus out, int version) throws IOException { out.writeInt(ack2.epStateMap.size()); - for (Map.Entry<InetAddress, EndpointState> entry : ack2.epStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : ack2.epStateMap.entrySet()) { - InetAddress ep = entry.getKey(); - CompactEndpointSerializationHelper.serialize(ep, out); + InetAddressAndPort ep = entry.getKey(); + CompactEndpointSerializationHelper.instance.serialize(ep, out, version); EndpointState.serializer.serialize(entry.getValue(), out, version); } } @@ -65,11 +65,11 @@ class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck public GossipDigestAck2 deserialize(DataInputPlus in, int version) throws IOException { int size = in.readInt(); - Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size); + Map<InetAddressAndPort, EndpointState> epStateMap = new HashMap<>(size); for (int i = 0; i < size; ++i) { - InetAddress ep = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort ep = CompactEndpointSerializationHelper.instance.deserialize(in, version); EndpointState epState = EndpointState.serializer.deserialize(in, version); epStateMap.put(ep, epState); } @@ -79,8 +79,8 @@ class GossipDigestAck2Serializer implements IVersionedSerializer<GossipDigestAck public long serializedSize(GossipDigestAck2 ack2, int version) { long size = TypeSizes.sizeof(ack2.epStateMap.size()); - for (Map.Entry<InetAddress, EndpointState> entry : ack2.epStateMap.entrySet()) - size += CompactEndpointSerializationHelper.serializedSize(entry.getKey()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : ack2.epStateMap.entrySet()) + size += CompactEndpointSerializationHelper.instance.serializedSize(entry.getKey(), version) + EndpointState.serializer.serializedSize(entry.getValue(), version); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java index 240bb40..fd5d487 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java @@ -17,12 +17,12 @@ */ package org.apache.cassandra.gms; -import java.net.InetAddress; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; @@ -34,7 +34,7 @@ public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck { if (logger.isTraceEnabled()) { - InetAddress from = message.from; + InetAddressAndPort from = message.from; logger.trace("Received a GossipDigestAck2Message from {}", from); } if (!Gossiper.instance.isEnabled()) @@ -43,7 +43,7 @@ public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck logger.trace("Ignoring GossipDigestAck2Message because gossip is disabled"); return; } - Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap(); + Map<InetAddressAndPort, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap(); /* Notify the Failure Detector */ Gossiper.instance.notifyFailureDetector(remoteEpStateMap); Gossiper.instance.applyStateLocally(remoteEpStateMap); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index d6d9dfb..2a12b7c 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.gms; -import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,6 +24,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -36,7 +36,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> public void doVerb(MessageIn<GossipDigestAck> message, int id) { - InetAddress from = message.from; + InetAddressAndPort from = message.from; if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestAckMessage from {}", from); if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) @@ -48,7 +48,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> GossipDigestAck gDigestAckMessage = message.payload; List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList(); - Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap(); + Map<InetAddressAndPort, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap(); logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size()); if (Gossiper.instance.isInShadowRound()) @@ -79,10 +79,10 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> } /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ - Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); + Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<InetAddressAndPort, EndpointState>(); for (GossipDigest gDigest : gDigestList) { - InetAddress addr = gDigest.getEndpoint(); + InetAddressAndPort addr = gDigest.getEndpoint(); EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion()); if (localEpStatePtr != null) deltaEpStateMap.put(addr, localEpStatePtr); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java index ddfafc9..9619f4e 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.gms; -import java.net.InetAddress; import java.util.*; import com.google.common.collect.Maps; @@ -26,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -37,7 +37,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> public void doVerb(MessageIn<GossipDigestSyn> message, int id) { - InetAddress from = message.from; + InetAddressAndPort from = message.from; if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestSynMessage from {}", from); if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) @@ -102,7 +102,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> doSort(gDigestList); List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>(); - Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); + Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<InetAddressAndPort, EndpointState>(); Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap); logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size()); MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK, @@ -116,14 +116,14 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> /* * First construct a map whose key is the endpoint in the GossipDigest and the value is the * GossipDigest itself. Then build a list of version differences i.e difference between the - * version in the GossipDigest and the version in the local state for a given InetAddress. + * version in the GossipDigest and the version in the local state for a given InetAddressAndPort. * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding * to the endpoint from the map that was initially constructed. */ private void doSort(List<GossipDigest> gDigestList) { /* Construct a map of endpoint to GossipDigest. */ - Map<InetAddress, GossipDigest> epToDigestMap = Maps.newHashMapWithExpectedSize(gDigestList.size()); + Map<InetAddressAndPort, GossipDigest> epToDigestMap = Maps.newHashMapWithExpectedSize(gDigestList.size()); for (GossipDigest gDigest : gDigestList) { epToDigestMap.put(gDigest.getEndpoint(), gDigest); @@ -136,7 +136,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> List<GossipDigest> diffDigests = new ArrayList<GossipDigest>(gDigestList.size()); for (GossipDigest gDigest : gDigestList) { - InetAddress ep = gDigest.getEndpoint(); + InetAddressAndPort ep = gDigest.getEndpoint(); EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep); int version = (epState != null) ? Gossiper.instance.getMaxEndpointStateVersion(epState) : 0; int diffVersion = Math.abs(version - gDigest.getMaxVersion()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index e675d92..eb6c500 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -18,12 +18,12 @@ package org.apache.cassandra.gms; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.management.MBeanServer; @@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; @@ -99,43 +100,36 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean static final int MAX_GENERATION_DIFFERENCE = 86400 * 365; private long fatClientTimeout; private final Random random = new Random(); - private final Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>() - { - public int compare(InetAddress addr1, InetAddress addr2) - { - return addr1.getHostAddress().compareTo(addr2.getHostAddress()); - } - }; /* subscribers for interest in EndpointState change */ private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>(); /* live member set */ - private final Set<InetAddress> liveEndpoints = new ConcurrentSkipListSet<InetAddress>(inetcomparator); + private final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>(); /* unreachable member set */ - private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>(); + private final Map<InetAddressAndPort, Long> unreachableEndpoints = new ConcurrentHashMap<>(); /* initial seeds for joining the cluster */ @VisibleForTesting - final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator); + final Set<InetAddressAndPort> seeds = new ConcurrentSkipListSet<>(); /* map where key is the endpoint and value is the state associated with the endpoint */ - final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>(); + final ConcurrentMap<InetAddressAndPort, EndpointState> endpointStateMap = new ConcurrentHashMap<>(); /* map where key is endpoint and value is timestamp when this endpoint was removed from * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time * after removal to prevent nodes from falsely reincarnating during the time when removal * gossip gets propagated to all nodes */ - private final Map<InetAddress, Long> justRemovedEndpoints = new ConcurrentHashMap<InetAddress, Long>(); + private final Map<InetAddressAndPort, Long> justRemovedEndpoints = new ConcurrentHashMap<>(); - private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>(); + private final Map<InetAddressAndPort, Long> expireTimeEndpointMap = new ConcurrentHashMap<>(); private volatile boolean inShadowRound = false; // seeds gathered during shadow round that indicated to be in the shadow round phase as well - private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator); + private final Set<InetAddressAndPort> seedsInShadowRound = new ConcurrentSkipListSet<>(); // endpoint states as gathered during shadow round - private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>(); + private final Map<InetAddressAndPort, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>(); private volatile long lastProcessedMessageAt = System.currentTimeMillis(); @@ -151,9 +145,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean taskLock.lock(); /* Update the local heartbeat counter. */ - endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat(); + endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat(); if (logger.isTraceEnabled()) - logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion()); + logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion()); final List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); Gossiper.instance.makeRandomGossipDigest(gDigests); @@ -231,14 +225,24 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public boolean seenAnySeed() { - for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : endpointStateMap.entrySet()) { if (seeds.contains(entry.getKey())) return true; try { VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP); - if (internalIp != null && seeds.contains(InetAddress.getByName(internalIp.value))) + VersionedValue internalIpAndPort = entry.getValue().getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + InetAddressAndPort endpoint = null; + if (internalIpAndPort != null) + { + endpoint = InetAddressAndPort.getByName(internalIpAndPort.value); + } + else if (internalIp != null) + { + endpoint = InetAddressAndPort.getByName(internalIp.value); + } + if (endpoint != null && seeds.contains(endpoint)) return true; } catch (UnknownHostException e) @@ -272,18 +276,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * @return a list of live gossip participants, including fat clients */ - public Set<InetAddress> getLiveMembers() + public Set<InetAddressAndPort> getLiveMembers() { - Set<InetAddress> liveMembers = new HashSet<>(liveEndpoints); - if (!liveMembers.contains(FBUtilities.getBroadcastAddress())) - liveMembers.add(FBUtilities.getBroadcastAddress()); + Set<InetAddressAndPort> liveMembers = new HashSet<>(liveEndpoints); + if (!liveMembers.contains(FBUtilities.getBroadcastAddressAndPort())) + liveMembers.add(FBUtilities.getBroadcastAddressAndPort()); return liveMembers; } /** * @return a list of live ring members. */ - public Set<InetAddress> getLiveTokenOwners() + public Set<InetAddressAndPort> getLiveTokenOwners() { return StorageService.instance.getLiveRingMembers(true); } @@ -291,7 +295,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * @return a list of unreachable gossip participants, including fat clients */ - public Set<InetAddress> getUnreachableMembers() + public Set<InetAddressAndPort> getUnreachableMembers() { return unreachableEndpoints.keySet(); } @@ -299,10 +303,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * @return a list of unreachable token owners */ - public Set<InetAddress> getUnreachableTokenOwners() + public Set<InetAddressAndPort> getUnreachableTokenOwners() { - Set<InetAddress> tokenOwners = new HashSet<>(); - for (InetAddress endpoint : unreachableEndpoints.keySet()) + Set<InetAddressAndPort> tokenOwners = new HashSet<>(); + for (InetAddressAndPort endpoint : unreachableEndpoints.keySet()) { if (StorageService.instance.getTokenMetadata().isMember(endpoint)) tokenOwners.add(endpoint); @@ -311,7 +315,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return tokenOwners; } - public long getEndpointDowntime(InetAddress ep) + public long getEndpointDowntime(InetAddressAndPort ep) { Long downtime = unreachableEndpoints.get(ep); if (downtime != null) @@ -320,14 +324,25 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return 0L; } - private boolean isShutdown(InetAddress endpoint) + private boolean isShutdown(InetAddressAndPort endpoint) { EndpointState epState = endpointStateMap.get(endpoint); if (epState == null) + { return false; - if (epState.getApplicationState(ApplicationState.STATUS) == null) - return false; - String value = epState.getApplicationState(ApplicationState.STATUS).value; + } + + VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT); + if (versionedValue == null) + { + versionedValue = epState.getApplicationState(ApplicationState.STATUS); + if (versionedValue == null) + { + return false; + } + } + + String value = versionedValue.value; String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); assert (pieces.length > 0); String state = pieces[0]; @@ -340,7 +355,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param endpoint end point that is convicted. */ - public void convict(InetAddress endpoint, double phi) + public void convict(InetAddressAndPort endpoint, double phi) { EndpointState epState = endpointStateMap.get(endpoint); if (epState == null) @@ -366,11 +381,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it * @param endpoint endpoint that has shut itself down */ - protected void markAsShutdown(InetAddress endpoint) + protected void markAsShutdown(InetAddressAndPort endpoint) { EndpointState epState = endpointStateMap.get(endpoint); if (epState == null) return; + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true)); epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false)); epState.getHeartBeatState().forceHighestPossibleVersionUnsafe(); @@ -397,7 +413,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param endpoint endpoint to be removed from the current membership. */ - private void evictFromMembership(InetAddress endpoint) + private void evictFromMembership(InetAddressAndPort endpoint) { unreachableEndpoints.remove(endpoint); endpointStateMap.remove(endpoint); @@ -411,7 +427,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * Removes the endpoint from Gossip but retains endpoint state */ - public void removeEndpoint(InetAddress endpoint) + public void removeEndpoint(InetAddressAndPort endpoint) { // do subscribers first so anything in the subscriber that depends on gossiper state won't get confused for (IEndpointStateChangeSubscriber subscriber : subscribers) @@ -438,7 +454,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param endpoint */ - private void quarantineEndpoint(InetAddress endpoint) + private void quarantineEndpoint(InetAddressAndPort endpoint) { quarantineEndpoint(endpoint, System.currentTimeMillis()); } @@ -449,7 +465,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param endpoint * @param quarantineExpiration */ - private void quarantineEndpoint(InetAddress endpoint, long quarantineExpiration) + private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpiration) { justRemovedEndpoints.put(endpoint, quarantineExpiration); } @@ -458,7 +474,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * Quarantine endpoint specifically for replacement purposes. * @param endpoint */ - public void replacementQuarantine(InetAddress endpoint) + public void replacementQuarantine(InetAddressAndPort endpoint) { // remember, quarantineEndpoint will effectively already add QUARANTINE_DELAY, so this is 2x logger.debug(""); @@ -471,7 +487,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * * @param endpoint The endpoint that has been replaced */ - public void replacedEndpoint(InetAddress endpoint) + public void replacedEndpoint(InetAddressAndPort endpoint) { removeEndpoint(endpoint); evictFromMembership(endpoint); @@ -491,9 +507,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int maxVersion = 0; // local epstate will be part of endpointStateMap - List<InetAddress> endpoints = new ArrayList<InetAddress>(endpointStateMap.keySet()); + List<InetAddressAndPort> endpoints = new ArrayList<>(endpointStateMap.keySet()); Collections.shuffle(endpoints, random); - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) { epState = endpointStateMap.get(endpoint); if (epState != null) @@ -524,7 +540,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param hostId - the ID of the host being removed * @param localHostId - my own host ID for replication coordination */ - public void advertiseRemoving(InetAddress endpoint, UUID hostId, UUID localHostId) + public void advertiseRemoving(InetAddressAndPort endpoint, UUID hostId, UUID localHostId) { EndpointState epState = endpointStateMap.get(endpoint); // remember this node's generation @@ -541,6 +557,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class); + states.put(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.removingNonlocal(hostId)); states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); epState.addApplicationStates(states); @@ -554,12 +571,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param endpoint * @param hostId */ - public void advertiseTokenRemoved(InetAddress endpoint, UUID hostId) + public void advertiseTokenRemoved(InetAddressAndPort endpoint, UUID hostId) { EndpointState epState = endpointStateMap.get(endpoint); epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); long expireTime = computeExpireTime(); + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime)); epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime)); logger.info("Completing removal of {}", endpoint); addExpireTimeForEndpoint(endpoint, expireTime); @@ -584,7 +602,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean */ public void assassinateEndpoint(String address) throws UnknownHostException { - InetAddress endpoint = InetAddress.getByName(address); + InetAddressAndPort endpoint = InetAddressAndPort.getByName(address); EndpointState epState = endpointStateMap.get(endpoint); Collection<Token> tokens = null; logger.warn("Assassinating {} via gossip", endpoint); @@ -624,18 +642,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } // do not pass go, do not collect 200 dollars, just gtfo - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, computeExpireTime())); + long expireTime = computeExpireTime(); + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.left(tokens, expireTime)); + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, expireTime)); handleMajorStateChange(endpoint, epState); Uninterruptibles.sleepUninterruptibly(intervalInMillis * 4, TimeUnit.MILLISECONDS); logger.warn("Finished assassinating {}", endpoint); } - public boolean isKnownEndpoint(InetAddress endpoint) + public boolean isKnownEndpoint(InetAddressAndPort endpoint) { return endpointStateMap.containsKey(endpoint); } - public int getCurrentGenerationNumber(InetAddress endpoint) + public int getCurrentGenerationNumber(InetAddressAndPort endpoint) { return endpointStateMap.get(endpoint).getHeartBeatState().getGeneration(); } @@ -647,16 +667,16 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param epSet a set of endpoint from which a random endpoint is chosen. * @return true if the chosen endpoint is also a seed. */ - private boolean sendGossip(MessageOut<GossipDigestSyn> message, Set<InetAddress> epSet) + private boolean sendGossip(MessageOut<GossipDigestSyn> message, Set<InetAddressAndPort> epSet) { - List<InetAddress> liveEndpoints = ImmutableList.copyOf(epSet); + List<InetAddressAndPort> liveEndpoints = ImmutableList.copyOf(epSet); int size = liveEndpoints.size(); if (size < 1) return false; /* Generate a random number from 0 -> size */ int index = (size == 1) ? 0 : random.nextInt(size); - InetAddress to = liveEndpoints.get(index); + InetAddressAndPort to = liveEndpoints.get(index); if (logger.isTraceEnabled()) logger.trace("Sending a GossipDigestSyn to {} ...", to); if (firstSynSendAt == 0) @@ -695,7 +715,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int size = seeds.size(); if (size > 0) { - if (size == 1 && seeds.contains(FBUtilities.getBroadcastAddress())) + if (size == 1 && seeds.contains(FBUtilities.getBroadcastAddressAndPort())) { return; } @@ -715,7 +735,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - public boolean isGossipOnlyMember(InetAddress endpoint) + public boolean isGossipOnlyMember(InetAddressAndPort endpoint) { EndpointState epState = endpointStateMap.get(endpoint); if (epState == null) @@ -740,8 +760,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param epStates - endpoint states in the cluster * @return true if it is safe to start the node, false otherwise */ - public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping, - Map<InetAddress, EndpointState> epStates) + public boolean isSafeForStartup(InetAddressAndPort endpoint, UUID localHostUUID, boolean isBootstrapping, + Map<InetAddressAndPort, EndpointState> epStates) { EndpointState epState = epStates.get(endpoint); // if there's no previous state, or the node was previously removed from the cluster, we're good @@ -792,10 +812,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - Set<InetAddress> eps = endpointStateMap.keySet(); - for (InetAddress endpoint : eps) + Set<InetAddressAndPort> eps = endpointStateMap.keySet(); + for (InetAddressAndPort endpoint : eps) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) continue; FailureDetector.instance.interpret(endpoint); @@ -829,7 +849,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (!justRemovedEndpoints.isEmpty()) { - for (Entry<InetAddress, Long> entry : justRemovedEndpoints.entrySet()) + for (Entry<InetAddressAndPort, Long> entry : justRemovedEndpoints.entrySet()) { if ((now - entry.getValue()) > QUARANTINE_DELAY) { @@ -841,34 +861,34 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - protected long getExpireTimeForEndpoint(InetAddress endpoint) + protected long getExpireTimeForEndpoint(InetAddressAndPort endpoint) { /* default expireTime is aVeryLongTime */ Long storedTime = expireTimeEndpointMap.get(endpoint); return storedTime == null ? computeExpireTime() : storedTime; } - public EndpointState getEndpointStateForEndpoint(InetAddress ep) + public EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep) { return endpointStateMap.get(ep); } - public Set<Entry<InetAddress, EndpointState>> getEndpointStates() + public Set<Entry<InetAddressAndPort, EndpointState>> getEndpointStates() { return endpointStateMap.entrySet(); } - public UUID getHostId(InetAddress endpoint) + public UUID getHostId(InetAddressAndPort endpoint) { return getHostId(endpoint, endpointStateMap); } - public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates) + public UUID getHostId(InetAddressAndPort endpoint, Map<InetAddressAndPort, EndpointState> epStates) { return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value); } - EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) + EndpointState getStateForVersionBiggerThan(InetAddressAndPort forEndpoint, int version) { EndpointState epState = endpointStateMap.get(forEndpoint); EndpointState reqdEndpointState = null; @@ -919,7 +939,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * determine which endpoint started up earlier */ - public int compareEndpointStartup(InetAddress addr1, InetAddress addr2) + public int compareEndpointStartup(InetAddressAndPort addr1, InetAddressAndPort addr2) { EndpointState ep1 = getEndpointStateForEndpoint(addr1); EndpointState ep2 = getEndpointStateForEndpoint(addr2); @@ -927,15 +947,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration(); } - void notifyFailureDetector(Map<InetAddress, EndpointState> remoteEpStateMap) + void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap) { - for (Entry<InetAddress, EndpointState> entry : remoteEpStateMap.entrySet()) + for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet()) { notifyFailureDetector(entry.getKey(), entry.getValue()); } } - void notifyFailureDetector(InetAddress endpoint, EndpointState remoteEndpointState) + void notifyFailureDetector(InetAddressAndPort endpoint, EndpointState remoteEndpointState) { EndpointState localEndpointState = endpointStateMap.get(endpoint); /* @@ -976,7 +996,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } - private void markAlive(final InetAddress addr, final EndpointState localState) + private void markAlive(final InetAddressAndPort addr, final EndpointState localState) { localState.markDead(); @@ -999,7 +1019,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } @VisibleForTesting - public void realMarkAlive(final InetAddress addr, final EndpointState localState) + public void realMarkAlive(final InetAddressAndPort addr, final EndpointState localState) { if (logger.isTraceEnabled()) logger.trace("marking as alive {}", addr); @@ -1017,7 +1037,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } @VisibleForTesting - public void markDead(InetAddress addr, EndpointState localState) + public void markDead(InetAddressAndPort addr, EndpointState localState) { if (logger.isTraceEnabled()) logger.trace("marking as down {}", addr); @@ -1037,7 +1057,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * @param ep endpoint * @param epState EndpointState for the endpoint */ - private void handleMajorStateChange(InetAddress ep, EndpointState epState) + private void handleMajorStateChange(InetAddressAndPort ep, EndpointState epState) { EndpointState localEpState = endpointStateMap.get(ep); if (!isDeadState(epState)) @@ -1071,7 +1091,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean markAsShutdown(ep); } - public boolean isAlive(InetAddress endpoint) + public boolean isAlive(InetAddressAndPort endpoint) { EndpointState epState = getEndpointStateForEndpoint(endpoint); if (epState == null) @@ -1099,21 +1119,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private static String getGossipStatus(EndpointState epState) { - if (epState == null || epState.getApplicationState(ApplicationState.STATUS) == null) + if (epState == null) + { return ""; + } + + VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT); + if (versionedValue == null) + { + versionedValue = epState.getApplicationState(ApplicationState.STATUS); + if (versionedValue == null) + { + return ""; + } + } - String value = epState.getApplicationState(ApplicationState.STATUS).value; + String value = versionedValue.value; String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); assert (pieces.length > 0); return pieces[0]; } - void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) + void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap) { - for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet()) + for (Entry<InetAddressAndPort, EndpointState> entry : epStateMap.entrySet()) { - InetAddress ep = entry.getKey(); - if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound()) + InetAddressAndPort ep = entry.getKey(); + if ( ep.equals(FBUtilities.getBroadcastAddressAndPort()) && !isInShadowRound()) continue; if (justRemovedEndpoints.containsKey(ep)) { @@ -1181,7 +1213,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState) + private void applyNewStates(InetAddressAndPort addr, EndpointState localState, EndpointState remoteState) { // don't assert here, since if the node restarts the version will go back to zero int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); @@ -1194,12 +1226,27 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); localState.addApplicationStates(remoteStates); - for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates) + //Filter out pre-4.0 versions of data for more complete 4.0 versions + Set<Entry<ApplicationState, VersionedValue>> filtered = remoteStates.stream().filter(entry -> { + switch (entry.getKey()) + { + case INTERNAL_IP: + return remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) == null; + case STATUS: + return remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null; + case RPC_ADDRESS: + return remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) == null; + default: + return true; + } + }).collect(Collectors.toSet()); + + for (Entry<ApplicationState, VersionedValue> remoteEntry : filtered) doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); } // notify that a local application state is going to change (doesn't get triggered for remote changes) - private void doBeforeChangeNotifications(InetAddress addr, EndpointState epState, ApplicationState apState, VersionedValue newValue) + private void doBeforeChangeNotifications(InetAddressAndPort addr, EndpointState epState, ApplicationState apState, VersionedValue newValue) { for (IEndpointStateChangeSubscriber subscriber : subscribers) { @@ -1208,7 +1255,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } // notify that an application state has changed - private void doOnChangeNotifications(InetAddress addr, ApplicationState state, VersionedValue value) + private void doOnChangeNotifications(InetAddressAndPort addr, ApplicationState state, VersionedValue value) { for (IEndpointStateChangeSubscriber subscriber : subscribers) { @@ -1226,7 +1273,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } /* Send all the data with version greater than maxRemoteVersion */ - private void sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> deltaEpStateMap, int maxRemoteVersion) + private void sendAll(GossipDigest gDigest, Map<InetAddressAndPort, EndpointState> deltaEpStateMap, int maxRemoteVersion) { EndpointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion); if (localEpStatePtr != null) @@ -1237,7 +1284,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests and the delta state are built up. */ - void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap) + void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddressAndPort, EndpointState> deltaEpStateMap) { if (gDigestList.size() == 0) { @@ -1245,7 +1292,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean If this is happening then the node is attempting shadow gossip, and we should reply with everything we know. */ logger.debug("Shadow request received, adding all states"); - for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : endpointStateMap.entrySet()) { gDigestList.add(new GossipDigest(entry.getKey(), 0, 0)); } @@ -1320,7 +1367,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean buildSeedsList(); /* initialize the heartbeat state for this localEndpoint */ maybeInitializeLocalState(generationNbr); - EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()); localState.addApplicationStates(preloadLocalStates); //notify snitches that Gossiper is about to start @@ -1345,14 +1392,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * </ul> * * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared - * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update + * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddressAndPort, boolean, Map)}. This will update * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at * the same time. * * @return endpoint states gathered during shadow round or empty map */ - public synchronized Map<InetAddress, EndpointState> doShadowRound() + public synchronized Map<InetAddressAndPort, EndpointState> doShadowRound() { buildSeedsList(); // it may be that the local address is the only entry in the seed @@ -1381,7 +1428,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean { // CASSANDRA-8072, retry at the beginning and every 5 seconds logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds); - for (InetAddress seed : seeds) + for (InetAddressAndPort seed : seeds) MessagingService.instance().sendOneWay(message, seed); } @@ -1393,8 +1440,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (slept > StorageService.RING_DELAY) { // if we don't consider ourself to be a seed, fail out - if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())) - throw new RuntimeException("Unable to gossip with any seeds"); + if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort())) + throw new RuntimeException("Unable to gossip with any seeds " + DatabaseDescriptor.getSeeds() + " and " + FBUtilities.getBroadcastAddressAndPort()); logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list"); inShadowRound = false; @@ -1413,9 +1460,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean @VisibleForTesting void buildSeedsList() { - for (InetAddress seed : DatabaseDescriptor.getSeeds()) + for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds()) { - if (seed.equals(FBUtilities.getBroadcastAddress())) + if (seed.equals(FBUtilities.getBroadcastAddressAndPort())) continue; seeds.add(seed); } @@ -1427,12 +1474,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean HeartBeatState hbState = new HeartBeatState(generationNbr); EndpointState localState = new EndpointState(hbState); localState.markAlive(); - endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState); + endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddressAndPort(), localState); } public void forceNewerGeneration() { - EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()); epstate.getHeartBeatState().forceNewerGenerationUnsafe(); } @@ -1440,9 +1487,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * Add an endpoint we knew about previously, but whose state is unknown */ - public void addSavedEndpoint(InetAddress ep) + public void addSavedEndpoint(InetAddressAndPort ep) { - if (ep.equals(FBUtilities.getBroadcastAddress())) + if (ep.equals(FBUtilities.getBroadcastAddressAndPort())) { logger.debug("Attempt to add self as saved endpoint"); return; @@ -1470,8 +1517,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private void addLocalApplicationStateInternal(ApplicationState state, VersionedValue value) { assert taskLock.isHeldByCurrentThread(); - EndpointState epState = endpointStateMap.get(FBUtilities.getBroadcastAddress()); - InetAddress epAddr = FBUtilities.getBroadcastAddress(); + EndpointState epState = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()); + InetAddressAndPort epAddr = FBUtilities.getBroadcastAddressAndPort(); assert epState != null; // Fire "before change" notifications: doBeforeChangeNotifications(epAddr, epState, state, value); @@ -1508,13 +1555,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void stop() { - EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()); if (mystate != null && !isSilentShutdownState(mystate) && StorageService.instance.isJoined()) { logger.info("Announcing shutdown"); + addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true)); addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN); - for (InetAddress ep : liveEndpoints) + for (InetAddressAndPort ep : liveEndpoints) MessagingService.instance().sendOneWay(message, ep); Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS); } @@ -1529,7 +1577,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } - protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap) + protected void maybeFinishShadowRound(InetAddressAndPort respondent, boolean isInShadowRound, Map<InetAddressAndPort, EndpointState> epStateMap) { if (inShadowRound) { @@ -1565,7 +1613,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } @VisibleForTesting - public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) + public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int generationNbr) { HeartBeatState hbState = new HeartBeatState(generationNbr); EndpointState newState = new EndpointState(hbState); @@ -1581,7 +1629,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } @VisibleForTesting - public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value) + public void injectApplicationState(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) { EndpointState localState = endpointStateMap.get(endpoint); localState.addApplicationState(state, value); @@ -1589,15 +1637,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public long getEndpointDowntime(String address) throws UnknownHostException { - return getEndpointDowntime(InetAddress.getByName(address)); + return getEndpointDowntime(InetAddressAndPort.getByName(address)); } public int getCurrentGenerationNumber(String address) throws UnknownHostException { - return getCurrentGenerationNumber(InetAddress.getByName(address)); + return getCurrentGenerationNumber(InetAddressAndPort.getByName(address)); } - public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime) + public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long expireTime) { if (logger.isDebugEnabled()) { @@ -1612,14 +1660,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } @Nullable - public CassandraVersion getReleaseVersion(InetAddress ep) + public CassandraVersion getReleaseVersion(InetAddressAndPort ep) { EndpointState state = getEndpointStateForEndpoint(ep); return state != null ? state.getReleaseVersion() : null; } @Nullable - public UUID getSchemaVersion(InetAddress ep) + public UUID getSchemaVersion(InetAddressAndPort ep) { EndpointState state = getEndpointStateForEndpoint(ep); return state != null ? state.getSchemaVersion() : null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java index 1bfd678..dc81650 100644 --- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java +++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.gms; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; /** * This is called by an instance of the IEndpointStateChangePublisher to notify @@ -36,17 +36,17 @@ public interface IEndpointStateChangeSubscriber * @param endpoint endpoint for which the state change occurred. * @param epState state that actually changed for the above endpoint. */ - public void onJoin(InetAddress endpoint, EndpointState epState); + public void onJoin(InetAddressAndPort endpoint, EndpointState epState); - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue); + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue); - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value); + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value); - public void onAlive(InetAddress endpoint, EndpointState state); + public void onAlive(InetAddressAndPort endpoint, EndpointState state); - public void onDead(InetAddress endpoint, EndpointState state); + public void onDead(InetAddressAndPort endpoint, EndpointState state); - public void onRemove(InetAddress endpoint); + public void onRemove(InetAddressAndPort endpoint); /** * Called whenever a node is restarted. @@ -54,5 +54,5 @@ public interface IEndpointStateChangeSubscriber * previously marked down. It will have only if {@code state.isAlive() == false} * as {@code state} is from before the restarted node is marked up. */ - public void onRestart(InetAddress endpoint, EndpointState state); + public void onRestart(InetAddressAndPort endpoint, EndpointState state); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java b/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java index 8b274b6..4e0c663 100644 --- a/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java +++ b/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.gms; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Implemented by the Gossiper to convict an endpoint @@ -33,5 +33,5 @@ public interface IFailureDetectionEventListener * @param ep endpoint to be convicted * @param phi the value of phi with with ep was convicted */ - public void convict(InetAddress ep, double phi); + public void convict(InetAddressAndPort ep, double phi); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/IFailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/IFailureDetector.java b/src/java/org/apache/cassandra/gms/IFailureDetector.java index a860c7c..62fc97d 100644 --- a/src/java/org/apache/cassandra/gms/IFailureDetector.java +++ b/src/java/org/apache/cassandra/gms/IFailureDetector.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.gms; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; /** * An interface that provides an application with the ability @@ -35,7 +35,7 @@ public interface IFailureDetector * @param ep endpoint in question. * @return true if UP and false if DOWN. */ - public boolean isAlive(InetAddress ep); + public boolean isAlive(InetAddressAndPort ep); /** * This method is invoked by any entity wanting to interrogate the status of an endpoint. @@ -44,7 +44,7 @@ public interface IFailureDetector * * param ep endpoint for which we interpret the inter arrival times. */ - public void interpret(InetAddress ep); + public void interpret(InetAddressAndPort ep); /** * This method is invoked by the receiver of the heartbeat. In our case it would be @@ -53,17 +53,17 @@ public interface IFailureDetector * * param ep endpoint being reported. */ - public void report(InetAddress ep); + public void report(InetAddressAndPort ep); /** * remove endpoint from failure detector */ - public void remove(InetAddress ep); + public void remove(InetAddressAndPort ep); /** * force conviction of endpoint in the failure detector */ - public void forceConviction(InetAddress ep); + public void forceConviction(InetAddressAndPort ep); /** * Register interest for Failure Detector events. http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index d9c8d0b..691f544 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -32,6 +32,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang3.StringUtils; @@ -133,11 +134,17 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(value.value); } + @Deprecated public VersionedValue bootReplacing(InetAddress oldNode) { return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress())); } + public VersionedValue bootReplacingWithPort(InetAddressAndPort oldNode) + { + return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.toString())); + } + public VersionedValue bootstrapping(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING, @@ -248,6 +255,11 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(endpoint.getHostAddress()); } + public VersionedValue nativeaddressAndPort(InetAddressAndPort address) + { + return new VersionedValue(address.toString()); + } + public VersionedValue releaseVersion() { return new VersionedValue(FBUtilities.getReleaseVersionString()); @@ -263,6 +275,11 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(private_ip); } + public VersionedValue internalAddressAndPort(InetAddressAndPort address) + { + return new VersionedValue(address.toString()); + } + public VersionedValue severity(double value) { return new VersionedValue(String.valueOf(value)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
