http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/NettyFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java index 7844c9b..d891043 100644 --- a/src/java/org/apache/cassandra/net/async/NettyFactory.java +++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java @@ -48,6 +48,7 @@ import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.NativeTransportService; @@ -184,9 +185,9 @@ public final class NettyFactory * Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address, * but it does not make a remote call. */ - public Channel createInboundChannel(InetSocketAddress localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException + public Channel createInboundChannel(InetAddressAndPort localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException { - String nic = FBUtilities.getNetworkInterface(localAddr.getAddress()); + String nic = FBUtilities.getNetworkInterface(localAddr.address); logger.info("Starting Messaging Service on {} {}, encryption: {}", localAddr, nic == null ? "" : String.format(" (%s)", nic), encryptionLogStatement(initializer.encryptionOptions)); Class<? extends ServerChannel> transport = useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class; @@ -202,7 +203,7 @@ public final class NettyFactory if (receiveBufferSize > 0) bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize); - ChannelFuture channelFuture = bootstrap.bind(localAddr); + ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(localAddr.address, localAddr.port)); if (!channelFuture.awaitUninterruptibly().isSuccess()) { @@ -333,8 +334,9 @@ public final class NettyFactory .option(ChannelOption.TCP_NODELAY, params.tcpNoDelay) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark) .handler(new OutboundInitializer(params)); - bootstrap.localAddress(params.connectionId.local(), 0); - bootstrap.remoteAddress(params.connectionId.connectionAddress()); + bootstrap.localAddress(params.connectionId.local().address, 0); + InetAddressAndPort remoteAddress = params.connectionId.connectionAddress(); + bootstrap.remoteAddress(new InetSocketAddress(remoteAddress.address, remoteAddress.port)); return bootstrap; } @@ -362,7 +364,8 @@ public final class NettyFactory { SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false); // for some reason channel.remoteAddress() will return null - InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? params.connectionId.remoteAddress() : null; + InetAddressAndPort address = params.connectionId.remote(); + InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? new InetSocketAddress(address.address, address.port) : null; SslHandler sslHandler = newSslHandler(channel, sslContext, peer); logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java index 6b2ff0d..f3cb554 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java +++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java @@ -21,6 +21,8 @@ package org.apache.cassandra.net.async; import java.net.InetAddress; import java.net.InetSocketAddress; +import org.apache.cassandra.locator.InetAddressAndPort; + /** * Identifies an outbound messaging connection. * @@ -38,24 +40,24 @@ public class OutboundConnectionIdentifier /** * Memoization of the local node's broadcast address. */ - private final InetSocketAddress localAddr; + private final InetAddressAndPort localAddr; /** * The address by which the remote is identified. This may be different from {@link #remoteConnectionAddr} for * something like EC2 public IP address which need to be used for communication between EC2 regions. */ - private final InetSocketAddress remoteAddr; + private final InetAddressAndPort remoteAddr; /** * The address to which we're connecting to the node (often the same as {@link #remoteAddr} but not always). */ - private final InetSocketAddress remoteConnectionAddr; + private final InetAddressAndPort remoteConnectionAddr; private final ConnectionType connectionType; - private OutboundConnectionIdentifier(InetSocketAddress localAddr, - InetSocketAddress remoteAddr, - InetSocketAddress remoteConnectionAddr, + private OutboundConnectionIdentifier(InetAddressAndPort localAddr, + InetAddressAndPort remoteAddr, + InetAddressAndPort remoteConnectionAddr, ConnectionType connectionType) { this.localAddr = localAddr; @@ -64,8 +66,8 @@ public class OutboundConnectionIdentifier this.connectionType = connectionType; } - private OutboundConnectionIdentifier(InetSocketAddress localAddr, - InetSocketAddress remoteAddr, + private OutboundConnectionIdentifier(InetAddressAndPort localAddr, + InetAddressAndPort remoteAddr, ConnectionType connectionType) { this(localAddr, remoteAddr, remoteAddr, connectionType); @@ -75,7 +77,7 @@ public class OutboundConnectionIdentifier * Creates an identifier for a small message connection and using the remote "identifying" address as its connection * address. */ - public static OutboundConnectionIdentifier small(InetSocketAddress localAddr, InetSocketAddress remoteAddr) + public static OutboundConnectionIdentifier small(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr) { return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.SMALL_MESSAGE); } @@ -84,7 +86,7 @@ public class OutboundConnectionIdentifier * Creates an identifier for a large message connection and using the remote "identifying" address as its connection * address. */ - public static OutboundConnectionIdentifier large(InetSocketAddress localAddr, InetSocketAddress remoteAddr) + public static OutboundConnectionIdentifier large(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr) { return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.LARGE_MESSAGE); } @@ -93,7 +95,7 @@ public class OutboundConnectionIdentifier * Creates an identifier for a gossip connection and using the remote "identifying" address as its connection * address. */ - public static OutboundConnectionIdentifier gossip(InetSocketAddress localAddr, InetSocketAddress remoteAddr) + public static OutboundConnectionIdentifier gossip(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr) { return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.GOSSIP); } @@ -102,7 +104,7 @@ public class OutboundConnectionIdentifier * Creates an identifier for a gossip connection and using the remote "identifying" address as its connection * address. */ - public static OutboundConnectionIdentifier stream(InetSocketAddress localAddr, InetSocketAddress remoteAddr) + public static OutboundConnectionIdentifier stream(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr) { return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.STREAM); } @@ -115,45 +117,37 @@ public class OutboundConnectionIdentifier * @return a newly created connection identifier that differs from this one only by using {@code remoteConnectionAddr} * as connection address to the remote. */ - public OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr) + public OutboundConnectionIdentifier withNewConnectionAddress(InetAddressAndPort remoteConnectionAddr) { return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType); } public OutboundConnectionIdentifier withNewConnectionPort(int port) { - return new OutboundConnectionIdentifier(localAddr, new InetSocketAddress(remoteAddr.getAddress(), port), - new InetSocketAddress(remoteConnectionAddr.getAddress(), port), connectionType); + return new OutboundConnectionIdentifier(localAddr, InetAddressAndPort.getByAddressOverrideDefaults(remoteAddr.address, port), + InetAddressAndPort.getByAddressOverrideDefaults(remoteConnectionAddr.address, port), connectionType); } /** * The local node address. */ - public InetAddress local() + public InetAddressAndPort local() { - return localAddr.getAddress(); + return localAddr; } /** * The remote node identifying address (the one to use for anything else than connecting to the node). */ - public InetSocketAddress remoteAddress() + public InetAddressAndPort remote() { return remoteAddr; } /** - * The remote node identifying address (the one to use for anything else than connecting to the node). - */ - public InetAddress remote() - { - return remoteAddr.getAddress(); - } - - /** * The remote node connection address (the one to use to actually connect to the remote, and only that). */ - public InetSocketAddress connectionAddress() + public InetAddressAndPort connectionAddress() { return remoteConnectionAddr; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java index 4522ba4..28775ef 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java @@ -43,6 +43,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.NettyFactory.Mode; @@ -258,12 +259,12 @@ public class OutboundMessagingConnection logger.debug("connection attempt {} to {}", connectAttemptCount, connectionId); - InetSocketAddress remote = connectionId.remoteAddress(); - if (!authenticator.authenticate(remote.getAddress(), remote.getPort())) + InetAddressAndPort remote = connectionId.remote(); + if (!authenticator.authenticate(remote.address, remote.port)) { logger.warn("Internode auth failed connecting to {}", connectionId); //Remove the connection pool and other thread so messages aren't queued - MessagingService.instance().destroyConnectionPool(remote.getAddress()); + MessagingService.instance().destroyConnectionPool(remote); // don't update the state field as destroyConnectionPool() *should* call OMC.close() // on all the connections in the OMP for the remoteAddress @@ -284,7 +285,7 @@ public class OutboundMessagingConnection } @VisibleForTesting - static boolean shouldCompressConnection(InetAddress localHost, InetAddress remoteHost) + static boolean shouldCompressConnection(InetAddressAndPort localHost, InetAddressAndPort remoteHost) { return (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all) || ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost)); @@ -355,7 +356,7 @@ public class OutboundMessagingConnection return null; } - static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost) + static boolean isLocalDC(InetAddressAndPort localHost, InetAddressAndPort remoteHost) { String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost); String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost); @@ -585,7 +586,7 @@ public class OutboundMessagingConnection * Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from * one channel to another). */ - void reconnectWithNewIp(InetSocketAddress newAddr) + void reconnectWithNewIp(InetAddressAndPort newAddr) { State currentState = state.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java index 0086da8..c701229 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java @@ -28,6 +28,7 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.net.BackPressureState; import org.apache.cassandra.net.MessageOut; @@ -56,14 +57,14 @@ public class OutboundMessagingPool * An override address on which to communicate with the peer. Typically used for something like EC2 public IP addresses * which need to be used for communication between EC2 regions. */ - private InetSocketAddress preferredRemoteAddr; + private InetAddressAndPort preferredRemoteAddr; - public OutboundMessagingPool(InetSocketAddress remoteAddr, InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions, + public OutboundMessagingPool(InetAddressAndPort remoteAddr, InetAddressAndPort localAddr, ServerEncryptionOptions encryptionOptions, BackPressureState backPressureState, IInternodeAuthenticator authenticator) { preferredRemoteAddr = remoteAddr; this.backPressureState = backPressureState; - metrics = new ConnectionMetrics(localAddr.getAddress(), this); + metrics = new ConnectionMetrics(localAddr, this); smallMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, preferredRemoteAddr), @@ -76,10 +77,10 @@ public class OutboundMessagingPool encryptionOptions, Optional.empty(), authenticator); } - private static Optional<CoalescingStrategy> coalescingStrategy(InetSocketAddress remoteAddr) + private static Optional<CoalescingStrategy> coalescingStrategy(InetAddressAndPort remoteAddr) { String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy(); - String displayName = remoteAddr.getAddress().getHostAddress(); + String displayName = remoteAddr.toString(); return CoalescingStrategies.newCoalescingStrategy(strategyName, DatabaseDescriptor.getOtcCoalescingWindow(), OutboundMessagingConnection.logger, @@ -117,7 +118,7 @@ public class OutboundMessagingPool * * @param addr IP Address to use (and prefer) going forward for connecting to the peer */ - public void reconnectWithNewIp(InetSocketAddress addr) + public void reconnectWithNewIp(InetAddressAndPort addr) { preferredRemoteAddr = addr; gossipChannel.reconnectWithNewIp(addr); @@ -166,7 +167,7 @@ public class OutboundMessagingPool return metrics.timeouts.getCount(); } - public InetSocketAddress getPreferredRemoteAddr() + public InetAddressAndPort getPreferredRemoteAddr() { return preferredRemoteAddr; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java index 5464520..2c4fae4 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java @@ -18,13 +18,13 @@ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.List; import java.util.UUID; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamEvent; @@ -41,15 +41,15 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea private final UUID pendingRepair; private final TraceState state = Tracing.instance.get(); - public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind) + public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind) { - super(desc, FBUtilities.getBroadcastAddress(), fetchFrom, rangesToFetch, previewKind); + super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind); this.pendingRepair = pendingRepair; } public void startSync(List<Range<Token>> rangesToFetch) { - InetAddress preferred = SystemKeyspace.getPreferredIP(fetchFrom); + InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom); StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java index d70975d..e24d854 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java @@ -18,12 +18,12 @@ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.List; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.AsymmetricSyncRequest; import org.apache.cassandra.streaming.PreviewKind; @@ -33,14 +33,14 @@ import org.apache.cassandra.utils.FBUtilities; public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask { - public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddress fetchNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) + public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) { super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind); } public void startSync(List<Range<Token>> rangesToFetch) { - InetAddress local = FBUtilities.getBroadcastAddress(); + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom); Tracing.traceRepair(message); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java index fe00058..4d38e8a 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.List; import java.util.concurrent.TimeUnit; @@ -30,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.Tracing; @@ -37,15 +37,15 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem { private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class); protected final RepairJobDesc desc; - protected final InetAddress fetchFrom; + protected final InetAddressAndPort fetchFrom; protected final List<Range<Token>> rangesToFetch; - protected final InetAddress fetchingNode; + protected final InetAddressAndPort fetchingNode; protected final PreviewKind previewKind; private long startTime = Long.MIN_VALUE; protected volatile SyncStat stat; - public AsymmetricSyncTask(RepairJobDesc desc, InetAddress fetchingNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) + public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) { this.desc = desc; this.fetchFrom = fetchFrom; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 8545b22..60d571b 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.List; import java.util.UUID; @@ -28,7 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamEvent; @@ -39,8 +38,6 @@ import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; -import org.apache.cassandra.utils.MerkleTrees; /** * LocalSyncTask performs streaming between local(coordinator) node and remote replica. @@ -62,7 +59,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler } @VisibleForTesting - StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences) + StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences) { StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) .listeners(this) @@ -84,10 +81,10 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler @Override protected void startSync(List<Range<Token>> differences) { - InetAddress local = FBUtilities.getBroadcastAddress(); + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding - InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; - InetAddress preferred = SystemKeyspace.getPreferredIP(dst); + InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; + InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dst); String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/NodePair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java index a73c61a..bfb237e 100644 --- a/src/java/org/apache/cassandra/repair/NodePair.java +++ b/src/java/org/apache/cassandra/repair/NodePair.java @@ -18,13 +18,13 @@ package org.apache.cassandra.repair; import java.io.IOException; -import java.net.InetAddress; import com.google.common.base.Objects; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; /** @@ -36,10 +36,10 @@ public class NodePair { public static IVersionedSerializer<NodePair> serializer = new NodePairSerializer(); - public final InetAddress endpoint1; - public final InetAddress endpoint2; + public final InetAddressAndPort endpoint1; + public final InetAddressAndPort endpoint2; - public NodePair(InetAddress endpoint1, InetAddress endpoint2) + public NodePair(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2) { this.endpoint1 = endpoint1; this.endpoint2 = endpoint2; @@ -56,6 +56,12 @@ public class NodePair } @Override + public String toString() + { + return endpoint1.toString() + " - " + endpoint2.toString(); + } + + @Override public int hashCode() { return Objects.hashCode(endpoint1, endpoint2); @@ -65,20 +71,21 @@ public class NodePair { public void serialize(NodePair nodePair, DataOutputPlus out, int version) throws IOException { - CompactEndpointSerializationHelper.serialize(nodePair.endpoint1, out); - CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out); + CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint1, out, version); + CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint2, out, version); } public NodePair deserialize(DataInputPlus in, int version) throws IOException { - InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in); - InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version); return new NodePair(ep1, ep2); } public long serializedSize(NodePair nodePair, int version) { - return 2 * CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint1); + return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint1, version) + + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint2, version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java index 93feb72..0a47f73 100644 --- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.List; import org.slf4j.Logger; @@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; @@ -51,7 +51,7 @@ public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTas @Override protected void startSync(List<Range<Token>> differences) { - InetAddress local = FBUtilities.getBroadcastAddress(); + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 7b8eb92..48973d2 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.*; import java.util.stream.Collectors; @@ -33,6 +32,7 @@ import org.apache.cassandra.repair.asymmetric.DifferenceHolder; import org.apache.cassandra.repair.asymmetric.HostDifferences; import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter; import org.apache.cassandra.repair.asymmetric.ReduceHelper; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -83,14 +83,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable Keyspace ks = Keyspace.open(desc.keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily); cfs.metric.repairsStarted.inc(); - List<InetAddress> allEndpoints = new ArrayList<>(session.endpoints); - allEndpoints.add(FBUtilities.getBroadcastAddress()); + List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.endpoints); + allEndpoints.add(FBUtilities.getBroadcastAddressAndPort()); ListenableFuture<List<TreeResponse>> validations; // Create a snapshot at all nodes unless we're using pure parallel repairs if (parallelismDegree != RepairParallelism.PARALLEL) { - ListenableFuture<List<InetAddress>> allSnapshotTasks; + ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks; if (isIncremental) { // consistent repair does it's own "snapshotting" @@ -99,8 +99,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable else { // Request snapshot to all replica - List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size()); - for (InetAddress endpoint : allEndpoints) + List<ListenableFuture<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size()); + for (InetAddressAndPort endpoint : allEndpoints) { SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint); snapshotTasks.add(snapshotTask); @@ -110,9 +110,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable } // When all snapshot complete, send validation requests - validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>() + validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction<List<InetAddressAndPort>, List<TreeResponse>>() { - public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints) + public ListenableFuture<List<TreeResponse>> apply(List<InetAddressAndPort> endpoints) { if (parallelismDegree == RepairParallelism.SEQUENTIAL) return sendSequentialValidationRequest(endpoints); @@ -164,7 +164,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { return trees -> { - InetAddress local = FBUtilities.getLocalAddress(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); List<SyncTask> syncTasks = new ArrayList<>(); // We need to difference all trees one against another @@ -198,7 +198,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { return trees -> { - InetAddress local = FBUtilities.getLocalAddress(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); List<AsymmetricSyncTask> syncTasks = new ArrayList<>(); // We need to difference all trees one against another @@ -210,16 +210,16 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable .filter(node -> getDC(streaming) .equals(getDC(node))) .collect(Collectors.toSet()); - ImmutableMap<InetAddress, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); + ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); for (int i = 0; i < trees.size(); i++) { - InetAddress address = trees.get(i).endpoint; + InetAddressAndPort address = trees.get(i).endpoint; HostDifferences streamsFor = reducedDifferences.get(address); if (streamsFor != null) { assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves"; - for (InetAddress fetchFrom : streamsFor.hosts()) + for (InetAddressAndPort fetchFrom : streamsFor.hosts()) { List<Range<Token>> toFetch = streamsFor.get(fetchFrom); logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); @@ -246,7 +246,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable }; } - private String getDC(InetAddress address) + private String getDC(InetAddressAndPort address) { return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address); } @@ -257,14 +257,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable * @param endpoints Endpoint addresses to send validation request * @return Future that can get all {@link TreeResponse} from replica, if all validation succeed. */ - private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints) + private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddressAndPort> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) { ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, previewKind); tasks.add(task); @@ -277,7 +277,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable /** * Creates {@link ValidationTask} and submit them to task executor so that tasks run sequentially. */ - private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints) + private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddressAndPort> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); @@ -285,8 +285,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); - Queue<InetAddress> requests = new LinkedList<>(endpoints); - InetAddress address = requests.poll(); + Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints); + InetAddressAndPort address = requests.poll(); ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind); logger.info("Validating {}", address); session.waitForValidation(Pair.create(desc, address), firstTask); @@ -294,7 +294,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable ValidationTask currentTask = firstTask; while (requests.size() > 0) { - final InetAddress nextAddress = requests.poll(); + final InetAddressAndPort nextAddress = requests.poll(); final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() @@ -319,7 +319,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable /** * Creates {@link ValidationTask} and submit them to task executor so that tasks run sequentially within each dc. */ - private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints) + private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddressAndPort> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); @@ -327,11 +327,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); - Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>(); - for (InetAddress endpoint : endpoints) + Map<String, Queue<InetAddressAndPort>> requestsByDatacenter = new HashMap<>(); + for (InetAddressAndPort endpoint : endpoints) { String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint); - Queue<InetAddress> queue = requestsByDatacenter.get(dc); + Queue<InetAddressAndPort> queue = requestsByDatacenter.get(dc); if (queue == null) { queue = new LinkedList<>(); @@ -340,10 +340,10 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable queue.add(endpoint); } - for (Map.Entry<String, Queue<InetAddress>> entry : requestsByDatacenter.entrySet()) + for (Map.Entry<String, Queue<InetAddressAndPort>> entry : requestsByDatacenter.entrySet()) { - Queue<InetAddress> requests = entry.getValue(); - InetAddress address = requests.poll(); + Queue<InetAddressAndPort> requests = entry.getValue(); + InetAddressAndPort address = requests.poll(); ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind); logger.info("Validating {}", address); session.waitForValidation(Pair.create(desc, address), firstTask); @@ -351,7 +351,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable ValidationTask currentTask = firstTask; while (requests.size() > 0) { - final InetAddress nextAddress = requests.poll(); + final InetAddressAndPort nextAddress = requests.poll(); final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index c26d4d1..4c0a564 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.*; import com.google.common.base.Predicate; @@ -28,10 +27,12 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; @@ -225,11 +226,11 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> } } - private void logErrorAndSendFailureResponse(String errorMessage, InetAddress to, int id) + private void logErrorAndSendFailureResponse(String errorMessage, InetAddressAndPort to, int id) { logger.error(errorMessage); MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) - .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); + .withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE); MessagingService.instance().sendReply(reply, id, to); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 1c9778b..5121874 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -60,6 +60,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.repair.consistent.CoordinatorSession; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.QueryState; @@ -141,10 +142,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti @VisibleForTesting static class CommonRange { - public final Set<InetAddress> endpoints; + public final Set<InetAddressAndPort> endpoints; public final Collection<Range<Token>> ranges; - public CommonRange(Set<InetAddress> endpoints, Collection<Range<Token>> ranges) + public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges) { Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty()); Preconditions.checkArgument(ranges != null && !ranges.isEmpty()); @@ -232,7 +233,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti traceState = null; } - final Set<InetAddress> allNeighbors = new HashSet<>(); + final Set<InetAddressAndPort> allNeighbors = new HashSet<>(); List<CommonRange> commonRanges = new ArrayList<>(); //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent @@ -243,9 +244,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti { for (Range<Token> range : options.getRanges()) { - Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, - options.getDataCenters(), - options.getHosts()); + Set<InetAddressAndPort> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, + options.getDataCenters(), + options.getHosts()); addRangeToNeighbors(commonRanges, range, neighbors); allNeighbors.addAll(neighbors); @@ -286,7 +287,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time()) { - ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores); + ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, columnFamilyStores); progress.incrementAndGet(); } catch (Throwable t) @@ -362,7 +363,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti * removes dead nodes from common ranges, and exludes ranges left without any participants */ @VisibleForTesting - static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddress> liveEndpoints, boolean force) + static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force) { if (!force) { @@ -374,7 +375,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti for (CommonRange commonRange: commonRanges) { - Set<InetAddress> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains)); + Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains)); // this node is implicitly a participant in this repair, so a single endpoint is ok here if (!endpoints.isEmpty()) @@ -391,15 +392,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti long startTime, boolean forceRepair, TraceState traceState, - Set<InetAddress> allNeighbors, + Set<InetAddressAndPort> allNeighbors, List<CommonRange> commonRanges, String... cfnames) { // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted - Predicate<InetAddress> isAlive = FailureDetector.instance::isAlive; - Set<InetAddress> allParticipants = ImmutableSet.<InetAddress>builder() + Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive; + Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder() .addAll(forceRepair ? Iterables.filter(allNeighbors, isAlive) : allNeighbors) - .add(FBUtilities.getBroadcastAddress()) + .add(FBUtilities.getBroadcastAddressAndPort()) .build(); List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair); @@ -673,7 +674,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti ImmutableList.of(failureMessage, completionMessage)); } - private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) + private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddressAndPort> neighbors) { for (int i = 0; i < neighborRangeList.size(); i++) { @@ -708,7 +709,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); - InetAddress source = FBUtilities.getBroadcastAddress(); + InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort(); HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() }; int si = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 609ec56..91d767d 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -34,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; @@ -95,14 +95,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement /** Range to repair */ public final Collection<Range<Token>> ranges; - public final Set<InetAddress> endpoints; + public final Set<InetAddressAndPort> endpoints; public final boolean isIncremental; public final PreviewKind previewKind; private final AtomicBoolean isFailed = new AtomicBoolean(false); // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address) - private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>(); + private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>(); // Remote syncing jobs wait response in syncingTasks map private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); @@ -130,7 +130,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement Collection<Range<Token>> ranges, String keyspace, RepairParallelism parallelismDegree, - Set<InetAddress> endpoints, + Set<InetAddressAndPort> endpoints, boolean isIncremental, boolean pullRepair, boolean force, @@ -152,8 +152,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement if (force) { logger.debug("force flag set, removing dead endpoints"); - final Set<InetAddress> removeCandidates = new HashSet<>(); - for (final InetAddress endpoint : endpoints) + final Set<InetAddressAndPort> removeCandidates = new HashSet<>(); + for (final InetAddressAndPort endpoint : endpoints) { if (!FailureDetector.instance.isAlive(endpoint)) { @@ -189,7 +189,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return ranges; } - public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task) + public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task) { validating.put(key, task); } @@ -207,7 +207,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * @param endpoint endpoint that sent merkle tree * @param trees calculated merkle trees, or null if validation failed */ - public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTrees trees) + public void validationComplete(RepairJobDesc desc, InetAddressAndPort endpoint, MerkleTrees trees) { ValidationTask task = validating.remove(Pair.create(desc, endpoint)); if (task == null) @@ -245,8 +245,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement private String repairedNodes() { StringBuilder sb = new StringBuilder(); - sb.append(FBUtilities.getBroadcastAddress()); - for (InetAddress ep : endpoints) + sb.append(FBUtilities.getBroadcastAddressAndPort()); + for (InetAddressAndPort ep : endpoints) sb.append(", ").append(ep); return sb.toString(); } @@ -285,7 +285,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } // Checking all nodes are live - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) { if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas) { @@ -353,23 +353,23 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement terminate(); } - public void onJoin(InetAddress endpoint, EndpointState epState) {} - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} - public void onAlive(InetAddress endpoint, EndpointState state) {} - public void onDead(InetAddress endpoint, EndpointState state) {} + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {} + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddressAndPort endpoint, EndpointState state) {} + public void onDead(InetAddressAndPort endpoint, EndpointState state) {} - public void onRemove(InetAddress endpoint) + public void onRemove(InetAddressAndPort endpoint) { convict(endpoint, Double.MAX_VALUE); } - public void onRestart(InetAddress endpoint, EndpointState epState) + public void onRestart(InetAddressAndPort endpoint, EndpointState epState) { convict(endpoint, Double.MAX_VALUE); } - public void convict(InetAddress endpoint, double phi) + public void convict(InetAddressAndPort endpoint, double phi) { if (!endpoints.contains(endpoint)) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/SnapshotTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java index 2b267a7..acc5186 100644 --- a/src/java/org/apache/cassandra/repair/SnapshotTask.java +++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -32,12 +32,12 @@ import org.apache.cassandra.repair.messages.SnapshotMessage; /** * SnapshotTask is a task that sends snapshot request. */ -public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress> +public class SnapshotTask extends AbstractFuture<InetAddressAndPort> implements RunnableFuture<InetAddressAndPort> { private final RepairJobDesc desc; - private final InetAddress endpoint; + private final InetAddressAndPort endpoint; - public SnapshotTask(RepairJobDesc desc, InetAddress endpoint) + public SnapshotTask(RepairJobDesc desc, InetAddressAndPort endpoint) { this.desc = desc; this.endpoint = endpoint; @@ -74,7 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl public boolean isLatencyForSnitch() { return false; } - public void onFailure(InetAddress from, RequestFailureReason failureReason) + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { //listener.failedSnapshot(); task.setException(new RuntimeException("Could not create snapshot at " + from)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index a1b7459..59fee0b 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.UUID; import java.util.Collections; import java.util.Collection; @@ -29,6 +28,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.streaming.PreviewKind; @@ -48,14 +49,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler private final RepairJobDesc desc; private final boolean asymmetric; - private final InetAddress initiator; - private final InetAddress src; - private final InetAddress dst; + private final InetAddressAndPort initiator; + private final InetAddressAndPort src; + private final InetAddressAndPort dst; private final Collection<Range<Token>> ranges; private final UUID pendingRepair; private final PreviewKind previewKind; - public StreamingRepairTask(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind, boolean asymmetric) + public StreamingRepairTask(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind, boolean asymmetric) { this.desc = desc; this.initiator = initiator; @@ -69,14 +70,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler public void run() { - InetAddress dest = dst; - InetAddress preferred = SystemKeyspace.getPreferredIP(dest); + InetAddressAndPort dest = dst; + InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dest); logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst); createStreamPlan(dest, preferred).execute(); } @VisibleForTesting - StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred) + StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred) { StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) .listeners(this) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index 3770621..b46ae5e 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -19,7 +19,6 @@ package org.apache.cassandra.repair; import java.io.PrintWriter; import java.io.StringWriter; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -45,6 +44,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; @@ -74,48 +74,50 @@ public final class SystemDistributedKeyspace private static final TableMetadata RepairHistory = parse(REPAIR_HISTORY, - "Repair history", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "id timeuuid," - + "parent_id timeuuid," - + "range_begin text," - + "range_end text," - + "coordinator inet," - + "participants set<inet>," - + "exception_message text," - + "exception_stacktrace text," - + "status text," - + "started_at timestamp," - + "finished_at timestamp," - + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))"); + "Repair history", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "id timeuuid," + + "parent_id timeuuid," + + "range_begin text," + + "range_end text," + + "coordinator inet," + + "coordinator_port int," + + "participants set<inet>," + + "participants_v2 set<text>," + + "exception_message text," + + "exception_stacktrace text," + + "status text," + + "started_at timestamp," + + "finished_at timestamp," + + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))"); private static final TableMetadata ParentRepairHistory = parse(PARENT_REPAIR_HISTORY, - "Repair history", - "CREATE TABLE %s (" - + "parent_id timeuuid," - + "keyspace_name text," - + "columnfamily_names set<text>," - + "started_at timestamp," - + "finished_at timestamp," - + "exception_message text," - + "exception_stacktrace text," - + "requested_ranges set<text>," - + "successful_ranges set<text>," - + "options map<text, text>," - + "PRIMARY KEY (parent_id))"); + "Repair history", + "CREATE TABLE %s (" + + "parent_id timeuuid," + + "keyspace_name text," + + "columnfamily_names set<text>," + + "started_at timestamp," + + "finished_at timestamp," + + "exception_message text," + + "exception_stacktrace text," + + "requested_ranges set<text>," + + "successful_ranges set<text>," + + "options map<text, text>," + + "PRIMARY KEY (parent_id))"); private static final TableMetadata ViewBuildStatus = parse(VIEW_BUILD_STATUS, - "Materialized View build status", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "host_id uuid," - + "status text," - + "PRIMARY KEY ((keyspace_name, view_name), host_id))"); + "Materialized View build status", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "host_id uuid," + + "status text," + + "PRIMARY KEY ((keyspace_name, view_name), host_id))"); private static TableMetadata parse(String table, String description, String cql) { @@ -184,17 +186,21 @@ public final class SystemDistributedKeyspace processSilent(fmtQuery); } - public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints) + public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddressAndPort> endpoints) { - String coordinator = FBUtilities.getBroadcastAddress().getHostAddress(); - Set<String> participants = Sets.newHashSet(coordinator); + InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort(); + Set<String> participants = Sets.newHashSet(); + Set<String> participants_v2 = Sets.newHashSet(); - for (InetAddress endpoint : endpoints) - participants.add(endpoint.getHostAddress()); + for (InetAddressAndPort endpoint : endpoints) + { + participants.add(endpoint.getHostAddress(false)); + participants_v2.add(endpoint.toString()); + } String query = - "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " + - "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))"; + "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " + + "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', %d, { '%s' }, { '%s' }, '%s', toTimestamp(now()))"; for (String cfname : cfnames) { @@ -207,8 +213,10 @@ public final class SystemDistributedKeyspace parent_id.toString(), range.left.toString(), range.right.toString(), - coordinator, + coordinator.getHostAddress(false), + coordinator.port, Joiner.on("', '").join(participants), + Joiner.on("', '").join(participants_v2), RepairState.STARTED.toString()); processSilent(fmtQry); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/TreeResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java index c898b36..8571caa 100644 --- a/src/java/org/apache/cassandra/repair/TreeResponse.java +++ b/src/java/org/apache/cassandra/repair/TreeResponse.java @@ -17,8 +17,7 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; - +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.MerkleTrees; /** @@ -26,10 +25,10 @@ import org.apache.cassandra.utils.MerkleTrees; */ public class TreeResponse { - public final InetAddress endpoint; + public final InetAddressAndPort endpoint; public final MerkleTrees trees; - public TreeResponse(InetAddress endpoint, MerkleTrees trees) + public TreeResponse(InetAddressAndPort endpoint, MerkleTrees trees) { this.endpoint = endpoint; this.trees = trees; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/ValidationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java index 175709f..fc500cf 100644 --- a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -17,11 +17,10 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; - import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationRequest; import org.apache.cassandra.streaming.PreviewKind; @@ -34,11 +33,11 @@ import org.apache.cassandra.utils.MerkleTrees; public class ValidationTask extends AbstractFuture<TreeResponse> implements Runnable { private final RepairJobDesc desc; - private final InetAddress endpoint; + private final InetAddressAndPort endpoint; private final int nowInSec; private final PreviewKind previewKind; - public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int nowInSec, PreviewKind previewKind) + public ValidationTask(RepairJobDesc desc, InetAddressAndPort endpoint, int nowInSec, PreviewKind previewKind) { this.desc = desc; this.endpoint = endpoint; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 9803638..4c2856d 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; @@ -41,6 +40,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.streaming.PreviewKind; @@ -64,7 +64,7 @@ public class Validator implements Runnable private static final Logger logger = LoggerFactory.getLogger(Validator.class); public final RepairJobDesc desc; - public final InetAddress initiator; + public final InetAddressAndPort initiator; public final int nowInSec; private final boolean evenTreeDistribution; public final boolean isIncremental; @@ -81,17 +81,17 @@ public class Validator implements Runnable private final PreviewKind previewKind; - public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, PreviewKind previewKind) { this(desc, initiator, nowInSec, false, false, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind) { this(desc, initiator, nowInSec, false, isIncremental, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind) { this.desc = desc; this.initiator = initiator; @@ -352,7 +352,7 @@ public class Validator implements Runnable public void run() { // respond to the request that triggered this validation - if (!initiator.equals(FBUtilities.getBroadcastAddress())) + if (!initiator.equals(FBUtilities.getBroadcastAddressAndPort())) { logger.info("{} Sending completed merkle tree to {} for {}.{}", previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, desc.columnFamily); Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java index eb99977..c9b7ed7 100644 --- a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java +++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.TreeResponse; import org.apache.cassandra.utils.MerkleTrees; @@ -36,11 +36,11 @@ import org.apache.cassandra.utils.MerkleTrees; */ public class DifferenceHolder { - private final ImmutableMap<InetAddress, HostDifferences> differences; + private final ImmutableMap<InetAddressAndPort, HostDifferences> differences; public DifferenceHolder(List<TreeResponse> trees) { - ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<InetAddressAndPort, HostDifferences> diffBuilder = ImmutableMap.builder(); for (int i = 0; i < trees.size() - 1; ++i) { TreeResponse r1 = trees.get(i); @@ -58,9 +58,9 @@ public class DifferenceHolder } @VisibleForTesting - DifferenceHolder(Map<InetAddress, HostDifferences> differences) + DifferenceHolder(Map<InetAddressAndPort, HostDifferences> differences) { - ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder(); + ImmutableMap.Builder<InetAddressAndPort, HostDifferences> diffBuilder = ImmutableMap.builder(); diffBuilder.putAll(differences); this.differences = diffBuilder.build(); } @@ -68,12 +68,12 @@ public class DifferenceHolder /** * differences only holds one 'side' of the difference - if A and B mismatch, only A will be a key in the map */ - public Set<InetAddress> keyHosts() + public Set<InetAddressAndPort> keyHosts() { return differences.keySet(); } - public HostDifferences get(InetAddress hostWithDifference) + public HostDifferences get(InetAddressAndPort hostWithDifference) { return differences.get(hostWithDifference); } @@ -85,7 +85,7 @@ public class DifferenceHolder '}'; } - public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range) + public boolean hasDifferenceBetween(InetAddressAndPort node1, InetAddressAndPort node2, Range<Token> range) { HostDifferences diffsNode1 = differences.get(node1); if (diffsNode1 != null && diffsNode1.hasDifferencesFor(node2, range)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java index 6cbe987..ab294b9 100644 --- a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java +++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,23 +27,24 @@ import java.util.Set; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Tracks the differences for a single host */ public class HostDifferences { - private final Map<InetAddress, List<Range<Token>>> perHostDifferences = new HashMap<>(); + private final Map<InetAddressAndPort, List<Range<Token>>> perHostDifferences = new HashMap<>(); /** * Adds a set of differences between the node this instance is tracking and endpoint */ - public void add(InetAddress endpoint, List<Range<Token>> difference) + public void add(InetAddressAndPort endpoint, List<Range<Token>> difference) { perHostDifferences.put(endpoint, difference); } - public void addSingleRange(InetAddress remoteNode, Range<Token> rangeToFetch) + public void addSingleRange(InetAddressAndPort remoteNode, Range<Token> rangeToFetch) { perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch); } @@ -52,7 +52,7 @@ public class HostDifferences /** * Does this instance have differences for range with node2? */ - public boolean hasDifferencesFor(InetAddress node2, Range<Token> range) + public boolean hasDifferencesFor(InetAddressAndPort node2, Range<Token> range) { List<Range<Token>> differences = get(node2); for (Range<Token> diff : differences) @@ -64,12 +64,12 @@ public class HostDifferences return false; } - public Set<InetAddress> hosts() + public Set<InetAddressAndPort> hosts() { return perHostDifferences.keySet(); } - public List<Range<Token>> get(InetAddress differingHost) + public List<Range<Token>> get(InetAddressAndPort differingHost) { return perHostDifferences.getOrDefault(differingHost, Collections.emptyList()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java index b41ddd8..450336f 100644 --- a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java +++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -29,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Tracks incoming streams for a single host @@ -60,7 +60,7 @@ public class IncomingRepairStreamTracker * @param range the range we need to stream from streamFromNode * @param streamFromNode the node we should stream from */ - public void addIncomingRangeFrom(Range<Token> range, InetAddress streamFromNode) + public void addIncomingRangeFrom(Range<Token> range, InetAddressAndPort streamFromNode) { logger.trace("adding incoming range {} from {}", range, streamFromNode); Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java index 90788dc..e8ca85d 100644 --- a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java +++ b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java @@ -18,10 +18,11 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.util.Set; +import org.apache.cassandra.locator.InetAddressAndPort; + public interface PreferedNodeFilter { - public Set<InetAddress> apply(InetAddress streamingNode, Set<InetAddress> toStream); + public Set<InetAddressAndPort> apply(InetAddressAndPort streamingNode, Set<InetAddressAndPort> toStream); } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
