http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ForwardToSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ForwardToSerializer.java b/src/java/org/apache/cassandra/net/ForwardToSerializer.java new file mode 100644 index 0000000..c4e8843 --- /dev/null +++ b/src/java/org/apache/cassandra/net/ForwardToSerializer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; + +public class ForwardToSerializer implements IVersionedSerializer<ForwardToContainer> +{ + public static ForwardToSerializer instance = new ForwardToSerializer(); + + private ForwardToSerializer() {} + + public void serialize(ForwardToContainer forwardToContainer, DataOutputPlus out, int version) throws IOException + { + out.writeInt(forwardToContainer.targets.size()); + Iterator<InetAddressAndPort> iter = forwardToContainer.targets.iterator(); + for (int ii = 0; ii < forwardToContainer.messageIds.length; ii++) + { + CompactEndpointSerializationHelper.instance.serialize(iter.next(), out, version); + out.writeInt(forwardToContainer.messageIds[ii]); + } + } + + public ForwardToContainer deserialize(DataInputPlus in, int version) throws IOException + { + int[] ids = new int[in.readInt()]; + List<InetAddressAndPort> hosts = new ArrayList<>(ids.length); + for (int ii = 0; ii < ids.length; ii++) + { + hosts.add(CompactEndpointSerializationHelper.instance.deserialize(in, version)); + ids[ii] = in.readInt(); + } + return new ForwardToContainer(hosts, ids); + } + + public long serializedSize(ForwardToContainer forwardToContainer, int version) + { + //Number of forward addresses, 4 bytes per for each id + long size = 4 + + (4 * forwardToContainer.targets.size()); + //Depending on ipv6 or ipv4 the address size is different. 
+ for (InetAddressAndPort forwardTo : forwardToContainer.targets) + { + size += CompactEndpointSerializationHelper.instance.serializedSize(forwardTo, version); + } + + return size; + } + + public static ForwardToContainer fromBytes(byte[] bytes, int version) + { + try (DataInputBuffer input = new DataInputBuffer(bytes)) + { + return instance.deserialize(input, version); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/IAsyncCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java index 7835079..251d263 100644 --- a/src/java/org/apache/cassandra/net/IAsyncCallback.java +++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java @@ -17,11 +17,10 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; - import com.google.common.base.Predicate; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.InetAddressAndPort; /** * implementors of IAsyncCallback need to make sure that any public methods @@ -31,9 +30,9 @@ import org.apache.cassandra.gms.FailureDetector; */ public interface IAsyncCallback<T> { - Predicate<InetAddress> isAlive = new Predicate<InetAddress>() + Predicate<InetAddressAndPort> isAlive = new Predicate<InetAddressAndPort>() { - public boolean apply(InetAddress endpoint) + public boolean apply(InetAddressAndPort endpoint) { return FailureDetector.instance.isAlive(endpoint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java index 1cd27b6..2b91f20 100644 --- a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java +++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java @@ -17,9 +17,8 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; - import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T> { @@ -27,5 +26,5 @@ public 
interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T> /** * Called when there is an exception on the remote node or timeout happens */ - void onFailure(InetAddress from, RequestFailureReason failureReason); + void onFailure(InetAddressAndPort from, RequestFailureReason failureReason); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/IMessageSink.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IMessageSink.java b/src/java/org/apache/cassandra/net/IMessageSink.java index 5150901..090d2c2 100644 --- a/src/java/org/apache/cassandra/net/IMessageSink.java +++ b/src/java/org/apache/cassandra/net/IMessageSink.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; public interface IMessageSink { @@ -26,7 +26,7 @@ public interface IMessageSink * * @return true if the message is allowed, false if it should be dropped */ - boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to); + boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to); /** * Allow or drop an incoming message http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index c7fc991..6e132a8 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -20,6 +20,7 @@ package org.apache.cassandra.net; import java.io.IOException; import java.util.EnumSet; +import com.google.common.primitives.Shorts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,7 @@ import 
org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.io.DummyByteVersionedSerializer; import org.apache.cassandra.io.util.DataOutputBuffer; public class MessageDeliveryTask implements Runnable @@ -96,19 +98,11 @@ public class MessageDeliveryTask implements Runnable if (message.doCallbackOnFailure()) { MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) - .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); + .withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE); if (t instanceof TombstoneOverwhelmingException) { - try (DataOutputBuffer out = new DataOutputBuffer()) - { - out.writeShort(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code); - response = response.withParameter(MessagingService.FAILURE_REASON_PARAM, out.getData()); - } - catch (IOException ex) - { - throw new RuntimeException(ex); - } + response = response.withParameter(ParameterType.FAILURE_REASON, Shorts.checkedCast(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code)); } MessagingService.instance().sendReply(response, id, message.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index d520fa9..a426ef0 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -18,7 +18,6 @@ package org.apache.cassandra.net; import java.io.IOException; -import java.net.InetAddress; import java.util.Collections; import java.util.Map; @@ -31,8 +30,8 @@ import org.apache.cassandra.exceptions.RequestFailureReason; import 
org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService.Verb; - /** * The receiving node's view of a {@link MessageOut}. See documentation on {@link MessageOut} for details on the * serialization format. @@ -41,16 +40,16 @@ import org.apache.cassandra.net.MessagingService.Verb; */ public class MessageIn<T> { - public final InetAddress from; + public final InetAddressAndPort from; public final T payload; - public final Map<String, byte[]> parameters; - public final Verb verb; + public final Map<ParameterType, Object> parameters; + public final MessagingService.Verb verb; public final int version; public final long constructionTime; - private MessageIn(InetAddress from, + private MessageIn(InetAddressAndPort from, T payload, - Map<String, byte[]> parameters, + Map<ParameterType, Object> parameters, Verb verb, int version, long constructionTime) @@ -63,9 +62,9 @@ public class MessageIn<T> this.constructionTime = constructionTime; } - public static <T> MessageIn<T> create(InetAddress from, + public static <T> MessageIn<T> create(InetAddressAndPort from, T payload, - Map<String, byte[]> parameters, + Map<ParameterType, Object> parameters, Verb verb, int version, long constructionTime) @@ -73,9 +72,9 @@ public class MessageIn<T> return new MessageIn<>(from, payload, parameters, verb, version, constructionTime); } - public static <T> MessageIn<T> create(InetAddress from, + public static <T> MessageIn<T> create(InetAddressAndPort from, T payload, - Map<String, byte[]> parameters, + Map<ParameterType, Object> parameters, MessagingService.Verb verb, int version) { @@ -89,37 +88,46 @@ public class MessageIn<T> public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException { - InetAddress from = 
CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort from = CompactEndpointSerializationHelper.instance.deserialize(in, version); MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt()); - Map<String, byte[]> parameters = readParameters(in); + Map<ParameterType, Object> parameters = readParameters(in, version); int payloadSize = in.readInt(); return read(in, version, id, constructionTime, from, payloadSize, verb, parameters); } - public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException + public static Map<ParameterType, Object> readParameters(DataInputPlus in, int version) throws IOException { int parameterCount = in.readInt(); + Map<ParameterType, Object> parameters; if (parameterCount == 0) { return Collections.emptyMap(); } else { - ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder(); + ImmutableMap.Builder<ParameterType, Object> builder = ImmutableMap.builder(); for (int i = 0; i < parameterCount; i++) { String key = in.readUTF(); - byte[] value = new byte[in.readInt()]; - in.readFully(value); - builder.put(key, value); + ParameterType type = ParameterType.byName.get(key); + if (type != null) + { + byte[] value = new byte[in.readInt()]; + in.readFully(value); + builder.put(type, type.serializer.deserialize(new DataInputBuffer(value), version)); + } + else + { + in.skipBytes(in.readInt()); + } } return builder.build(); } } public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime, - InetAddress from, int payloadSize, Verb verb, Map<String, byte[]> parameters) throws IOException + InetAddressAndPort from, int payloadSize, Verb verb, Map<ParameterType, Object> parameters) throws IOException { IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.verbSerializers.get(verb); if (serializer instanceof MessagingService.CallbackDeterminedSerializer) @@ -140,7 +148,7 @@ public class MessageIn<T> return 
MessageIn.create(from, payload, parameters, verb, version, constructionTime); } - public static long deriveConstructionTime(InetAddress from, int messageTimestamp, long currentTime) + public static long deriveConstructionTime(InetAddressAndPort from, int messageTimestamp, long currentTime) { // Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the // higher 4 bytes wouldn't change between the sender and receiver) @@ -182,36 +190,18 @@ public class MessageIn<T> public boolean doCallbackOnFailure() { - return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM); + return parameters.containsKey(ParameterType.FAILURE_CALLBACK); } public boolean isFailureResponse() { - return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM); - } - - public boolean containsFailureReason() - { - return parameters.containsKey(MessagingService.FAILURE_REASON_PARAM); + return parameters.containsKey(ParameterType.FAILURE_RESPONSE); } public RequestFailureReason getFailureReason() { - if (containsFailureReason()) - { - try (DataInputBuffer in = new DataInputBuffer(parameters.get(MessagingService.FAILURE_REASON_PARAM))) - { - return RequestFailureReason.fromCode(in.readUnsignedShort()); - } - catch (IOException ex) - { - throw new RuntimeException(ex); - } - } - else - { - return RequestFailureReason.UNKNOWN; - } + Short code = (Short)parameters.get(ParameterType.FAILURE_REASON); + return code != null ? 
RequestFailureReason.fromCode(code) : RequestFailureReason.UNKNOWN; } public long getTimeout() http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 379aff5..7d3c0af 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -19,17 +19,18 @@ package org.apache.cassandra.net; import java.io.IOException; -import java.net.InetAddress; -import java.util.Collections; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -80,12 +81,20 @@ import static org.apache.cassandra.tracing.Tracing.isTracing; public class MessageOut<T> { private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1; + //Parameters are stored in an object array as tuples of size two + public static final int PARAMETER_TUPLE_SIZE = 2; + //Offset in a parameter tuple containing the type of the parameter + public static final int PARAMETER_TUPLE_TYPE_OFFSET = 0; + //Offset in a parameter tuple containing the actual parameter represented as a POJO + public static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1; - public final InetAddress from; + public final InetAddressAndPort from; public final 
MessagingService.Verb verb; public final T payload; public final IVersionedSerializer<T> serializer; - public final Map<String, byte[]> parameters; + //A list of tuples, first object is the ParameterType enum value, + //the second object is the POJO to serialize + public final List<Object> parameters; /** * Memoization of the serialized size of the just the payload. @@ -115,16 +124,16 @@ public class MessageOut<T> serializer, isTracing() ? Tracing.instance.getTraceHeaders() - : Collections.<String, byte[]>emptyMap()); + : ImmutableList.of()); } - private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, Map<String, byte[]> parameters) + private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters) { - this(FBUtilities.getBroadcastAddress(), verb, payload, serializer, parameters); + this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters); } @VisibleForTesting - public MessageOut(InetAddress from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, Map<String, byte[]> parameters) + public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters) { this.from = from; this.verb = verb; @@ -133,11 +142,13 @@ public class MessageOut<T> this.parameters = parameters; } - public MessageOut<T> withParameter(String key, byte[] value) + public <VT> MessageOut<T> withParameter(ParameterType type, VT value) { - ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder(); - builder.putAll(parameters).put(key, value); - return new MessageOut<T>(verb, payload, serializer, builder.build()); + List<Object> newParameters = new ArrayList<>(parameters.size() + 2); + newParameters.addAll(parameters); + newParameters.add(type); + newParameters.add(value); + return new MessageOut<T>(verb, payload, serializer, newParameters); } public Stage getStage() @@ -159,15 
+170,19 @@ public class MessageOut<T> public void serialize(DataOutputPlus out, int version) throws IOException { - CompactEndpointSerializationHelper.serialize(from, out); + CompactEndpointSerializationHelper.instance.serialize(from, out, version); out.writeInt(verb.getId()); - out.writeInt(parameters.size()); - for (Map.Entry<String, byte[]> entry : parameters.entrySet()) + assert parameters.size() % PARAMETER_TUPLE_SIZE == 0; + out.writeInt(parameters.size() / PARAMETER_TUPLE_SIZE); + for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE) { - out.writeUTF(entry.getKey()); - out.writeInt(entry.getValue().length); - out.write(entry.getValue()); + ParameterType type = (ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET); + out.writeUTF(type.key); + IVersionedSerializer serializer = type.serializer; + Object parameter = parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET); + out.writeInt(Ints.checkedCast(serializer.serializedSize(parameter, version))); + serializer.serialize(parameter, out, version); } if (payload != null) @@ -187,15 +202,19 @@ public class MessageOut<T> private Pair<Long, Long> calculateSerializedSize(int version) { - long size = CompactEndpointSerializationHelper.serializedSize(from); + long size = 0; + size += CompactEndpointSerializationHelper.instance.serializedSize(from, version); size += TypeSizes.sizeof(verb.getId()); size += TypeSizes.sizeof(parameters.size()); - for (Map.Entry<String, byte[]> entry : parameters.entrySet()) + for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE) { - size += TypeSizes.sizeof(entry.getKey()); - size += TypeSizes.sizeof(entry.getValue().length); - size += entry.getValue().length; + ParameterType type = (ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET); + size += TypeSizes.sizeof(type.key()); + size += 4;//length prefix + IVersionedSerializer serializer = type.serializer; + Object parameter = parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET); + size += 
serializer.serializedSize(parameter, version); } long payloadSize = payload == null ? 0 : serializer.serializedSize(payload, version); @@ -237,4 +256,16 @@ public class MessageOut<T> return sizes.left.intValue(); } + + public Object getParameter(ParameterType type) + { + for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE) + { + if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type)) + { + return parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET); + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 4e6fe1c..9f00d27 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -20,8 +20,6 @@ package org.apache.cassandra.net; import java.io.IOError; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; @@ -37,6 +35,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -91,6 +90,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.ILatencySubscriber; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.ConnectionMetrics; import 
org.apache.cassandra.metrics.DroppedMessageMetrics; @@ -127,10 +127,7 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_40 = 12; public static final int current_version = VERSION_40; - public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC"; public static final byte[] ONE_BYTE = new byte[1]; - public static final String FAILURE_RESPONSE_PARAM = "FAIL"; - public static final String FAILURE_REASON_PARAM = "FAIL_REASON"; /** * we preface every message with this number so the recipient can validate the sender is sane @@ -447,7 +444,7 @@ public final class MessagingService implements MessagingServiceMBean private final Map<Verb, IVerbHandler> verbHandlers; @VisibleForTesting - public final ConcurrentMap<InetAddress, OutboundMessagingPool> channelManagers = new NonBlockingHashMap<>(); + public final ConcurrentMap<InetAddressAndPort, OutboundMessagingPool> channelManagers = new NonBlockingHashMap<>(); final List<ServerChannel> serverChannels = Lists.newArrayList(); private static final Logger logger = LoggerFactory.getLogger(MessagingService.class); @@ -506,7 +503,7 @@ public final class MessagingService implements MessagingServiceMBean private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>(); // protocol versions of the other nodes in the cluster - private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>(); + private final ConcurrentMap<InetAddressAndPort, Integer> versions = new NonBlockingHashMap<>(); // message sinks are a testing hook private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>(); @@ -629,7 +626,7 @@ public final class MessagingService implements MessagingServiceMBean * @param callback The message callback. * @param message The actual message. 
*/ - public void updateBackPressureOnSend(InetAddress host, IAsyncCallback callback, MessageOut<?> message) + public void updateBackPressureOnSend(InetAddressAndPort host, IAsyncCallback callback, MessageOut<?> message) { if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) { @@ -646,7 +643,7 @@ public final class MessagingService implements MessagingServiceMBean * @param callback The message callback. * @param timeout True if updated following a timeout, false otherwise. */ - public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback callback, boolean timeout) + public void updateBackPressureOnReceive(InetAddressAndPort host, IAsyncCallback callback, boolean timeout) { if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) { @@ -669,14 +666,14 @@ public final class MessagingService implements MessagingServiceMBean * @param hosts The hosts to apply back-pressure to. * @param timeoutInNanos The max back-pressure timeout. */ - public void applyBackPressure(Iterable<InetAddress> hosts, long timeoutInNanos) + public void applyBackPressure(Iterable<InetAddressAndPort> hosts, long timeoutInNanos) { if (DatabaseDescriptor.backPressureEnabled()) { Set<BackPressureState> states = new HashSet<BackPressureState>(); - for (InetAddress host : hosts) + for (InetAddressAndPort host : hosts) { - if (host.equals(FBUtilities.getBroadcastAddress())) + if (host.equals(FBUtilities.getBroadcastAddressAndPort())) continue; OutboundMessagingPool pool = getMessagingConnection(host); if (pool != null) @@ -686,13 +683,13 @@ public final class MessagingService implements MessagingServiceMBean } } - BackPressureState getBackPressureState(InetAddress host) + BackPressureState getBackPressureState(InetAddressAndPort host) { OutboundMessagingPool messagingConnection = getMessagingConnection(host); return messagingConnection != null ? 
messagingConnection.getBackPressureState() : null; } - void markTimeout(InetAddress addr) + void markTimeout(InetAddressAndPort addr) { OutboundMessagingPool conn = channelManagers.get(addr); if (conn != null) @@ -706,13 +703,13 @@ public final class MessagingService implements MessagingServiceMBean * @param address the host that replied to the message * @param latency */ - public void maybeAddLatency(IAsyncCallback cb, InetAddress address, long latency) + public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latency) { if (cb.isLatencyForSnitch()) addLatency(address, latency); } - public void addLatency(InetAddress address, long latency) + public void addLatency(InetAddressAndPort address, long latency) { for (ILatencySubscriber subscriber : subscribers) subscriber.receiveTiming(address, latency); @@ -721,7 +718,7 @@ public final class MessagingService implements MessagingServiceMBean /** * called from gossiper when it notices a node is not responding. */ - public void convict(InetAddress ep) + public void convict(InetAddressAndPort ep) { logger.trace("Resetting pool for {}", ep); reset(ep); @@ -735,24 +732,24 @@ public final class MessagingService implements MessagingServiceMBean public void listen(ServerEncryptionOptions serverEncryptionOptions) { callbacks.reset(); // hack to allow tests to stop/restart MS - listen(FBUtilities.getLocalAddress(), serverEncryptionOptions); + listen(FBUtilities.getLocalAddressAndPort(), serverEncryptionOptions); if (shouldListenOnBroadcastAddress()) - listen(FBUtilities.getBroadcastAddress(), serverEncryptionOptions); + listen(FBUtilities.getBroadcastAddressAndPort(), serverEncryptionOptions); listenGate.signalAll(); } public static boolean shouldListenOnBroadcastAddress() { return DatabaseDescriptor.shouldListenOnBroadcastAddress() - && !FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress()); + && !FBUtilities.getLocalAddressAndPort().equals(FBUtilities.getBroadcastAddressAndPort()); } 
/** * Listen on the specified port. * - * @param localEp InetAddress whose port to listen on. + * @param localEp InetAddressAndPort whose port to listen on. */ - private void listen(InetAddress localEp, ServerEncryptionOptions serverEncryptionOptions) throws ConfigurationException + private void listen(InetAddressAndPort localEp, ServerEncryptionOptions serverEncryptionOptions) throws ConfigurationException { IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator(); int receiveBufferSize = DatabaseDescriptor.getInternodeRecvBufferSize(); @@ -766,7 +763,7 @@ public final class MessagingService implements MessagingServiceMBean ServerEncryptionOptions legacyEncOptions = new ServerEncryptionOptions(serverEncryptionOptions); legacyEncOptions.optional = false; - InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort()); + InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(localEp.address, DatabaseDescriptor.getSSLStoragePort()); ChannelGroup channelGroup = new DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups()); InboundInitializer initializer = new InboundInitializer(authenticator, legacyEncOptions, channelGroup); Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); @@ -774,7 +771,8 @@ public final class MessagingService implements MessagingServiceMBean } // this is for the socket that can be plain, only ssl, or optional plain/ssl - InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); + assert localEp.port == DatabaseDescriptor.getStoragePort() : String.format("Local endpoint port %d doesn't match YAML configured port %d%n", localEp.port, DatabaseDescriptor.getStoragePort()); + InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(localEp.address, DatabaseDescriptor.getStoragePort()); 
ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups()); InboundInitializer initializer = new InboundInitializer(authenticator, serverEncryptionOptions, channelGroup); Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); @@ -809,10 +807,10 @@ public final class MessagingService implements MessagingServiceMBean * the inbound connections/channels can be closed when the listening socket itself is being closed. */ private final ChannelGroup connectedChannels; - private final InetSocketAddress address; + private final InetAddressAndPort address; private final SecurityLevel securityLevel; - private ServerChannel(Channel channel, ChannelGroup channelGroup, InetSocketAddress address, SecurityLevel securityLevel) + private ServerChannel(Channel channel, ChannelGroup channelGroup, InetAddressAndPort address, SecurityLevel securityLevel) { this.channel = channel; this.connectedChannels = channelGroup; @@ -840,7 +838,7 @@ public final class MessagingService implements MessagingServiceMBean return channel; } - InetSocketAddress getAddress() + InetAddressAndPort getAddress() { return address; } @@ -869,7 +867,7 @@ public final class MessagingService implements MessagingServiceMBean } - public void destroyConnectionPool(InetAddress to) + public void destroyConnectionPool(InetAddressAndPort to) { OutboundMessagingPool pool = channelManagers.remove(to); if (pool != null) @@ -884,26 +882,26 @@ public final class MessagingService implements MessagingServiceMBean * @param address IP Address to identify the peer * @param preferredAddress IP Address to use (and prefer) going forward for connecting to the peer */ - public void reconnectWithNewIp(InetAddress address, InetAddress preferredAddress) + public void reconnectWithNewIp(InetAddressAndPort address, InetAddressAndPort preferredAddress) { SystemKeyspace.updatePreferredIP(address, preferredAddress); 
OutboundMessagingPool messagingPool = channelManagers.get(address); if (messagingPool != null) - messagingPool.reconnectWithNewIp(new InetSocketAddress(preferredAddress, portFor(address))); + messagingPool.reconnectWithNewIp(InetAddressAndPort.getByAddressOverrideDefaults(preferredAddress.address, portFor(address))); } - private void reset(InetAddress address) + private void reset(InetAddressAndPort address) { OutboundMessagingPool messagingPool = channelManagers.remove(address); if (messagingPool != null) messagingPool.close(false); } - public InetAddress getCurrentEndpoint(InetAddress publicAddress) + public InetAddressAndPort getCurrentEndpoint(InetAddressAndPort publicAddress) { OutboundMessagingPool messagingPool = getMessagingConnection(publicAddress); - return messagingPool != null ? messagingPool.getPreferredRemoteAddr().getAddress() : null; + return messagingPool != null ? messagingPool.getPreferredRemoteAddr() : null; } /** @@ -931,7 +929,7 @@ public final class MessagingService implements MessagingServiceMBean return verbHandlers.get(type); } - public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback) + public int addCallback(IAsyncCallback cb, MessageOut message, InetAddressAndPort to, long timeout, boolean failureCallback) { assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel int messageId = nextId(); @@ -942,7 +940,7 @@ public final class MessagingService implements MessagingServiceMBean public int addCallback(IAsyncCallback cb, MessageOut<?> message, - InetAddress to, + InetAddressAndPort to, long timeout, ConsistencyLevel consistencyLevel, boolean allowHints) @@ -971,12 +969,12 @@ public final class MessagingService implements MessagingServiceMBean return idGen.incrementAndGet(); } - public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb) + public int sendRR(MessageOut message, InetAddressAndPort to, IAsyncCallback cb) { 
return sendRR(message, to, cb, message.getTimeout(), false); } - public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb) + public int sendRRWithFailure(MessageOut message, InetAddressAndPort to, IAsyncCallbackWithFailure cb) { return sendRR(message, to, cb, message.getTimeout(), true); } @@ -992,11 +990,11 @@ public final class MessagingService implements MessagingServiceMBean * @param timeout the timeout used for expiration * @return an reference to message id used to match with the result */ - public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback) + public int sendRR(MessageOut message, InetAddressAndPort to, IAsyncCallback cb, long timeout, boolean failureCallback) { int id = addCallback(cb, message, to, timeout, failureCallback); updateBackPressureOnSend(to, cb, message); - sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to); + sendOneWay(failureCallback ? 
message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE) : message, id, to); return id; } @@ -1013,22 +1011,22 @@ public final class MessagingService implements MessagingServiceMBean * @return an reference to message id used to match with the result */ public int sendRR(MessageOut<?> message, - InetAddress to, + InetAddressAndPort to, AbstractWriteResponseHandler<?> handler, boolean allowHints) { int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints); updateBackPressureOnSend(to, handler, message); - sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), id, to); + sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE), id, to); return id; } - public void sendOneWay(MessageOut message, InetAddress to) + public void sendOneWay(MessageOut message, InetAddressAndPort to) { sendOneWay(message, nextId(), to); } - public void sendReply(MessageOut message, int id, InetAddress to) + public void sendReply(MessageOut message, int id, InetAddressAndPort to) { sendOneWay(message, id, to); } @@ -1040,12 +1038,12 @@ public final class MessagingService implements MessagingServiceMBean * @param message messages to be sent. 
* @param to endpoint to which the message needs to be sent */ - public void sendOneWay(MessageOut message, int id, InetAddress to) + public void sendOneWay(MessageOut message, int id, InetAddressAndPort to) { if (logger.isTraceEnabled()) - logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddress(), message.verb, id, to); + logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to); - if (to.equals(FBUtilities.getBroadcastAddress())) + if (to.equals(FBUtilities.getBroadcastAddressAndPort())) logger.trace("Message-to-self {} going over MessagingService", message); // message sinks are a testing hook @@ -1058,7 +1056,7 @@ public final class MessagingService implements MessagingServiceMBean outboundMessagingPool.sendMessage(message, id); } - public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to) + public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddressAndPort to) { AsyncOneResponse<T> iar = new AsyncOneResponse<T>(); sendRR(message, to, iar); @@ -1176,7 +1174,7 @@ public final class MessagingService implements MessagingServiceMBean /** * @return the last version associated with address, or @param version if this is the first such version */ - public int setVersion(InetAddress endpoint, int version) + public int setVersion(InetAddressAndPort endpoint, int version) { logger.trace("Setting version {} for {}", version, endpoint); @@ -1184,7 +1182,7 @@ public final class MessagingService implements MessagingServiceMBean return v == null ? version : v; } - public void resetVersion(InetAddress endpoint) + public void resetVersion(InetAddressAndPort endpoint) { logger.trace("Resetting version for {}", endpoint); versions.remove(endpoint); @@ -1194,7 +1192,7 @@ public final class MessagingService implements MessagingServiceMBean * Returns the messaging-version as announced by the given node but capped * to the min of the version as announced by the node and {@link #current_version}. 
*/ - public int getVersion(InetAddress endpoint) + public int getVersion(InetAddressAndPort endpoint) { Integer v = versions.get(endpoint); if (v == null) @@ -1209,13 +1207,13 @@ public final class MessagingService implements MessagingServiceMBean public int getVersion(String endpoint) throws UnknownHostException { - return getVersion(InetAddress.getByName(endpoint)); + return getVersion(InetAddressAndPort.getByName(endpoint)); } /** * Returns the messaging-version exactly as announced by the given endpoint. */ - public int getRawVersion(InetAddress endpoint) + public int getRawVersion(InetAddressAndPort endpoint) { Integer v = versions.get(endpoint); if (v == null) @@ -1223,7 +1221,7 @@ public final class MessagingService implements MessagingServiceMBean return v; } - public boolean knowsVersion(InetAddress endpoint) + public boolean knowsVersion(InetAddressAndPort endpoint) { return versions.containsKey(endpoint); } @@ -1358,72 +1356,144 @@ public final class MessagingService implements MessagingServiceMBean public Map<String, Integer> getLargeMessagePendingTasks() { Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessageChannel.getPendingMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().toString(false), entry.getValue().largeMessageChannel.getPendingMessages()); return pendingTasks; } public Map<String, Long> getLargeMessageCompletedTasks() { Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessageChannel.getCompletedMessages()); + for (Map.Entry<InetAddressAndPort, 
OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().toString(false), entry.getValue().largeMessageChannel.getCompletedMessages()); return completedTasks; } public Map<String, Long> getLargeMessageDroppedTasks() { Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessageChannel.getDroppedMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().toString(false), entry.getValue().largeMessageChannel.getDroppedMessages()); return droppedTasks; } public Map<String, Integer> getSmallMessagePendingTasks() { Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessageChannel.getPendingMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().toString(false), entry.getValue().smallMessageChannel.getPendingMessages()); return pendingTasks; } public Map<String, Long> getSmallMessageCompletedTasks() { Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessageChannel.getCompletedMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().toString(false), entry.getValue().smallMessageChannel.getCompletedMessages()); return completedTasks; } public Map<String, Long> getSmallMessageDroppedTasks() { 
Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessageChannel.getDroppedMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().toString(false), entry.getValue().smallMessageChannel.getDroppedMessages()); return droppedTasks; } public Map<String, Integer> getGossipMessagePendingTasks() { Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipChannel.getPendingMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().toString(false), entry.getValue().gossipChannel.getPendingMessages()); return pendingTasks; } public Map<String, Long> getGossipMessageCompletedTasks() { Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipChannel.getCompletedMessages()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().toString(false), entry.getValue().gossipChannel.getCompletedMessages()); return completedTasks; } public Map<String, Long> getGossipMessageDroppedTasks() { Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipChannel.getDroppedMessages()); + 
for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().toString(false), entry.getValue().gossipChannel.getDroppedMessages()); + return droppedTasks; + } + + public Map<String, Integer> getLargeMessagePendingTasksWithPort() + { + Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().toString(), entry.getValue().largeMessageChannel.getPendingMessages()); + return pendingTasks; + } + + public Map<String, Long> getLargeMessageCompletedTasksWithPort() + { + Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().toString(), entry.getValue().largeMessageChannel.getCompletedMessages()); + return completedTasks; + } + + public Map<String, Long> getLargeMessageDroppedTasksWithPort() + { + Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().toString(), entry.getValue().largeMessageChannel.getDroppedMessages()); + return droppedTasks; + } + + public Map<String, Integer> getSmallMessagePendingTasksWithPort() + { + Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().toString(), entry.getValue().smallMessageChannel.getPendingMessages()); + return pendingTasks; + } + + public Map<String, Long> getSmallMessageCompletedTasksWithPort() + { + Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, 
OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().toString(), entry.getValue().smallMessageChannel.getCompletedMessages()); + return completedTasks; + } + + public Map<String, Long> getSmallMessageDroppedTasksWithPort() + { + Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().toString(), entry.getValue().smallMessageChannel.getDroppedMessages()); + return droppedTasks; + } + + public Map<String, Integer> getGossipMessagePendingTasksWithPort() + { + Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().toString(), entry.getValue().gossipChannel.getPendingMessages()); + return pendingTasks; + } + + public Map<String, Long> getGossipMessageCompletedTasksWithPort() + { + Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().toString(), entry.getValue().gossipChannel.getCompletedMessages()); + return completedTasks; + } + + public Map<String, Long> getGossipMessageDroppedTasksWithPort() + { + Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().toString(), entry.getValue().gossipChannel.getDroppedMessages()); return droppedTasks; } @@ -1435,7 +1505,6 @@ public final class MessagingService implements MessagingServiceMBean return map; } - public long getTotalTimeouts() { return ConnectionMetrics.totalTimeouts.getCount(); @@ -1444,9 +1513,21 @@ public final class MessagingService 
implements MessagingServiceMBean public Map<String, Long> getTimeoutsPerHost() { Map<String, Long> result = new HashMap<String, Long>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) { - String ip = entry.getKey().getHostAddress(); + String ip = entry.getKey().toString(false); + long recent = entry.getValue().getTimeouts(); + result.put(ip, recent); + } + return result; + } + + public Map<String, Long> getTimeoutsPerHostWithPort() + { + Map<String, Long> result = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + { + String ip = entry.getKey().toString(); long recent = entry.getValue().getTimeouts(); result.put(ip, recent); } @@ -1456,8 +1537,17 @@ public final class MessagingService implements MessagingServiceMBean public Map<String, Double> getBackPressurePerHost() { Map<String, Double> map = new HashMap<>(channelManagers.size()); - for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) - map.put(entry.getKey().getHostAddress(), entry.getValue().getBackPressureState().getBackPressureRateLimit()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + map.put(entry.getKey().toString(false), entry.getValue().getBackPressureState().getBackPressureRateLimit()); + + return map; + } + + public Map<String, Double> getBackPressurePerHostWithPort() + { + Map<String, Double> map = new HashMap<>(channelManagers.size()); + for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : channelManagers.entrySet()) + map.put(entry.getKey().toString(), entry.getValue().getBackPressureState().getBackPressureRateLimit()); return map; } @@ -1493,18 +1583,18 @@ public final class MessagingService implements MessagingServiceMBean 
bounds.left.getPartitioner().getClass().getName())); } - private OutboundMessagingPool getMessagingConnection(InetAddress to) + private OutboundMessagingPool getMessagingConnection(InetAddressAndPort to) { OutboundMessagingPool pool = channelManagers.get(to); if (pool == null) { final boolean secure = isEncryptedConnection(to); final int port = portFor(to, secure); - if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port)) + if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to.address, port)) return null; - InetSocketAddress preferredRemote = new InetSocketAddress(SystemKeyspace.getPreferredIP(to), port); - InetSocketAddress local = new InetSocketAddress(FBUtilities.getLocalAddress(), 0); + InetAddressAndPort preferredRemote = SystemKeyspace.getPreferredIP(to); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); ServerEncryptionOptions encryptionOptions = secure ? DatabaseDescriptor.getServerEncryptionOptions() : null; IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator(); @@ -1519,16 +1609,16 @@ public final class MessagingService implements MessagingServiceMBean return pool; } - public int portFor(InetAddress addr) + public int portFor(InetAddressAndPort addr) { final boolean secure = isEncryptedConnection(addr); return portFor(addr, secure); } - private int portFor(InetAddress address, boolean secure) + private int portFor(InetAddressAndPort address, boolean secure) { if (!secure) - return DatabaseDescriptor.getStoragePort(); + return address.port; Integer v = versions.get(address); // if we don't know the version of the peer, assume it is 4.0 (or higher) as the only time is would be lower @@ -1536,12 +1626,15 @@ public final class MessagingService implements MessagingServiceMBean // unfortunately fail - however the peer should connect to this node (at some point), and once we learn it's version, it'll be // in versions map. 
thus, when we attempt to reconnect to that node, we'll have the version and we can get the correct port. // we will be able to remove this logic at 5.0. + // Also as of 4.0 we will propagate the "regular" port (which will support both SSL and non-SSL) via gossip so + // for SSL and version 4.0 always connect to the gossiped port because if SSL is enabled it should ALWAYS + // listen for SSL on the "regular" port. int version = v != null ? v.intValue() : VERSION_40; - return version < VERSION_40 ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort(); + return version < VERSION_40 ? DatabaseDescriptor.getSSLStoragePort() : address.port; } @VisibleForTesting - boolean isConnected(InetAddress address, MessageOut messageOut) + boolean isConnected(InetAddressAndPort address, MessageOut messageOut) { OutboundMessagingPool pool = channelManagers.get(address); if (pool == null) @@ -1549,7 +1642,7 @@ public final class MessagingService implements MessagingServiceMBean return pool.getConnection(messageOut).isConnected(); } - public static boolean isEncryptedConnection(InetAddress address) + public static boolean isEncryptedConnection(InetAddressAndPort address) { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption) @@ -1559,13 +1652,13 @@ public final class MessagingService implements MessagingServiceMBean case all: break; case dc: - if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) + if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()))) return false; break; case rack: // for rack then check if the DC's are the same. 
- if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress())) - && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) + if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddressAndPort())) + && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()))) return false; break; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessagingServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java index b2e79e0..f4a0c43 100644 --- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java +++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java @@ -30,47 +30,69 @@ public interface MessagingServiceMBean /** * Pending tasks for large message TCP Connections */ + @Deprecated public Map<String, Integer> getLargeMessagePendingTasks(); + public Map<String, Integer> getLargeMessagePendingTasksWithPort(); /** * Completed tasks for large message) TCP Connections */ + @Deprecated public Map<String, Long> getLargeMessageCompletedTasks(); + public Map<String, Long> getLargeMessageCompletedTasksWithPort(); /** * Dropped tasks for large message TCP Connections */ + @Deprecated public Map<String, Long> getLargeMessageDroppedTasks(); + public Map<String, Long> getLargeMessageDroppedTasksWithPort(); + /** * Pending tasks for small message TCP Connections */ + @Deprecated public Map<String, Integer> getSmallMessagePendingTasks(); + public Map<String, Integer> getSmallMessagePendingTasksWithPort(); + /** * Completed tasks for small message TCP Connections */ + @Deprecated public Map<String, Long> getSmallMessageCompletedTasks(); + public Map<String, Long> getSmallMessageCompletedTasksWithPort(); + /** * Dropped tasks for small 
message TCP Connections */ + @Deprecated public Map<String, Long> getSmallMessageDroppedTasks(); + public Map<String, Long> getSmallMessageDroppedTasksWithPort(); + /** * Pending tasks for gossip message TCP Connections */ + @Deprecated public Map<String, Integer> getGossipMessagePendingTasks(); + public Map<String, Integer> getGossipMessagePendingTasksWithPort(); /** * Completed tasks for gossip message TCP Connections */ + @Deprecated public Map<String, Long> getGossipMessageCompletedTasks(); + public Map<String, Long> getGossipMessageCompletedTasksWithPort(); /** * Dropped tasks for gossip message TCP Connections */ + @Deprecated public Map<String, Long> getGossipMessageDroppedTasks(); + public Map<String, Long> getGossipMessageDroppedTasksWithPort(); /** * dropped message counts for server lifetime @@ -85,12 +107,16 @@ public interface MessagingServiceMBean /** * Number of timeouts per host */ + @Deprecated public Map<String, Long> getTimeoutsPerHost(); + public Map<String, Long> getTimeoutsPerHostWithPort(); /** * Back-pressure rate limiting per host */ + @Deprecated public Map<String, Double> getBackPressurePerHost(); + public Map<String, Double> getBackPressurePerHostWithPort(); /** * Enable/Disable back-pressure http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ParameterType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ParameterType.java b/src/java/org/apache/cassandra/net/ParameterType.java new file mode 100644 index 0000000..0a1f73f --- /dev/null +++ b/src/java/org/apache/cassandra/net/ParameterType.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.io.DummyByteVersionedSerializer; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.ShortVersionedSerializer; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.UUIDSerializer; + +/** + * Type names and serializers for the various parameters that can be attached to a message. + */ +public enum ParameterType +{ + FORWARD_TO("FORWARD_TO", ForwardToSerializer.instance), + FORWARD_FROM("FORWARD_FROM", CompactEndpointSerializationHelper.instance), + FAILURE_RESPONSE("FAIL", DummyByteVersionedSerializer.instance), + FAILURE_REASON("FAIL_REASON", ShortVersionedSerializer.instance), + FAILURE_CALLBACK("CAL_BAC", DummyByteVersionedSerializer.instance), + TRACE_SESSION("TraceSession", UUIDSerializer.serializer), + TRACE_TYPE("TraceType", Tracing.traceTypeSerializer); + + public static final Map<String, ParameterType> byName; + public final String key; + public final IVersionedSerializer serializer; + + static + { + ImmutableMap.Builder<String, ParameterType> builder = ImmutableMap.builder(); + for (ParameterType type : values()) + { + builder.put(type.key, type); + } + byName = builder.build(); + } + + ParameterType(String key, IVersionedSerializer serializer) + { + this.key = key; + this.serializer = serializer; + } + + 
public String key() + { + return key; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/RateBasedBackPressure.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java index 64685b0..b951bc0 100644 --- a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java +++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.SystemTimeSource; import org.apache.cassandra.utils.TimeSource; @@ -253,7 +253,7 @@ public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBack } @Override - public RateBasedBackPressureState newState(InetAddress host) + public RateBasedBackPressureState newState(InetAddressAndPort host) { return new RateBasedBackPressureState(host, timeSource, windowSize); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java index 541d7a6..9df056e 100644 --- a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java +++ b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java @@ -17,11 +17,11 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; 
import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.SlidingTimeRate; import org.apache.cassandra.utils.TimeSource; import org.apache.cassandra.utils.concurrent.IntervalLock; @@ -46,12 +46,12 @@ import org.apache.cassandra.utils.concurrent.IntervalLock; */ class RateBasedBackPressureState extends IntervalLock implements BackPressureState { - private final InetAddress host; + private final InetAddressAndPort host; final SlidingTimeRate incomingRate; final SlidingTimeRate outgoingRate; final RateLimiter rateLimiter; - RateBasedBackPressureState(InetAddress host, TimeSource timeSource, long windowSize) + RateBasedBackPressureState(InetAddressAndPort host, TimeSource timeSource, long windowSize) { super(timeSource); this.host = host; @@ -99,7 +99,7 @@ class RateBasedBackPressureState extends IntervalLock implements BackPressureSta } @Override - public InetAddress getHost() + public InetAddressAndPort getHost() { return host; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java index 9ecc385..41ac31b 100644 --- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java +++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java @@ -18,11 +18,10 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; - import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.utils.FBUtilities; @@ -32,7 +31,7 @@ 
public class WriteCallbackInfo extends CallbackInfo // either a Mutation, or a Paxos Commit (MessageOut) private final Object mutation; - public WriteCallbackInfo(InetAddress target, + public WriteCallbackInfo(InetAddressAndPort target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, @@ -43,7 +42,7 @@ public class WriteCallbackInfo extends CallbackInfo assert message != null; this.mutation = shouldHint(allowHints, message, consistencyLevel); //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477) - assert (!target.equals(FBUtilities.getBroadcastAddress())); + assert (!target.equals(FBUtilities.getBroadcastAddressAndPort())); } public boolean shouldHint() http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java b/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java index 9b8df80..327b20e 100644 --- a/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java +++ b/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java @@ -24,11 +24,15 @@ import java.net.InetAddress; import java.util.Objects; import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; @@ -227,9 +231,9 @@ public class HandshakeProtocol private static final int MIN_LENGTH = 9; final int messagingVersion; - final 
InetAddress address; + final InetAddressAndPort address; - ThirdHandshakeMessage(int messagingVersion, InetAddress address) + ThirdHandshakeMessage(int messagingVersion, InetAddressAndPort address) { this.messagingVersion = messagingVersion; this.address = address; @@ -238,14 +242,14 @@ public class HandshakeProtocol @SuppressWarnings("resource") public ByteBuf encode(ByteBufAllocator allocator) { - int bufLength = Integer.BYTES + CompactEndpointSerializationHelper.serializedSize(address); + int bufLength = Ints.checkedCast(Integer.BYTES + CompactEndpointSerializationHelper.instance.serializedSize(address, messagingVersion)); ByteBuf buffer = allocator.directBuffer(bufLength, bufLength); buffer.writerIndex(0); buffer.writeInt(messagingVersion); try { - DataOutput bbos = new ByteBufOutputStream(buffer); - CompactEndpointSerializationHelper.serialize(address, bbos); + DataOutputPlus dop = new ByteBufDataOutputPlus(buffer); + CompactEndpointSerializationHelper.instance.serialize(address, dop, messagingVersion); return buffer; } catch (IOException e) @@ -263,10 +267,10 @@ public class HandshakeProtocol in.markReaderIndex(); int version = in.readInt(); - DataInput inputStream = new ByteBufInputStream(in); + DataInputPlus input = new ByteBufDataInputPlus(in); try { - InetAddress address = CompactEndpointSerializationHelper.deserialize(inputStream); + InetAddressAndPort address = CompactEndpointSerializationHelper.instance.deserialize(input, version); return new ThirdHandshakeMessage(version, address); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java index 625f03d..a84112e 100644 --- 
a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java +++ b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.ssl.SslHandler; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage; import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage; @@ -209,7 +210,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder { ChannelPipeline pipeline = ctx.pipeline(); InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); - pipeline.addLast(NettyFactory.instance.streamingGroup, "streamInbound", new StreamingInboundHandler(address, protocolVersion, null)); + pipeline.addLast(NettyFactory.instance.streamingGroup, "streamInbound", new StreamingInboundHandler(InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort()), protocolVersion, null)); pipeline.remove(this); // pass a custom recv ByteBuf allocator to the channel. 
the default recv ByteBuf size is 1k, but in streaming we're @@ -244,7 +245,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder } // record the (true) version of the endpoint - InetAddress from = msg.address; + InetAddressAndPort from = msg.address; MessagingService.instance().setVersion(from, maxVersion); logger.trace("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance().getVersion(from)); @@ -253,7 +254,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder } @VisibleForTesting - void setupMessagingPipeline(ChannelPipeline pipeline, InetAddress peer, boolean compressed, int messagingVersion) + void setupMessagingPipeline(ChannelPipeline pipeline, InetAddressAndPort peer, boolean compressed, int messagingVersion) { if (compressed) pipeline.addLast(NettyFactory.INBOUND_COMPRESSOR_HANDLER_NAME, NettyFactory.createLz4Decoder(messagingVersion)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/MessageInHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java b/src/java/org/apache/cassandra/net/async/MessageInHandler.java index b400512..0423b80 100644 --- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java +++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java @@ -21,7 +21,6 @@ package org.apache.cassandra.net.async; import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; -import java.net.InetAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -37,9 +36,12 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.UnknownTableException; +import org.apache.cassandra.io.util.DataInputBuffer; +import 
org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; /** * Parses out individual messages from the incoming buffers. Each message, both header and payload, is incrementally built up @@ -79,7 +81,7 @@ class MessageInHandler extends ByteToMessageDecoder */ private static final int SECOND_SECTION_BYTE_COUNT = 8; - private final InetAddress peer; + private final InetAddressAndPort peer; private final int messagingVersion; /** @@ -91,12 +93,12 @@ class MessageInHandler extends ByteToMessageDecoder private State state; private MessageHeader messageHeader; - MessageInHandler(InetAddress peer, int messagingVersion) + MessageInHandler(InetAddressAndPort peer, int messagingVersion) { this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER); } - MessageInHandler(InetAddress peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer) + MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer) { this.peer = peer; this.messagingVersion = messagingVersion; @@ -140,7 +142,7 @@ class MessageInHandler extends ByteToMessageDecoder int serializedAddrSize; if (readableBytes < 1 || readableBytes < (serializedAddrSize = in.getByte(in.readerIndex()) + 1)) return; - messageHeader.from = CompactEndpointSerializationHelper.deserialize(inputPlus); + messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(inputPlus, messagingVersion); state = State.READ_SECOND_CHUNK; readableBytes -= serializedAddrSize; // fall-through @@ -199,7 +201,7 @@ class MessageInHandler extends ByteToMessageDecoder /** * @return <code>true</code> if all the parameters have been read from the {@link ByteBuf}; else, <code>false</code>. 
*/ - private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<String, byte[]> parameters) throws IOException + private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<ParameterType, Object> parameters) throws IOException { // makes the assumption that map.size() is a constant time function (HashMap.size() is) while (parameters.size() < parameterCount) @@ -208,9 +210,10 @@ class MessageInHandler extends ByteToMessageDecoder return false; String key = DataInputStream.readUTF(inputPlus); + ParameterType parameterType = ParameterType.byName.get(key); byte[] value = new byte[in.readInt()]; in.readBytes(value); - parameters.put(key, value); + parameters.put(parameterType, parameterType.serializer.deserialize(new DataInputBuffer(value), messagingVersion)); } return true; @@ -300,11 +303,11 @@ class MessageInHandler extends ByteToMessageDecoder { int messageId; long constructionTime; - InetAddress from; + InetAddressAndPort from; MessagingService.Verb verb; int payloadSize; - Map<String, byte[]> parameters = Collections.emptyMap(); + Map<ParameterType, Object> parameters = Collections.emptyMap(); /** * Total number of incoming parameters. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/MessageOutHandler.java b/src/java/org/apache/cassandra/net/async/MessageOutHandler.java index e88b56a..f1647ab 100644 --- a/src/java/org/apache/cassandra/net/async/MessageOutHandler.java +++ b/src/java/org/apache/cassandra/net/async/MessageOutHandler.java @@ -40,6 +40,7 @@ import io.netty.handler.timeout.IdleStateEvent; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; @@ -196,10 +197,9 @@ class MessageOutHandler extends ChannelDuplexHandler { try { - byte[] sessionBytes = msg.message.parameters.get(Tracing.TRACE_HEADER); - if (sessionBytes != null) + UUID sessionId = (UUID)msg.message.getParameter(ParameterType.TRACE_SESSION); + if (sessionId != null) { - UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); TraceState state = Tracing.instance.get(sessionId); String message = String.format("Sending %s message to %s, size = %d bytes", msg.message.verb, connectionId.connectionAddress(), @@ -207,9 +207,9 @@ class MessageOutHandler extends ChannelDuplexHandler // session may have already finished; see CASSANDRA-5668 if (state == null) { - byte[] traceTypeBytes = msg.message.parameters.get(Tracing.TRACE_TYPE); - Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL()); + Tracing.TraceType traceType = (Tracing.TraceType)msg.message.getParameter(ParameterType.TRACE_TYPE); + traceType = traceType == null ? 
Tracing.TraceType.QUERY : traceType; + Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), message, traceType.getTTL()); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
