http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java index 4d9829f..93a1c20 100644 --- a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java +++ b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -39,6 +40,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.compression.Lz4FrameDecoder; import io.netty.handler.codec.compression.Lz4FrameEncoder; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage; @@ -49,7 +51,7 @@ import static org.apache.cassandra.net.async.NettyFactory.Mode.MESSAGING; public class InboundHandshakeHandlerTest { - private static final InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0); + private static final InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0); private static final int MESSAGING_VERSION = MessagingService.current_version; private static final int VERSION_30 = MessagingService.VERSION_30; @@ -84,7 +86,7 @@ public class InboundHandshakeHandlerTest { handler = new InboundHandshakeHandler(new TestAuthenticator(true)); channel = new EmbeddedChannel(handler); - boolean result = handler.handleAuthenticate(addr, channel.pipeline().firstContext()); + boolean result = handler.handleAuthenticate(new InetSocketAddress(addr.address, addr.port), channel.pipeline().firstContext()); Assert.assertTrue(result); Assert.assertTrue(channel.isOpen()); } @@ -92,7 +94,7 @@ public class InboundHandshakeHandlerTest @Test public void handleAuthenticate_Bad() { - boolean result = handler.handleAuthenticate(addr, channel.pipeline().firstContext()); + boolean result = handler.handleAuthenticate(new InetSocketAddress(addr.address, addr.port), channel.pipeline().firstContext()); Assert.assertFalse(result); Assert.assertFalse(channel.isOpen()); Assert.assertFalse(channel.isActive()); @@ -178,7 +180,7 @@ public class InboundHandshakeHandlerTest if (buf.refCnt() > 0) buf.release(); - buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr.getAddress()).encode(PooledByteBufAllocator.DEFAULT); + buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr).encode(PooledByteBufAllocator.DEFAULT); state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf); Assert.assertEquals(State.HANDSHAKE_COMPLETE, state); @@ -203,7 +205,7 @@ public class InboundHandshakeHandlerTest { buf = Unpooled.buffer(32, 32); buf.writeInt(MESSAGING_VERSION + 1); - CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf)); + CompactEndpointSerializationHelper.instance.serialize(addr, new ByteBufDataOutputPlus(buf), MESSAGING_VERSION + 1); State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf); Assert.assertEquals(State.HANDSHAKE_FAIL, state); Assert.assertFalse(channel.isOpen()); @@ -215,7 +217,7 @@ public class InboundHandshakeHandlerTest { buf = Unpooled.buffer(32, 32); buf.writeInt(MESSAGING_VERSION); - CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf)); + CompactEndpointSerializationHelper.instance.serialize(addr, new ByteBufDataOutputPlus(buf), MESSAGING_VERSION); State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf); Assert.assertEquals(State.HANDSHAKE_COMPLETE, state); Assert.assertTrue(channel.isOpen()); @@ -228,7 +230,7 @@ public class InboundHandshakeHandlerTest ChannelPipeline pipeline = channel.pipeline(); Assert.assertNotNull(pipeline.get(InboundHandshakeHandler.class)); - handler.setupMessagingPipeline(pipeline, addr.getAddress(), false, MESSAGING_VERSION); + handler.setupMessagingPipeline(pipeline, addr, false, MESSAGING_VERSION); Assert.assertNotNull(pipeline.get(MessageInHandler.class)); Assert.assertNull(pipeline.get(Lz4FrameDecoder.class)); Assert.assertNull(pipeline.get(Lz4FrameEncoder.class)); @@ -241,7 +243,7 @@ public class InboundHandshakeHandlerTest ChannelPipeline pipeline = channel.pipeline(); Assert.assertNotNull(pipeline.get(InboundHandshakeHandler.class)); - handler.setupMessagingPipeline(pipeline, addr.getAddress(), true, MESSAGING_VERSION); + handler.setupMessagingPipeline(pipeline, addr, true, MESSAGING_VERSION); Assert.assertNotNull(pipeline.get(MessageInHandler.class)); Assert.assertNotNull(pipeline.get(Lz4FrameDecoder.class)); Assert.assertNull(pipeline.get(Lz4FrameEncoder.class));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java b/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java index bb82d2c..43cb964 100644 --- a/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java +++ b/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java @@ -20,40 +20,36 @@ package org.apache.cassandra.net.async; import java.io.EOFException; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.BiConsumer; -import com.google.common.base.Charsets; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.Future; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.net.async.MessageInHandler.MessageHeader; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; +import org.apache.cassandra.utils.UUIDGen; public class MessageInHandlerTest { - private static final InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0); + private static final InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0); private static final int MSG_VERSION = MessagingService.current_version; private static final int MSG_ID = 42; @@ -81,7 +77,7 @@ public class MessageInHandlerTest buf.writeInt(-1); buf.writerIndex(len); - MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, null); + MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, null); EmbeddedChannel channel = new EmbeddedChannel(handler); Assert.assertTrue(channel.isOpen()); channel.writeInbound(buf); @@ -98,22 +94,25 @@ public class MessageInHandlerTest @Test public void decode_HappyPath_WithParameters() throws Exception { - Map<String, byte[]> parameters = new HashMap<>(); - parameters.put("p1", "val1".getBytes(Charsets.UTF_8)); - parameters.put("p2", "val2".getBytes(Charsets.UTF_8)); + UUID uuid = UUIDGen.getTimeUUID(); + Map<ParameterType, Object> parameters = new HashMap<>(); + parameters.put(ParameterType.FAILURE_REASON, (short)42); + parameters.put(ParameterType.TRACE_SESSION, uuid); MessageInWrapper result = decode_HappyPath(parameters); Assert.assertEquals(2, result.messageIn.parameters.size()); + Assert.assertEquals((short)42, result.messageIn.parameters.get(ParameterType.FAILURE_REASON)); + Assert.assertEquals(uuid, result.messageIn.parameters.get(ParameterType.TRACE_SESSION)); } - private MessageInWrapper decode_HappyPath(Map<String, byte[]> parameters) throws Exception + private MessageInWrapper decode_HappyPath(Map<ParameterType, Object> parameters) throws Exception { MessageOut msgOut = new MessageOut(MessagingService.Verb.ECHO); - for (Map.Entry<String, byte[]> param : parameters.entrySet()) + for (Map.Entry<ParameterType, Object> param : parameters.entrySet()) msgOut = msgOut.withParameter(param.getKey(), param.getValue()); serialize(msgOut); MessageInWrapper wrapper = new MessageInWrapper(); - MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, wrapper.messageConsumer); + MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, wrapper.messageConsumer); List<Object> out = new ArrayList<>(); handler.decode(null, buf, out); @@ -140,7 +139,7 @@ public class MessageInHandlerTest public void decode_WithHalfReceivedParameters() throws Exception { MessageOut msgOut = new MessageOut(MessagingService.Verb.ECHO); - msgOut = msgOut.withParameter("p3", "val1".getBytes(Charsets.UTF_8)); + msgOut = msgOut.withParameter(ParameterType.FAILURE_REASON, (short)42); serialize(msgOut); @@ -150,7 +149,7 @@ public class MessageInHandlerTest buf.writerIndex(originalWriterIndex - 6); MessageInWrapper wrapper = new MessageInWrapper(); - MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, wrapper.messageConsumer); + MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, wrapper.messageConsumer); List<Object> out = new ArrayList<>(); handler.decode(null, buf, out); @@ -221,7 +220,7 @@ public class MessageInHandlerTest @Test public void exceptionHandled() { - MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, null); + MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, null); EmbeddedChannel channel = new EmbeddedChannel(handler); Assert.assertTrue(channel.isOpen()); handler.exceptionCaught(channel.pipeline().firstContext(), new EOFException()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java b/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java index 566dfdb..86112ae 100644 --- a/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java +++ b/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java @@ -44,8 +44,10 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.UUIDGen; @@ -65,15 +67,15 @@ public class MessageOutHandlerTest } @Before - public void setup() + public void setup() throws Exception { setup(MessageOutHandler.AUTO_FLUSH_THRESHOLD); } - private void setup(int flushThreshold) + private void setup(int flushThreshold) throws Exception { - OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(new InetSocketAddress("127.0.0.1", 0), - new InetSocketAddress("127.0.0.2", 0)); + OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 0), + InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 0)); OutboundMessagingConnection omc = new NonSendingOutboundMessagingConnection(connectionId, null, Optional.empty()); channel = new EmbeddedChannel(); channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty()); @@ -91,7 +93,7 @@ public class MessageOutHandlerTest } @Test - public void write_WithFlush() throws ExecutionException, InterruptedException, TimeoutException + public void write_WithFlush() throws Exception { setup(1); MessageOut message = new MessageOut(MessagingService.Verb.ECHO); @@ -217,7 +219,7 @@ public class MessageOutHandlerTest public void captureTracingInfo_ForceException() { MessageOut message = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) - .withParameter(Tracing.TRACE_HEADER, new byte[9]); + .withParameter(ParameterType.TRACE_SESSION, new byte[9]); handler.captureTracingInfo(new QueuedMessage(message, 42)); } @@ -226,7 +228,7 @@ public class MessageOutHandlerTest { UUID uuid = UUID.randomUUID(); MessageOut message = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) - .withParameter(Tracing.TRACE_HEADER, UUIDGen.decompose(uuid)); + .withParameter(ParameterType.TRACE_SESSION, uuid); handler.captureTracingInfo(new QueuedMessage(message, 42)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java index 0550490..4607d5c 100644 --- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java +++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.net.async; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Optional; @@ -47,6 +46,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.NettyFactory.InboundInitializer; import org.apache.cassandra.net.async.NettyFactory.OutboundInitializer; @@ -56,8 +56,8 @@ import org.apache.cassandra.utils.NativeLibrary; public class NettyFactoryTest { - private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9876); - private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9876); + private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9876); + private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9876); private static final int receiveBufferSize = 1 << 16; private static final IInternodeAuthenticator AUTHENTICATOR = new AllowAllInternodeAuthenticator(); private static final boolean EPOLL_AVAILABLE = NativeTransportService.useEpoll(); @@ -129,7 +129,6 @@ public class NettyFactoryTest Channel inboundChannel = null; try { - InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 9876); InboundInitializer inboundInitializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup); inboundChannel = NettyFactory.instance.createInboundChannel(LOCAL_ADDR, inboundInitializer, receiveBufferSize); NettyFactory.instance.createInboundChannel(LOCAL_ADDR, inboundInitializer, receiveBufferSize); @@ -144,7 +143,7 @@ public class NettyFactoryTest @Test(expected = ConfigurationException.class) public void createServerChannel_UnbindableAddress() { - InetSocketAddress addr = new InetSocketAddress("1.1.1.1", 9876); + InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("1.1.1.1"), 9876); InboundInitializer inboundInitializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup); NettyFactory.instance.createInboundChannel(addr, inboundInitializer, receiveBufferSize); } @@ -162,10 +161,10 @@ public class NettyFactoryTest Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); serverEncryptionOptions.enable_legacy_ssl_storage_port = false; - InetAddress originalBroadcastAddr = FBUtilities.getBroadcastAddress(); + InetAddressAndPort originalBroadcastAddr = FBUtilities.getBroadcastAddressAndPort(); try { - FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddress())); + FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddressAndPort().address)); DatabaseDescriptor.setListenOnBroadcastAddress(true); serverEncryptionOptions.enabled = false; @@ -178,7 +177,7 @@ public class NettyFactoryTest } finally { - FBUtilities.setBroadcastInetAddress(originalBroadcastAddr); + FBUtilities.setBroadcastInetAddress(originalBroadcastAddr.address); DatabaseDescriptor.setListenOnBroadcastAddress(false); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java index f8bfab1..be71fd4 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -38,6 +39,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.compression.Lz4FrameDecoder; import io.netty.handler.codec.compression.Lz4FrameEncoder; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage; import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult; @@ -47,8 +49,8 @@ import static org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeR public class OutboundHandshakeHandlerTest { private static final int MESSAGING_VERSION = MessagingService.current_version; - private static final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", 0); - private static final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.2", 0); + private static final InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0); + private static final InetAddressAndPort remoteAddr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0); private static final String HANDLER_NAME = "clientHandshakeHandler"; private EmbeddedChannel channel; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java index 641c28c..bf6e066 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLHandshakeException; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,6 +46,7 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractEndpointSnitch; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingServiceTest; @@ -59,9 +61,9 @@ import static org.apache.cassandra.net.async.OutboundMessagingConnection.State.R public class OutboundMessagingConnectionTest { - private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9998); - private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9999); - private static final InetSocketAddress RECONNECT_ADDR = new InetSocketAddress("127.0.0.3", 9999); + private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9998); + private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9999); + private static final InetAddressAndPort RECONNECT_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.3"), 9999); private static final int MESSAGING_VERSION = MessagingService.current_version; private OutboundConnectionIdentifier connectionId; @@ -131,47 +133,47 @@ public class OutboundMessagingConnectionTest public void shouldCompressConnection_None() { DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.none); - Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress())); + Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR)); } @Test public void shouldCompressConnection_All() { DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.all); - Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress())); + Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR)); } @Test public void shouldCompressConnection_SameDc() { TestSnitch snitch = new TestSnitch(); - snitch.add(LOCAL_ADDR.getAddress(), "dc1"); - snitch.add(REMOTE_ADDR.getAddress(), "dc1"); + snitch.add(LOCAL_ADDR, "dc1"); + snitch.add(REMOTE_ADDR, "dc1"); DatabaseDescriptor.setEndpointSnitch(snitch); DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.dc); - Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress())); + Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR)); } private static class TestSnitch extends AbstractEndpointSnitch { - private Map<InetAddress, String> nodeToDc = new HashMap<>(); + private Map<InetAddressAndPort, String> nodeToDc = new HashMap<>(); - void add(InetAddress node, String dc) + void add(InetAddressAndPort node, String dc) { nodeToDc.put(node, dc); } - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return null; } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return nodeToDc.get(endpoint); } - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } @@ -181,11 +183,11 @@ public class OutboundMessagingConnectionTest public void shouldCompressConnection_DifferentDc() { TestSnitch snitch = new TestSnitch(); - snitch.add(LOCAL_ADDR.getAddress(), "dc1"); - snitch.add(REMOTE_ADDR.getAddress(), "dc2"); + snitch.add(LOCAL_ADDR, "dc1"); + snitch.add(REMOTE_ADDR, "dc2"); DatabaseDescriptor.setEndpointSnitch(snitch); DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.dc); - Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress())); + Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR)); } @Test @@ -247,7 +249,7 @@ public class OutboundMessagingConnectionTest MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK); OutboundMessagingPool pool = new OutboundMessagingPool(REMOTE_ADDR, LOCAL_ADDR, null, - new MessagingServiceTest.MockBackPressureStrategy(null).newState(REMOTE_ADDR.getAddress()), auth); + new MessagingServiceTest.MockBackPressureStrategy(null).newState(REMOTE_ADDR), auth); omc = pool.getConnection(messageOut); Assert.assertSame(State.NOT_READY, omc.getState()); Assert.assertFalse(omc.connect()); @@ -371,7 +373,7 @@ public class OutboundMessagingConnectionTest Assert.assertFalse(channelWriter.isClosed()); Assert.assertEquals(channelWriter, omc.getChannelWriter()); Assert.assertEquals(READY, omc.getState()); - Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress())); + Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR)); Assert.assertNull(omc.getConnectionTimeoutFuture()); Assert.assertTrue(connectionTimeoutFuture.isCancelled()); } @@ -391,7 +393,7 @@ public class OutboundMessagingConnectionTest Assert.assertTrue(channelWriter.isClosed()); Assert.assertNull(omc.getChannelWriter()); Assert.assertEquals(CLOSED, omc.getState()); - Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress())); + Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR)); Assert.assertNull(omc.getConnectionTimeoutFuture()); Assert.assertTrue(connectionTimeoutFuture.isCancelled()); } @@ -408,7 +410,7 @@ public class OutboundMessagingConnectionTest omc.finishHandshake(result); Assert.assertNotNull(omc.getChannelWriter()); Assert.assertEquals(CREATING_CHANNEL, omc.getState()); - Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress())); + Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR)); Assert.assertEquals(count, omc.backlogSize()); } @@ -423,7 +425,7 @@ public class OutboundMessagingConnectionTest HandshakeResult result = HandshakeResult.failed(); omc.finishHandshake(result); Assert.assertEquals(NOT_READY, omc.getState()); - Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress())); + Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR)); Assert.assertEquals(0, omc.backlogSize()); } @@ -512,8 +514,8 @@ public class OutboundMessagingConnectionTest OutboundConnectionIdentifier connectionId = omc.getConnectionId(); omc.maybeUpdateConnectionId(); Assert.assertNotEquals(connectionId, omc.getConnectionId()); - Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remoteAddress()); - Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress()); + Assert.assertEquals(InetAddressAndPort.getByAddressOverrideDefaults(REMOTE_ADDR.address, DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remote()); + Assert.assertEquals(InetAddressAndPort.getByAddressOverrideDefaults(REMOTE_ADDR.address, DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress()); Assert.assertEquals(peerVersion, omc.getTargetVersion()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java index 655cd15..ecd8697 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java @@ -18,10 +18,10 @@ package org.apache.cassandra.net.async; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import com.google.common.net.InetAddresses; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -35,6 +35,7 @@ import org.apache.cassandra.gms.GossipDigestSyn; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.BackPressureState; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -42,9 +43,9 @@ import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionTyp public class OutboundMessagingPoolTest { - private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9476); - private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9476); - private static final InetSocketAddress RECONNECT_ADDR = new InetSocketAddress("127.0.0.3", 9476); + private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9476); + private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9476); + private static final InetAddressAndPort RECONNECT_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.3"), 9476); private static final List<ConnectionType> INTERNODE_MESSAGING_CONN_TYPES = new ArrayList<ConnectionType>() {{ add(ConnectionType.GOSSIP); add(ConnectionType.LARGE_MESSAGE); add(ConnectionType.SMALL_MESSAGE); }}; @@ -59,7 +60,7 @@ public class OutboundMessagingPoolTest @Before public void setup() { - BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR.getAddress()); + BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR); pool = new OutboundMessagingPool(REMOTE_ADDR, LOCAL_ADDR, null, backPressureState, new AllowAllInternodeAuthenticator()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java index 21c51c6..970e648 100644 --- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java +++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Set; import java.util.UUID; @@ -33,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; @@ -41,19 +41,19 @@ import org.apache.cassandra.utils.UUIDGen; @Ignore public abstract class AbstractRepairTest { - protected static final InetAddress COORDINATOR; - protected static final InetAddress PARTICIPANT1; - protected static final InetAddress PARTICIPANT2; - protected static final InetAddress PARTICIPANT3; + protected static final InetAddressAndPort COORDINATOR; + protected static final InetAddressAndPort PARTICIPANT1; + protected static final InetAddressAndPort PARTICIPANT2; + protected static final InetAddressAndPort PARTICIPANT3; static { try { - COORDINATOR = InetAddress.getByName("10.0.0.1"); - PARTICIPANT1 = InetAddress.getByName("10.0.0.1"); - PARTICIPANT2 = InetAddress.getByName("10.0.0.2"); - PARTICIPANT3 = InetAddress.getByName("10.0.0.3"); + COORDINATOR = InetAddressAndPort.getByName("10.0.0.1"); + PARTICIPANT1 = InetAddressAndPort.getByName("10.0.0.1"); + PARTICIPANT2 = InetAddressAndPort.getByName("10.0.0.2"); + PARTICIPANT3 = InetAddressAndPort.getByName("10.0.0.3"); } catch (UnknownHostException e) { @@ -64,7 +64,7 @@ public abstract class AbstractRepairTest DatabaseDescriptor.daemonInitialization(); } - protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3); + protected static final Set<InetAddressAndPort> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3); protected static Token t(int v) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 7f3dbff..95046bd 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -36,6 +35,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; @@ -78,8 +78,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest @Test public void testNoDifference() throws Throwable { - final InetAddress ep1 = InetAddress.getByName("127.0.0.1"); - final InetAddress ep2 = InetAddress.getByName("127.0.0.1"); + final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1"); + final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1"); Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); @@ -106,7 +106,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(cfs), Arrays.asList(range), false, ActiveRepairService.UNREPAIRED_SSTABLE, false, PreviewKind.NONE); @@ -128,8 +128,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest // difference the trees // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); - TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); + TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1); + TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); task.run(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java index db76f73..2044106 100644 --- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.RepairRunnable.CommonRange; import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges; @@ -53,7 +54,7 @@ public class RepairRunnableTest extends AbstractRepairTest { CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)); CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3)); - Set<InetAddress> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded + Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded List<CommonRange> initial = Lists.newArrayList(cr1, cr2); List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)), http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 984218d..54f0511 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.repair; import java.io.IOException; -import java.net.InetAddress; import java.util.Arrays; import java.util.Set; import java.util.UUID; @@ -35,7 +34,7 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; @@ -54,7 +53,7 @@ public class RepairSessionTest @Test public void testConviction() throws Exception { - InetAddress remote = InetAddress.getByName("127.0.0.2"); + InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2"); Gossiper.instance.initializeNodeUnsafe(remote, UUID.randomUUID(), 1); // Set up RepairSession @@ -62,7 +61,7 @@ public class RepairSessionTest UUID sessionId = UUID.randomUUID(); IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); - Set<InetAddress> endpoints = Sets.newHashSet(remote); + Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote); RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, false, false, false, http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index b45edc1..322772a 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import java.net.InetAddress; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -34,6 +33,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.ColumnFamilyStore; @@ -95,7 +95,7 @@ public class ValidatorTest final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); - InetAddress remote = InetAddress.getByName("127.0.0.2"); + InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2"); ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily); @@ -134,7 +134,7 @@ public class ValidatorTest final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); - InetAddress remote = InetAddress.getByName("127.0.0.2"); + InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2"); Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE); validator.fail(); @@ -189,12 +189,12 @@ public class ValidatorTest cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(), sstable.last.getToken()))); - ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(), + ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(), Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE, false, PreviewKind.NONE); final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); - Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false, PreviewKind.NONE); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE); CompactionManager.instance.submitValidation(cfs, validator); MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); @@ -218,7 +218,7 @@ public class ValidatorTest final CompletableFuture<MessageOut> future = new CompletableFuture<>(); MessagingService.instance().addMessageSink(new IMessageSink() { - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to) { future.complete(message); return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java index 52a43e6..9693010 100644 --- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java +++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Iterator; @@ -30,6 +29,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.TreeResponse; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; @@ -43,8 +43,8 @@ public class DifferenceHolderTest @Test public void testFromEmptyMerkleTrees() throws UnknownHostException { - InetAddress a1 = InetAddress.getByName("127.0.0.1"); - InetAddress a2 = InetAddress.getByName("127.0.0.2"); + InetAddressAndPort a1 = InetAddressAndPort.getByName("127.0.0.1"); + InetAddressAndPort a2 = InetAddressAndPort.getByName("127.0.0.2"); MerkleTrees mt1 = new MerkleTrees(Murmur3Partitioner.instance); MerkleTrees mt2 = new MerkleTrees(Murmur3Partitioner.instance); @@ -64,8 +64,8 @@ public class DifferenceHolderTest IPartitioner partitioner = Murmur3Partitioner.instance; Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken()); int maxsize = 16; - InetAddress a1 = InetAddress.getByName("127.0.0.1"); - InetAddress a2 = InetAddress.getByName("127.0.0.2"); + InetAddressAndPort a1 = InetAddressAndPort.getByName("127.0.0.1"); + InetAddressAndPort a2 = InetAddressAndPort.getByName("127.0.0.2"); // merkle tree building stolen from MerkleTreesTest: MerkleTrees mt1 = new MerkleTrees(partitioner); MerkleTrees mt2 = new MerkleTrees(partitioner); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java index 19c42fb..6c64b1a 100644 --- a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java +++ b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java @@ -19,11 +19,9 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -33,13 +31,13 @@ import java.util.Map; import java.util.Set; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.junit.Test; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import static junit.framework.TestCase.fail; import static org.junit.Assert.assertEquals; @@ -47,24 +45,24 @@ import static org.junit.Assert.assertTrue; public class ReduceHelperTest { - private static final InetAddress[] addresses; - private static final InetAddress A; - private static final InetAddress B; - private static final InetAddress C; - private static final InetAddress D; - private static final InetAddress E; + private static final InetAddressAndPort[] addresses; + private static final InetAddressAndPort A; + private static final InetAddressAndPort B; + private static final InetAddressAndPort C; + private static final InetAddressAndPort D; + private static final InetAddressAndPort E; static { try { - A = InetAddress.getByName("127.0.0.0"); - B = InetAddress.getByName("127.0.0.1"); - C = InetAddress.getByName("127.0.0.2"); - D = InetAddress.getByName("127.0.0.3"); - E = InetAddress.getByName("127.0.0.4"); + A = InetAddressAndPort.getByName("127.0.0.0"); + B = InetAddressAndPort.getByName("127.0.0.1"); + C = InetAddressAndPort.getByName("127.0.0.2"); + D = InetAddressAndPort.getByName("127.0.0.3"); + E = InetAddressAndPort.getByName("127.0.0.4"); // for diff creation in loops: - addresses = new InetAddress[]{ A, B, C, D, E }; + addresses = new InetAddressAndPort[]{ A, B, C, D, E }; } catch (UnknownHostException e) { @@ -89,7 +87,7 @@ public class ReduceHelperTest C x x D = */ - Map<InetAddress, HostDifferences> differences = new HashMap<>(); + Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>(); for (int i = 0; i < 4; i++) { HostDifferences hostDiffs = new HostDifferences(); @@ -105,7 +103,7 @@ public class ReduceHelperTest } DifferenceHolder differenceHolder = new DifferenceHolder(differences); - Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); + Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); assertEquals(set(set(C), set(E,D)), streams(tracker.get(A))); assertEquals(set(set(C), set(E,D)), streams(tracker.get(B))); @@ -113,7 +111,7 @@ public class ReduceHelperTest assertEquals(set(set(A,B), set(C)), streams(tracker.get(D))); assertEquals(set(set(A,B), set(C)), streams(tracker.get(E))); - ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> y); + ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y); HostDifferences n0 = reduced.get(A); assertEquals(0, n0.get(A).size()); @@ -163,7 +161,7 @@ public class ReduceHelperTest C x x D = */ - Map<InetAddress, HostDifferences> differences = new HashMap<>(); + Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>(); for (int i = 0; i < 4; i++) { HostDifferences hostDifferences = new HostDifferences(); @@ -179,7 +177,7 @@ public class ReduceHelperTest } DifferenceHolder differenceHolder = new DifferenceHolder(differences); - Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); + Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); assertEquals(set(set(C), set(E, D)), streams(tracker.get(A))); assertEquals(set(set(C), set(E, D)), streams(tracker.get(B))); assertEquals(set(set(A, B), set(E, D)), streams(tracker.get(C))); @@ -187,7 +185,7 @@ public class ReduceHelperTest assertEquals(set(set(A, B), set(C)), streams(tracker.get(E))); // if there is an option, never stream from node 1: - ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B))); + ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B))); HostDifferences n0 = reduced.get(A); assertEquals(0, n0.get(A).size()); @@ -223,7 +221,7 @@ public class ReduceHelperTest assertEquals(0, n4.get(E).size()); } - private Iterable<Set<InetAddress>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker) + private Iterable<Set<InetAddressAndPort>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker) { return incomingRepairStreamTracker.getIncoming().values().iterator().next().allStreams(); } @@ -248,12 +246,12 @@ public class ReduceHelperTest B streams (0, 50] from {C}, (50, 100] from {A, C} C streams (0, 50] from {A, B}, (50, 100] from B */ - Map<InetAddress, HostDifferences> differences = new HashMap<>(); + Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>(); addDifference(A, differences, B, list(range(50, 100))); addDifference(A, differences, C, list(range(0, 50))); addDifference(B, differences, C, list(range(0, 100))); DifferenceHolder differenceHolder = new DifferenceHolder(differences); - Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); + Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); assertEquals(set(set(C)), tracker.get(A).getIncoming().get(range(0, 50)).allStreams()); assertEquals(set(set(B)), tracker.get(A).getIncoming().get(range(50, 100)).allStreams()); assertEquals(set(set(C)), tracker.get(B).getIncoming().get(range(0, 50)).allStreams()); @@ -261,7 +259,7 @@ public class ReduceHelperTest assertEquals(set(set(A,B)), tracker.get(C).getIncoming().get(range(0, 50)).allStreams()); assertEquals(set(set(B)), tracker.get(C).getIncoming().get(range(50, 100)).allStreams()); - ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y); + ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y); HostDifferences n0 = reduced.get(A); @@ -270,7 +268,7 @@ public class ReduceHelperTest HostDifferences n1 = reduced.get(B); assertEquals(0, n1.get(B).size()); - if (n1.get(A) != null) + if (!n1.get(A).isEmpty()) { assertTrue(n1.get(C).equals(list(range(0, 50)))); assertTrue(n1.get(A).equals(list(range(50, 100)))); @@ -281,7 +279,7 @@ public class ReduceHelperTest } HostDifferences n2 = reduced.get(C); assertEquals(0, n2.get(C).size()); - if (n2.get(A) != null) + if (!n2.get(A).isEmpty()) { assertTrue(n2.get(A).equals(list(range(0,50)))); assertTrue(n2.get(B).equals(list(range(50, 100)))); @@ -312,13 +310,13 @@ public class ReduceHelperTest B == C on (5, 10], (40, 45] */ - Map<InetAddress, HostDifferences> differences = new HashMap<>(); + Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>(); addDifference(A, differences, B, list(range(5, 45))); addDifference(A, differences, C, list(range(0, 10), range(40,50))); addDifference(B, differences, C, list(range(0, 5), range(10,40), range(45,50))); DifferenceHolder differenceHolder = new DifferenceHolder(differences); - Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); + Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder); Map<Range<Token>, StreamFromOptions> ranges = tracker.get(A).getIncoming(); assertEquals(5, ranges.size()); @@ -344,21 +342,21 @@ public class ReduceHelperTest assertEquals(set(set(B)), ranges.get(range(10, 40)).allStreams()); assertEquals(set(set(A)), ranges.get(range(40, 45)).allStreams()); assertEquals(set(set(A,B)), ranges.get(range(45, 50)).allStreams()); - ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y); + ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y); assertNoOverlap(A, reduced.get(A), list(range(0, 50))); assertNoOverlap(B, reduced.get(B), list(range(0, 50))); assertNoOverlap(C, reduced.get(C), list(range(0, 50))); } - private void assertNoOverlap(InetAddress incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize) + private void assertNoOverlap(InetAddressAndPort incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize) { Set<Range<Token>> allRanges = new HashSet<>(); - Set<InetAddress> remoteNodes = Sets.newHashSet(A,B,C); + Set<InetAddressAndPort> remoteNodes = Sets.newHashSet(A,B,C); remoteNodes.remove(incomingNode); - Iterator<InetAddress> iter = remoteNodes.iterator(); + Iterator<InetAddressAndPort> iter = remoteNodes.iterator(); allRanges.addAll(node.get(iter.next())); - InetAddress i = iter.next(); + InetAddressAndPort i = iter.next(); for (Range<Token> r : node.get(i)) { for (Range<Token> existing : allRanges) @@ -379,14 +377,14 @@ public class ReduceHelperTest return ranges; } - private static Set<InetAddress> set(InetAddress ... elem) + private static Set<InetAddressAndPort> set(InetAddressAndPort ... elem) { return Sets.newHashSet(elem); } @SafeVarargs - private static Set<Set<InetAddress>> set(Set<InetAddress> ... elem) + private static Set<Set<InetAddressAndPort>> set(Set<InetAddressAndPort> ... elem) { - Set<Set<InetAddress>> ret = Sets.newHashSet(); + Set<Set<InetAddressAndPort>> ret = Sets.newHashSet(); ret.addAll(Arrays.asList(elem)); return ret; } @@ -418,7 +416,7 @@ public class ReduceHelperTest assertTrue(r1.size() > 0 ^ r2.size() > 0); } - private void addDifference(InetAddress host1, Map<InetAddress, HostDifferences> differences, InetAddress host2, List<Range<Token>> ranges) + private void addDifference(InetAddressAndPort host1, Map<InetAddressAndPort, HostDifferences> differences, InetAddressAndPort host2, List<Range<Token>> ranges) { differences.computeIfAbsent(host1, (x) -> new HostDifferences()).add(host2, ranges); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java index 3ba3cfe..e2a7700 100644 --- a/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java +++ b/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; import java.util.HashSet; @@ -30,6 +29,7 @@ import org.junit.Test; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import static junit.framework.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,16 +41,16 @@ public class StreamFromOptionsTest public void addAllDiffingTest() throws UnknownHostException { StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(true), range(0, 10)); - Set<InetAddress> toAdd = new HashSet<>(); - toAdd.add(InetAddress.getByName("127.0.0.1")); - toAdd.add(InetAddress.getByName("127.0.0.2")); - toAdd.add(InetAddress.getByName("127.0.0.3")); + Set<InetAddressAndPort> toAdd = new HashSet<>(); + toAdd.add(InetAddressAndPort.getByName("127.0.0.1")); + toAdd.add(InetAddressAndPort.getByName("127.0.0.2")); + toAdd.add(InetAddressAndPort.getByName("127.0.0.3")); toAdd.forEach(sfo::add); // if all added have differences, each set will contain a single host assertEquals(3, Iterables.size(sfo.allStreams())); - Set<InetAddress> allStreams = new HashSet<>(); - for (Set<InetAddress> streams : sfo.allStreams()) + Set<InetAddressAndPort> allStreams = new HashSet<>(); + for (Set<InetAddressAndPort> streams : sfo.allStreams()) { assertEquals(1, streams.size()); allStreams.addAll(streams); @@ -62,10 +62,10 @@ public class StreamFromOptionsTest public void addAllMatchingTest() throws UnknownHostException { StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(false), range(0, 10)); - Set<InetAddress> toAdd = new HashSet<>(); - toAdd.add(InetAddress.getByName("127.0.0.1")); - toAdd.add(InetAddress.getByName("127.0.0.2")); - toAdd.add(InetAddress.getByName("127.0.0.3")); + Set<InetAddressAndPort> toAdd = new HashSet<>(); + toAdd.add(InetAddressAndPort.getByName("127.0.0.1")); + toAdd.add(InetAddressAndPort.getByName("127.0.0.2")); + toAdd.add(InetAddressAndPort.getByName("127.0.0.3")); toAdd.forEach(sfo::add); // if all added match, the set will contain all hosts @@ -83,10 +83,10 @@ public class StreamFromOptionsTest private void splitTestHelper(boolean diffing) throws UnknownHostException { StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(diffing), range(0, 10)); - Set<InetAddress> toAdd = new HashSet<>(); - toAdd.add(InetAddress.getByName("127.0.0.1")); - toAdd.add(InetAddress.getByName("127.0.0.2")); - toAdd.add(InetAddress.getByName("127.0.0.3")); + Set<InetAddressAndPort> toAdd = new HashSet<>(); + toAdd.add(InetAddressAndPort.getByName("127.0.0.1")); + toAdd.add(InetAddressAndPort.getByName("127.0.0.2")); + toAdd.add(InetAddressAndPort.getByName("127.0.0.3")); toAdd.forEach(sfo::add); StreamFromOptions sfo1 = sfo.copy(range(0, 5)); StreamFromOptions sfo2 = sfo.copy(range(5, 10)); @@ -95,8 +95,8 @@ public class StreamFromOptionsTest assertEquals(range(5, 10), sfo2.range); assertTrue(Iterables.elementsEqual(sfo1.allStreams(), sfo2.allStreams())); // verify the backing set is not shared between the copies: - sfo1.add(InetAddress.getByName("127.0.0.4")); - sfo2.add(InetAddress.getByName("127.0.0.5")); + sfo1.add(InetAddressAndPort.getByName("127.0.0.4")); + sfo2.add(InetAddressAndPort.getByName("127.0.0.5")); assertFalse(Iterables.elementsEqual(sfo1.allStreams(), sfo2.allStreams())); } @@ -116,7 +116,7 @@ public class StreamFromOptionsTest } @Override - public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range) + public boolean hasDifferenceBetween(InetAddressAndPort node1, InetAddressAndPort node2, Range<Token> range) { return hasDifference; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java index 367fea9..4570328 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Set; import java.util.UUID; @@ -33,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -41,19 +41,19 @@ import org.apache.cassandra.utils.UUIDGen; @Ignore public abstract class AbstractConsistentSessionTest { - protected static final InetAddress COORDINATOR; - protected static final InetAddress PARTICIPANT1; - protected static final InetAddress PARTICIPANT2; - protected static final InetAddress PARTICIPANT3; + protected static final InetAddressAndPort COORDINATOR; + protected static final InetAddressAndPort PARTICIPANT1; + protected static final InetAddressAndPort PARTICIPANT2; + protected static final InetAddressAndPort PARTICIPANT3; static { try { - COORDINATOR = InetAddress.getByName("10.0.0.1"); - PARTICIPANT1 = InetAddress.getByName("10.0.0.1"); - PARTICIPANT2 = InetAddress.getByName("10.0.0.2"); - PARTICIPANT3 = InetAddress.getByName("10.0.0.3"); + COORDINATOR = InetAddressAndPort.getByName("10.0.0.1"); + PARTICIPANT1 = InetAddressAndPort.getByName("10.0.0.1"); + PARTICIPANT2 = InetAddressAndPort.getByName("10.0.0.2"); + PARTICIPANT3 = InetAddressAndPort.getByName("10.0.0.3"); } catch (UnknownHostException e) { @@ -64,7 +64,7 @@ public abstract class AbstractConsistentSessionTest DatabaseDescriptor.daemonInitialization(); } - protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3); + protected static final Set<InetAddressAndPort> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3); protected static Token t(int v) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java index fb312c3..5d054d3 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -35,6 +34,7 @@ import com.google.common.util.concurrent.SettableFuture; import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.messages.FailSession; @@ -77,7 +77,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null, false); } - private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected) + private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddressAndPort participant, RepairMessage expected) { Assert.assertTrue(coordinator.sentMessages.containsKey(participant)); Assert.assertEquals(1, coordinator.sentMessages.get(participant).size()); @@ -91,9 +91,9 @@ public class CoordinatorSessionTest extends AbstractRepairTest super(builder); } - Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>(); + Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>(); - protected void sendMessage(InetAddress destination, RepairMessage message) + protected void sendMessage(InetAddressAndPort destination, RepairMessage message) { if (!sentMessages.containsKey(destination)) { @@ -189,7 +189,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest coordinator.fail(); Assert.assertEquals(FAILED, coordinator.getState()); - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { assertMessageSent(coordinator, participant, new FailSession(coordinator.sessionID)); } @@ -221,7 +221,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertTrue(coordinator.sentMessages.isEmpty()); ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); @@ -254,7 +254,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest repairFuture.set(results); // propose messages should have been sent once all repair sessions completed successfully - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new FinalizePropose(coordinator.sessionID); assertMessageSent(coordinator, participant, expected); @@ -277,7 +277,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertTrue(coordinator.finalizeCommitCalled); Assert.assertEquals(ConsistentSession.State.FINALIZED, coordinator.getState()); - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new FinalizeCommit(coordinator.sessionID); assertMessageSent(coordinator, participant, expected); @@ -304,7 +304,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); assertMessageSent(coordinator, participant, expected); @@ -339,7 +339,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertTrue(coordinator.failCalled); // all participants should have been notified of session failure - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new FailSession(coordinator.sessionID); assertMessageSent(coordinator, participant, expected); @@ -366,7 +366,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); assertMessageSent(coordinator, participant, expected); @@ -394,7 +394,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertFalse(repairSubmitted.get()); // all participants should have been notified of session failure - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new FailSession(coordinator.sessionID); assertMessageSent(coordinator, participant, expected); @@ -422,7 +422,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertTrue(coordinator.sentMessages.isEmpty()); ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); @@ -455,7 +455,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest repairFuture.set(results); // propose messages should have been sent once all repair sessions completed successfully - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new FinalizePropose(coordinator.sessionID); assertMessageSent(coordinator, participant, expected); @@ -481,7 +481,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState()); // failure messages should have been sent to all participants - for (InetAddress participant : PARTICIPANTS) + for (InetAddressAndPort participant : PARTICIPANTS) { RepairMessage expected = new FailSession(coordinator.sessionID); assertMessageSent(coordinator, participant, expected); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java index b40e185..9bf4270 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.Set; import java.util.UUID; @@ -29,6 +28,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.AbstractRepairTest; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; @@ -55,9 +55,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest } int prepareResponseCalls = 0; - InetAddress preparePeer = null; + InetAddressAndPort preparePeer = null; boolean prepareSuccess = false; - public synchronized void handlePrepareResponse(InetAddress participant, boolean success) + public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success) { prepareResponseCalls++; preparePeer = participant; @@ -65,9 +65,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest } int finalizePromiseCalls = 0; - InetAddress promisePeer = null; + InetAddressAndPort promisePeer = null; boolean promiseSuccess = false; - public synchronized void handleFinalizePromise(InetAddress participant, boolean success) + public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success) { finalizePromiseCalls++; promisePeer = participant; @@ -93,7 +93,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest return (InstrumentedCoordinatorSession) super.getSession(sessionId); } - public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddress> peers) + public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> peers) { return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java index 6808efe..3ea888d 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java @@ -18,10 +18,10 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.Set; import java.util.UUID; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.ActiveRepairService; /** @@ -36,7 +36,7 @@ public class LocalSessionAccessor ARS.consistent.local.start(); } - public static void prepareUnsafe(UUID sessionID, InetAddress coordinator, Set<InetAddress> peers) + public static void prepareUnsafe(UUID sessionID, InetAddressAndPort coordinator, Set<InetAddressAndPort> peers) { ActiveRepairService.ParentRepairSession prs = ARS.getParentRepairSession(sessionID); assert prs != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index 6e6d222..5fa43a9 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -42,6 +41,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.repair.AbstractRepairTest; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; @@ -106,20 +106,20 @@ public class LocalSessionTest extends AbstractRepairTest } } - private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddress to) + private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to) { Assert.assertNull(sessions.sentMessages.get(to)); } - private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddress to, RepairMessage... expected) + private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to, RepairMessage... expected) { Assert.assertEquals(Lists.newArrayList(expected), sessions.sentMessages.get(to)); } static class InstrumentedLocalSessions extends LocalSessions { - Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>(); - protected void sendMessage(InetAddress destination, RepairMessage message) + Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>(); + protected void sendMessage(InetAddressAndPort destination, RepairMessage message) { if (!sentMessages.containsKey(destination)) { @@ -159,12 +159,13 @@ public class LocalSessionTest extends AbstractRepairTest return getSession(sessionID); } - protected InetAddress getBroadcastAddress() + @Override + protected InetAddressAndPort getBroadcastAddressAndPort() { return PARTICIPANT1; } - protected boolean isAlive(InetAddress address) + protected boolean isAlive(InetAddressAndPort address) { return true; } @@ -811,7 +812,7 @@ public class LocalSessionTest extends AbstractRepairTest sessions.start(); Assert.assertNotNull(sessions.getSession(session.sessionID)); - QueryProcessor.instance.executeInternal("DELETE participants FROM system.repairs WHERE parent_id=?", session.sessionID); + QueryProcessor.instance.executeInternal("DELETE participants, participants_wp FROM system.repairs WHERE parent_id=?", session.sessionID); sessions = new LocalSessions(); sessions.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java index 5aeab3e..213cdd3 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -43,6 +42,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -148,7 +148,7 @@ public class PendingAntiCompactionTest // create a session so the anti compaction can fine it UUID sessionID = UUIDGen.getTimeUUID(); - ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE); + ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE); PendingAntiCompaction pac; ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -352,7 +352,7 @@ public class PendingAntiCompactionTest PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); UUID sessionID = UUIDGen.getTimeUUID(); ActiveRepairService.instance.registerParentRepairSession(sessionID, - InetAddress.getByName("127.0.0.1"), + InetAddressAndPort.getByName("127.0.0.1"), Lists.newArrayList(cfs), FULL_RANGE, true,0, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
