Repository: cassandra Updated Branches: refs/heads/trunk 3d4a7e7b6 -> fc92db2b9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index ceaa4d1..68c6034 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -19,103 +19,64 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; import java.net.InetAddress; -import java.nio.ByteBuffer; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.DataOutputBufferFixed; -import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.UUIDSerializer; /** * StreamInitMessage is first sent from the node where {@link org.apache.cassandra.streaming.StreamSession} is started, * to initiate corresponding {@link org.apache.cassandra.streaming.StreamSession} on the other side. */ -public class StreamInitMessage +public class StreamInitMessage extends StreamMessage { - public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer(); + public static Serializer<StreamInitMessage> serializer = new StreamInitMessageSerializer(); public final InetAddress from; public final int sessionIndex; public final UUID planId; public final StreamOperation streamOperation; - // true if this init message is to connect for outgoing message on receiving side - public final boolean isForOutgoing; public final boolean keepSSTableLevel; public final UUID pendingRepair; public final PreviewKind previewKind; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) { + super(Type.STREAM_INIT); this.from = from; this.sessionIndex = sessionIndex; this.planId = planId; this.streamOperation = streamOperation; - this.isForOutgoing = isForOutgoing; this.keepSSTableLevel = keepSSTableLevel; this.pendingRepair = pendingRepair; this.previewKind = previewKind; } - /** - * Create serialized message. - * - * @param compress true if message is compressed - * @param version Streaming protocol version - * @return serialized message in ByteBuffer format - */ - public ByteBuffer createMessage(boolean compress, int version) + @Override + public String toString() { - int header = 0; - // set compression bit. - if (compress) - header |= 4; - // set streaming bit - header |= 8; - // Setting up the version bit - header |= (version << 8); - - byte[] bytes; - try - { - int size = (int)StreamInitMessage.serializer.serializedSize(this, version); - try (DataOutputBuffer buffer = new DataOutputBufferFixed(size)) - { - StreamInitMessage.serializer.serialize(this, buffer, version); - bytes = buffer.getData(); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - assert bytes.length > 0; - - ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + bytes.length); - buffer.putInt(MessagingService.PROTOCOL_MAGIC); - buffer.putInt(header); - buffer.put(bytes); - buffer.flip(); - return buffer; + StringBuilder sb = new StringBuilder(128); + sb.append("StreamInitMessage: from = ").append(from); + sb.append(", planId = ").append(planId).append(", session index = ").append(sessionIndex); + return sb.toString(); } - private static class StreamInitMessageSerializer implements IVersionedSerializer<StreamInitMessage> + private static class StreamInitMessageSerializer implements Serializer<StreamInitMessage> { - public void serialize(StreamInitMessage message, DataOutputPlus out, int version) throws IOException + public void serialize(StreamInitMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { CompactEndpointSerializationHelper.serialize(message.from, out); out.writeInt(message.sessionIndex); UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version); out.writeUTF(message.streamOperation.getDescription()); - out.writeBoolean(message.isForOutgoing); out.writeBoolean(message.keepSSTableLevel); out.writeBoolean(message.pendingRepair != null); @@ -126,18 +87,17 @@ public class StreamInitMessage out.writeInt(message.previewKind.getSerializationVal()); } - public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException + public StreamInitMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException { InetAddress from = CompactEndpointSerializationHelper.deserialize(in); int sessionIndex = in.readInt(); UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); String description = in.readUTF(); - boolean sentByInitiator = in.readBoolean(); boolean keepSSTableLevel = in.readBoolean(); UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); - return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair, previewKind); + return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), keepSSTableLevel, pendingRepair, previewKind); } public long serializedSize(StreamInitMessage message, int version) @@ -146,7 +106,6 @@ public class StreamInitMessage size += TypeSizes.sizeof(message.sessionIndex); size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); size += TypeSizes.sizeof(message.streamOperation.getDescription()); - size += TypeSizes.sizeof(message.isForOutgoing); size += TypeSizes.sizeof(message.keepSSTableLevel); size += TypeSizes.sizeof(message.pendingRepair != null); if (message.pendingRepair != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 48def64..feeab05 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -18,10 +18,8 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamSession; @@ -36,67 +34,47 @@ public abstract class StreamMessage public static final int VERSION_40 = 5; public static final int CURRENT_VERSION = VERSION_40; - private transient volatile boolean sent = false; - public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { - ByteBuffer buff = ByteBuffer.allocate(1); // message type - buff.put(message.type.type); - buff.flip(); - out.write(buff); + out.writeByte(message.type.type); message.type.outSerializer.serialize(message, out, version, session); } - public static StreamMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException - { - ByteBuffer buff = ByteBuffer.allocate(1); - int readBytes = in.read(buff); - if (readBytes > 0) - { - buff.flip(); - Type type = Type.get(buff.get()); - return type.inSerializer.deserialize(in, version, session); - } - else if (readBytes == 0) - { - // input socket buffer was not filled yet - return null; - } - else - { - // possibly socket gets closed - throw new SocketException("End-of-stream reached"); - } - } - - public void sent() + public static long serializedSize(StreamMessage message, int version) throws IOException { - sent = true; + return 1 + message.type.outSerializer.serializedSize(message, version); } - public boolean wasSent() + public static StreamMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException { - return sent; + byte b = in.readByte(); + if (b == 0) + b = -1; + Type type = Type.get(b); + return type.inSerializer.deserialize(in, version, session); } /** StreamMessage serializer */ public static interface Serializer<V extends StreamMessage> { - V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException; + V deserialize(DataInputPlus in, int version, StreamSession session) throws IOException; void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException; + long serializedSize(V message, int version) throws IOException; } /** StreamMessage types */ - public static enum Type + public enum Type { - PREPARE(1, 5, PrepareMessage.serializer), + PREPARE_SYN(1, 5, PrepareSynMessage.serializer), FILE(2, 0, IncomingFileMessage.serializer, OutgoingFileMessage.serializer), RECEIVED(3, 4, ReceivedMessage.serializer), - RETRY(4, 4, RetryMessage.serializer), COMPLETE(5, 1, CompleteMessage.serializer), SESSION_FAILED(6, 5, SessionFailedMessage.serializer), - KEEP_ALIVE(7, 5, KeepAliveMessage.serializer); + KEEP_ALIVE(7, 5, KeepAliveMessage.serializer), + PREPARE_SYNACK(8, 5, PrepareSynAckMessage.serializer), + PREPARE_ACK(9, 5, PrepareAckMessage.serializer), + STREAM_INIT(10, 5, StreamInitMessage.serializer); public static Type get(byte type) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java index aa23f45..d119081 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java +++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java @@ -15,20 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.cassandra.tools; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.channels.SocketChannel; +import io.netty.channel.Channel; import org.apache.cassandra.config.EncryptionOptions; -import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; +import org.apache.cassandra.streaming.DefaultConnectionFactory; import org.apache.cassandra.streaming.StreamConnectionFactory; -import org.apache.cassandra.utils.FBUtilities; -public class BulkLoadConnectionFactory implements StreamConnectionFactory +public class BulkLoadConnectionFactory extends DefaultConnectionFactory implements StreamConnectionFactory { private final boolean outboundBindAny; private final int storagePort; @@ -43,24 +42,15 @@ public class BulkLoadConnectionFactory implements StreamConnectionFactory this.outboundBindAny = outboundBindAny; } - public Socket createConnection(InetAddress peer) throws IOException + public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException { // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none' // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader // does not know which node is in which dc/rack, connecting to SSL port is always the option. - if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none) - { - if (outboundBindAny) - return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort); - else - return SSLFactory.getSocket(encryptionOptions, peer, secureStoragePort, FBUtilities.getLocalAddress(), 0); - } - else - { - Socket socket = SocketChannel.open(new InetSocketAddress(peer, storagePort)).socket(); - if (outboundBindAny && !socket.isBound()) - socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); - return socket; - } + int port = encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none ? + secureStoragePort : storagePort; + + connectionId = connectionId.withNewConnectionAddress(new InetSocketAddress(connectionId.remote(), port)); + return createConnection(connectionId, protocolVersion, encryptionOptions); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 109a51e..267a6d5 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1076,8 +1076,6 @@ public class NodeProbe implements AutoCloseable return ssProxy.getCasContentionTimeout(); case "truncate": return ssProxy.getTruncateRpcTimeout(); - case "streamingsocket": - return ssProxy.getStreamingSocketTimeout(); default: throw new RuntimeException("Timeout type requires one of (" + GetTimeout.TIMEOUT_TYPES + ")"); } @@ -1156,11 +1154,6 @@ public class NodeProbe implements AutoCloseable case "truncate": ssProxy.setTruncateRpcTimeout(value); break; - case "streamingsocket": - if (value > Integer.MAX_VALUE) - throw new RuntimeException("streamingsocket timeout must be less than " + Integer.MAX_VALUE); - ssProxy.setStreamingSocketTimeout((int) value); - break; default: throw new RuntimeException("Timeout type requires one of (" + GetTimeout.TIMEOUT_TYPES + ")"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java index 6c9b541..deac8a3 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java @@ -31,7 +31,7 @@ import static com.google.common.base.Preconditions.checkArgument; @Command(name = "gettimeout", description = "Print the timeout of the given type in ms") public class GetTimeout extends NodeToolCmd { - public static final String TIMEOUT_TYPES = "read, range, write, counterwrite, cascontention, truncate, streamingsocket, misc (general rpc_timeout_in_ms)"; + public static final String TIMEOUT_TYPES = "read, range, write, counterwrite, cascontention, truncate, misc (general rpc_timeout_in_ms)"; @Arguments(usage = "<timeout_type>", description = "The timeout type, one of (" + TIMEOUT_TYPES + ")") private List<String> args = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index 8fac816..37f08bf 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -40,6 +40,8 @@ public class UUIDGen private static final long START_EPOCH = -12219292800000L; private static final long clockSeqAndNode = makeClockSeqAndNode(); + public static final int UUID_LEN = 16; + /* * The min and max possible lsb for a UUID. * Note that his is not 0 and all 1's because Cassandra TimeUUIDType @@ -106,10 +108,10 @@ public class UUIDGen } /** - * Similar to {@link getTimeUUIDFromMicros}, but randomize (using SecureRandom) the clock and sequence. + * Similar to {@link #getTimeUUIDFromMicros}, but randomize (using SecureRandom) the clock and sequence. * <p> * If you can guarantee that the {@code whenInMicros} argument is unique (for this JVM instance) for - * every call, then you should prefer {@link getTimeUUIDFromMicros} which is faster. If you can't + * every call, then you should prefer {@link #getTimeUUIDFromMicros} which is faster. If you can't * guarantee this however, this method will ensure the returned UUID are still unique (accross calls) * through randomization. * @@ -143,7 +145,7 @@ public class UUIDGen public static ByteBuffer toByteBuffer(UUID uuid) { - ByteBuffer buffer = ByteBuffer.allocate(16); + ByteBuffer buffer = ByteBuffer.allocate(UUID_LEN); buffer.putLong(uuid.getMostSignificantBits()); buffer.putLong(uuid.getLeastSignificantBits()); buffer.flip(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java index a30d6c9..799ac77 100644 --- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -24,10 +24,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.google.common.io.Files; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -62,26 +64,42 @@ public class LongStreamingTest } @Test - public void testCompressedStream() throws InvalidRequestException, IOException, ExecutionException, InterruptedException + public void testSstableCompressionStreaming() throws InterruptedException, ExecutionException, IOException { - String KS = "cql_keyspace"; + testStream(true); + } + + @Test + public void testStreamCompressionStreaming() throws InterruptedException, ExecutionException, IOException + { + testStream(false); + } + + private void testStream(boolean useSstableCompression) throws InvalidRequestException, IOException, ExecutionException, InterruptedException + { + String KS = useSstableCompression ? "sstable_compression_ks" : "stream_compression_ks"; String TABLE = "table1"; File tempdir = Files.createTempDir(); File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); assert dataDir.mkdirs(); - String schema = "CREATE TABLE cql_keyspace.table1 (" + String schema = "CREATE TABLE " + KS + '.' + TABLE + " (" + " k int PRIMARY KEY," + " v1 text," + " v2 int" - + ");";// with compression = {};"; - String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)"; + + ") with compression = " + (useSstableCompression ? "{'class': 'LZ4Compressor'};" : "{};"); + String insert = "INSERT INTO " + KS + '.' + TABLE + " (k, v1, v2) VALUES (?, ?, ?)"; CQLSSTableWriter writer = CQLSSTableWriter.builder() .sorted() .inDirectory(dataDir) .forTable(schema) .using(insert).build(); + + CompressionParams compressionParams = Keyspace.open(KS).getColumnFamilyStore(TABLE).metadata().params.compression; + Assert.assertEquals(useSstableCompression, compressionParams.isEnabled()); + + long start = System.nanoTime(); for (int i = 0; i < 10_000_000; i++) @@ -103,7 +121,7 @@ public class LongStreamingTest private String ks; public void init(String keyspace) { - for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) + for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); this.ks = keyspace; @@ -130,7 +148,7 @@ public class LongStreamingTest private String ks; public void init(String keyspace) { - for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) + for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); this.ks = keyspace; @@ -160,7 +178,7 @@ public class LongStreamingTest millis / 1000d, (dataSize * 2 / (1 << 20) / (millis / 1000d)) * 8)); - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1 limit 100;"); + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + KS + '.' + TABLE + " limit 100;"); assertEquals(100, rs.size()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java index cd7e381..385ebb7 100644 --- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java +++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java @@ -54,7 +54,7 @@ public class PreparedStatementsTest extends SchemaLoader // Currently the native server start method return before the server is fully binded to the socket, so we need // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep. - Thread.sleep(500); + Thread.sleep(1500); cluster = Cluster.builder().addContactPoint("127.0.0.1") .withPort(DatabaseDescriptor.getNativeTransportPort()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java index 175ab53..1803c51 100644 --- a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java +++ b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java @@ -378,7 +378,7 @@ public class RewindableDataInputStreamPlusTest //finish reading again previous sequence reader.mark(); - //read 3 bytes - OK + //read 3 bytes - START assertEquals('a', reader.readChar()); //read 1 more bytes - CAPACITY will exhaust when trying to reset :( assertEquals(1, reader.readShort()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java index fa6e2b5..100e1e0 100644 --- a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java +++ b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java @@ -42,7 +42,7 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; -import static org.apache.cassandra.net.async.InboundHandshakeHandler.State.MESSAGING_HANDSHAKE_COMPLETE; +import static org.apache.cassandra.net.async.InboundHandshakeHandler.State.HANDSHAKE_COMPLETE; import static org.apache.cassandra.net.async.OutboundMessagingConnection.State.READY; public class HandshakeHandlersTest @@ -100,7 +100,7 @@ public class HandshakeHandlersTest inboundChannel.writeInbound(o); Assert.assertEquals(READY, imc.getState()); - Assert.assertEquals(MESSAGING_HANDSHAKE_COMPLETE, inboundHandshakeHandler.getState()); + Assert.assertEquals(HANDSHAKE_COMPLETE, inboundHandshakeHandler.getState()); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java index 44dc469..4d9829f 100644 --- a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java +++ b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java @@ -181,7 +181,7 @@ public class InboundHandshakeHandlerTest buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr.getAddress()).encode(PooledByteBufAllocator.DEFAULT); state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf); - Assert.assertEquals(State.MESSAGING_HANDSHAKE_COMPLETE, state); + Assert.assertEquals(State.HANDSHAKE_COMPLETE, state); Assert.assertTrue(channel.isOpen()); Assert.assertTrue(channel.isActive()); Assert.assertFalse(channel.outboundMessages().isEmpty()); @@ -217,7 +217,7 @@ public class InboundHandshakeHandlerTest buf.writeInt(MESSAGING_VERSION); CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf)); State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf); - Assert.assertEquals(State.MESSAGING_HANDSHAKE_COMPLETE, state); + Assert.assertEquals(State.HANDSHAKE_COMPLETE, state); Assert.assertTrue(channel.isOpen()); Assert.assertTrue(channel.isActive()); } @@ -268,9 +268,9 @@ public class InboundHandshakeHandlerTest handler.setHandshakeTimeout(future); Assert.assertFalse(future.isCancelled()); Assert.assertTrue(channel.isOpen()); - handler.setState(State.MESSAGING_HANDSHAKE_COMPLETE); + handler.setState(State.HANDSHAKE_COMPLETE); handler.failHandshake(channel.pipeline().firstContext()); - Assert.assertSame(State.MESSAGING_HANDSHAKE_COMPLETE, handler.getState()); + Assert.assertSame(State.HANDSHAKE_COMPLETE, handler.getState()); Assert.assertTrue(channel.isOpen()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java index 772e47d..d6dd633 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java @@ -24,11 +24,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLHandshakeException; @@ -475,45 +471,4 @@ public class OutboundMessagingConnectionTest Assert.assertNotSame(omc.getConnectionId(), originalId); Assert.assertSame(NOT_READY, omc.getState()); } - - private static class TestScheduledFuture implements ScheduledFuture<Object> - { - private boolean cancelled = false; - - public long getDelay(TimeUnit unit) - { - return 0; - } - - public int compareTo(Delayed o) - { - return 0; - } - - public boolean cancel(boolean mayInterruptIfRunning) - { - cancelled = true; - return false; - } - - public boolean isCancelled() - { - return cancelled; - } - - public boolean isDone() - { - return false; - } - - public Object get() throws InterruptedException, ExecutionException - { - return null; - } - - public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - return null; - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java new file mode 100644 index 0000000..4968196 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.embedded.EmbeddedChannel; + +public class RebufferingByteBufDataInputPlusTest +{ + private EmbeddedChannel channel; + private RebufferingByteBufDataInputPlus inputPlus; + private ByteBuf buf; + + @Before + public void setUp() + { + channel = new EmbeddedChannel(); + inputPlus = new RebufferingByteBufDataInputPlus(1 << 10, 1 << 11, channel.config()); + } + + @After + public void tearDown() + { + inputPlus.close(); + channel.close(); + + if (buf != null && buf.refCnt() > 0) + buf.release(buf.refCnt()); + } + + @Test (expected = IllegalArgumentException.class) + public void ctor_badWaterMarks() + { + inputPlus = new RebufferingByteBufDataInputPlus(2, 1, null); + } + + @Test + public void isOpen() + { + Assert.assertTrue(inputPlus.isOpen()); + inputPlus.markClose(); + Assert.assertFalse(inputPlus.isOpen()); + } + + @Test (expected = IllegalStateException.class) + public void append_closed() + { + inputPlus.markClose(); + buf = channel.alloc().buffer(4); + inputPlus.append(buf); + } + + @Test + public void append_normal() throws EOFException + { + int size = 4; + buf = channel.alloc().buffer(size); + buf.writerIndex(size); + inputPlus.append(buf); + Assert.assertEquals(buf.readableBytes(), inputPlus.available()); + } + + @Test + public void read() throws IOException + { + // put two buffers of 8 bytes each into the queue. + // then read an int, then a long. the latter tests offset into the inputPlus, as well as spanning across queued buffers. + // the values of those int/long will both be '42', but spread across both queue buffers. + ByteBuf buf = channel.alloc().buffer(8); + buf.writeInt(42); + buf.writerIndex(8); + inputPlus.append(buf); + buf = channel.alloc().buffer(8); + buf.writeInt(42); + buf.writerIndex(8); + inputPlus.append(buf); + Assert.assertEquals(16, inputPlus.available()); + + ByteBuffer out = ByteBuffer.allocate(4); + int readCount = inputPlus.read(out); + Assert.assertEquals(4, readCount); + out.flip(); + Assert.assertEquals(42, out.getInt()); + Assert.assertEquals(12, inputPlus.available()); + + out = ByteBuffer.allocate(8); + readCount = inputPlus.read(out); + Assert.assertEquals(8, readCount); + out.flip(); + Assert.assertEquals(42, out.getLong()); + Assert.assertEquals(4, inputPlus.available()); + } + + @Test (expected = EOFException.class) + public void read_closed() throws IOException + { + inputPlus.markClose(); + ByteBuffer buf = ByteBuffer.allocate(1); + inputPlus.read(buf); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java b/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java new file mode 100644 index 0000000..f5475ce --- /dev/null +++ b/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class TestScheduledFuture implements ScheduledFuture<Object> +{ + private boolean cancelled = false; + + public long getDelay(TimeUnit unit) + { + return 0; + } + + public int compareTo(Delayed o) + { + return 0; + } + + public boolean cancel(boolean mayInterruptIfRunning) + { + cancelled = true; + return false; + } + + public boolean isCancelled() + { + return cancelled; + } + + public boolean isDone() + { + return false; + } + + public Object get() throws InterruptedException, ExecutionException + { + return null; + } + + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 4efdb21..afc5b25 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -70,6 +70,7 @@ public class RemoveTest public static void setupClass() throws ConfigurationException { oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner); + MessagingService.instance().listen(); } @AfterClass @@ -86,7 +87,6 @@ public class RemoveTest // create a ring of 5 nodes Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6); - MessagingService.instance().listen(); removalhost = hosts.get(5); hosts.remove(removalhost); removalId = hostIds.get(5); @@ -98,7 +98,6 @@ public class RemoveTest { MessagingService.instance().clearMessageSinks(); MessagingService.instance().clearCallbacksUnsafe(); - MessagingService.instance().shutdown(); } @Test(expected = UnsupportedOperationException.class) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 21c8375..5c29698 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -32,6 +32,7 @@ import org.junit.BeforeClass; import org.junit.After; import org.junit.Test; +import io.netty.channel.embedded.EmbeddedChannel; import junit.framework.Assert; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamilyStore; @@ -74,7 +75,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE); + StreamSession session = new StreamSession(peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), PreviewKind.ALL); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -120,7 +121,7 @@ public class StreamTransferTaskTest public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, null, false, null, PreviewKind.NONE); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE); session.init(future); @@ -159,7 +160,7 @@ public class StreamTransferTaskTest } //fail stream session mid-transfer - session.onError(new Exception("Fake exception")); + session.onError(new Exception("Fake exception")).get(5, TimeUnit.SECONDS); //make sure reference was not released for (Ref<SSTableReader> ref : refs) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 6a5002e..7a51d0c 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -47,7 +46,6 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; @@ -62,9 +60,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @RunWith(OrderedJUnit4ClassRunner.class) - -// TODO:JEB intentionally breaking this with CASSANDRA-8457 until CASSANDRA-12229 -@Ignore public class StreamingTransferTest { private static final Logger logger = LoggerFactory.getLogger(StreamingTransferTest.class); @@ -244,7 +239,6 @@ public class StreamingTransferTest ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName()); streamPlan.execute().get(); - verifyConnectionsAreClosed(); //cannot add ranges after stream session is finished try @@ -262,7 +256,6 @@ public class StreamingTransferTest { StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))); streamPlan.execute().get(); - verifyConnectionsAreClosed(); //cannot add files after stream session is finished try @@ -276,27 +269,6 @@ public class StreamingTransferTest } } - /** - * Test that finished incoming connections are removed from MessagingService (CASSANDRA-11854) - */ - private void verifyConnectionsAreClosed() throws InterruptedException - { - // TODO:JEB intentionally breaking this with CASSANDRA-8457 until CASSANDRA-12229 - //after stream session is finished, message handlers may take several milliseconds to be closed -// outer: -// for (int i = 0; i <= 100; i++) -// { -// for (MessagingService.SocketThread socketThread : MessagingService.instance().getSocketThreads()) -// if (!socketThread.connections.isEmpty()) -// { -// Thread.sleep(100); -// continue outer; -// } -// return; -// } -// fail("Streaming connections remain registered in MessagingService"); - } - private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables) { ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java new file mode 100644 index 0000000..a9849a3 --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming.async; + +import java.net.InetSocketAddress; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.async.TestScheduledFuture; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.CompleteMessage; + +public class NettyStreamingMessageSenderTest +{ + private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 0); + + private EmbeddedChannel channel; + private StreamSession session; + private NettyStreamingMessageSender sender; + private NettyStreamingMessageSender.FileStreamTask fileStreamTask; + + @BeforeClass + public static void before() + { + DatabaseDescriptor.daemonInitialization(); + } + + @Before + public void setUp() + { + channel = new EmbeddedChannel(); + channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); + UUID pendingRepair = UUID.randomUUID(); + session = new StreamSession(REMOTE_ADDR.getAddress(), REMOTE_ADDR.getAddress(), (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL); + StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, pendingRepair, session.getPreviewKind()); + session.init(future); + sender = session.getMessageSender(); + sender.setControlMessageChannel(channel); + } + + @After + public void tearDown() + { + if (fileStreamTask != null) + fileStreamTask.unsetChannel(); + } + + @Test + public void KeepAliveTask_normalSend() + { + Assert.assertTrue(channel.isOpen()); + NettyStreamingMessageSender.KeepAliveTask task = sender.new KeepAliveTask(channel, session); + task.run(); + Assert.assertTrue(channel.releaseOutbound()); + } + + @Test + public void KeepAliveTask_channelClosed() + { + channel.close(); + Assert.assertFalse(channel.isOpen()); + channel.releaseOutbound(); + NettyStreamingMessageSender.KeepAliveTask task = sender.new KeepAliveTask(channel, session); + task.future = new TestScheduledFuture(); + Assert.assertFalse(task.future.isCancelled()); + task.run(); + Assert.assertTrue(task.future.isCancelled()); + Assert.assertFalse(channel.releaseOutbound()); + } + + @Test + public void KeepAliveTask_closed() + { + Assert.assertTrue(channel.isOpen()); + NettyStreamingMessageSender.KeepAliveTask task = sender.new KeepAliveTask(channel, session); + task.future = new TestScheduledFuture(); + Assert.assertFalse(task.future.isCancelled()); + + sender.setClosed(); + Assert.assertFalse(sender.connected()); + task.run(); + Assert.assertTrue(task.future.isCancelled()); + Assert.assertFalse(channel.releaseOutbound()); + } + + @Test + public void KeepAliveTask_CurrentlyStreaming() + { + Assert.assertTrue(channel.isOpen()); + channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.TRUE); + NettyStreamingMessageSender.KeepAliveTask task = sender.new KeepAliveTask(channel, session); + task.future = new TestScheduledFuture(); + Assert.assertFalse(task.future.isCancelled()); + + Assert.assertTrue(sender.connected()); + task.run(); + Assert.assertFalse(task.future.isCancelled()); + Assert.assertFalse(channel.releaseOutbound()); + } + + @Test + public void FileStreamTask_acquirePermit_closed() + { + fileStreamTask = sender.new FileStreamTask(null); + sender.setClosed(); + Assert.assertFalse(fileStreamTask.acquirePermit(1)); + } + + @Test + public void FileStreamTask_acquirePermit_HapppyPath() + { + int permits = sender.semaphoreAvailablePermits(); + fileStreamTask = sender.new FileStreamTask(null); + Assert.assertTrue(fileStreamTask.acquirePermit(1)); + Assert.assertEquals(permits - 1, sender.semaphoreAvailablePermits()); + } + + @Test + public void FileStreamTask_BadChannelAttr() + { + int permits = sender.semaphoreAvailablePermits(); + channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.TRUE); + fileStreamTask = sender.new FileStreamTask(null); + fileStreamTask.injectChannel(channel); + fileStreamTask.run(); + Assert.assertEquals(StreamSession.State.FAILED, session.state()); + Assert.assertTrue(channel.releaseOutbound()); // when the session fails, it will send a SessionFailed msg + Assert.assertEquals(permits, sender.semaphoreAvailablePermits()); + } + + @Test + public void FileStreamTask_HappyPath() + { + int permits = sender.semaphoreAvailablePermits(); + fileStreamTask = sender.new FileStreamTask(new CompleteMessage()); + fileStreamTask.injectChannel(channel); + fileStreamTask.run(); + Assert.assertNotEquals(StreamSession.State.FAILED, session.state()); + Assert.assertTrue(channel.releaseOutbound()); + Assert.assertEquals(permits, sender.semaphoreAvailablePermits()); + } + + @Test + public void onControlMessageComplete_HappyPath() + { + Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(sender.connected()); + ChannelPromise promise = channel.newPromise(); + promise.setSuccess(); + Assert.assertNull(sender.onControlMessageComplete(promise, new CompleteMessage())); + Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(sender.connected()); + Assert.assertNotEquals(StreamSession.State.FAILED, session.state()); + } + + @Test + public void onControlMessageComplete_Exception() throws InterruptedException, ExecutionException, TimeoutException + { + Assert.assertTrue(channel.isOpen()); + Assert.assertTrue(sender.connected()); + ChannelPromise promise = channel.newPromise(); + promise.setFailure(new RuntimeException("this is just a testing exception")); + Future f = sender.onControlMessageComplete(promise, new CompleteMessage()); + + f.get(5, TimeUnit.SECONDS); + + Assert.assertFalse(channel.isOpen()); + Assert.assertFalse(sender.connected()); + Assert.assertEquals(StreamSession.State.FAILED, session.state()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java new file mode 100644 index 0000000..fff7b17 --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming.async; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.Random; + +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.streaming.messages.StreamMessage; + +public class StreamCompressionSerializerTest +{ + private static final int VERSION = StreamMessage.CURRENT_VERSION; + private static final Random random = new Random(2347623847623L); + + private final ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT; + private final StreamCompressionSerializer serializer = new StreamCompressionSerializer(allocator); + private final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); + private final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); + + private ByteBuffer input; + private ByteBuf compressed; + private ByteBuf output; + + @BeforeClass + public static void before() + { + DatabaseDescriptor.daemonInitialization(); + } + + @After + public void tearDown() + { + if (input != null) + FileUtils.clean(input); + if (compressed != null && compressed.refCnt() > 0) + compressed.release(compressed.refCnt()); + if (output != null && output.refCnt() > 0) + output.release(output.refCnt()); + } + + @Test + public void roundTrip_HappyPath_NotReadabaleByteBuffer() throws IOException + { + populateInput(); + compressed = serializer.serialize(compressor, input, VERSION); + input.flip(); + ByteBuffer compressedNioBuffer = compressed.nioBuffer(0, compressed.writerIndex()); + output = serializer.deserialize(decompressor, new DataInputBuffer(compressedNioBuffer, false), VERSION); + validateResults(); + } + + private void populateInput() + { + int bufSize = 1 << 14; + input = ByteBuffer.allocateDirect(bufSize); + for (int i = 0; i < bufSize; i += 4) + input.putInt(random.nextInt()); + input.flip(); + } + + private void validateResults() + { + Assert.assertEquals(input.remaining(), output.readableBytes()); + for (int i = 0; i < input.remaining(); i++) + Assert.assertEquals(input.get(i), output.readByte()); + } + + @Test + public void roundTrip_HappyPath_ReadabaleByteBuffer() throws IOException + { + populateInput(); + compressed = serializer.serialize(compressor, input, VERSION); + input.flip(); + output = serializer.deserialize(decompressor, new ByteBufRCH(compressed), VERSION); + validateResults(); + } + + private static class ByteBufRCH extends DataInputBuffer implements ReadableByteChannel + { + public ByteBufRCH(ByteBuf compressed) + { + super (compressed.nioBuffer(0, compressed.readableBytes()), false); + } + + @Override + public int read(ByteBuffer dst) throws IOException + { + int len = dst.remaining(); + dst.put(buffer); + return len; + } + + @Override + public boolean isOpen() + { + return true; + } + + @Override + public void close() + { } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java new file mode 100644 index 0000000..a674e6b --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming.async; + +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.UUID; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.async.StreamingInboundHandler.SessionIdentifier; +import org.apache.cassandra.streaming.messages.CompleteMessage; +import org.apache.cassandra.streaming.messages.FileMessageHeader; +import org.apache.cassandra.streaming.messages.IncomingFileMessage; +import org.apache.cassandra.streaming.messages.StreamInitMessage; +import org.apache.cassandra.streaming.messages.StreamMessage; + +public class StreamingInboundHandlerTest +{ + private static final int VERSION = StreamMessage.CURRENT_VERSION; + private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 0); + + private StreamingInboundHandler handler; + private EmbeddedChannel channel; + private RebufferingByteBufDataInputPlus buffers; + private ByteBuf buf; + + @BeforeClass + public static void before() + { + DatabaseDescriptor.daemonInitialization(); + } + + @Before + public void setup() + { + handler = new StreamingInboundHandler(REMOTE_ADDR, VERSION, null); + channel = new EmbeddedChannel(handler); + buffers = new RebufferingByteBufDataInputPlus(1 << 9, 1 << 10, channel.config()); + handler.setPendingBuffers(buffers); + } + + @After + public void tearDown() + { + if (buf != null) + { + while (buf.refCnt() > 0) + buf.release(); + } + + channel.close(); + } + + @Test + public void channelRead_Normal() throws EOFException + { + Assert.assertEquals(0, buffers.available()); + int size = 8; + buf = channel.alloc().buffer(size); + buf.writerIndex(size); + channel.writeInbound(buf); + Assert.assertEquals(size, buffers.available()); + Assert.assertFalse(channel.releaseInbound()); + } + + @Test (expected = EOFException.class) + public void channelRead_Closed() throws EOFException + { + int size = 8; + buf = channel.alloc().buffer(size); + Assert.assertEquals(1, buf.refCnt()); + buf.writerIndex(size); + handler.close(); + channel.writeInbound(buf); + Assert.assertEquals(0, buffers.available()); + Assert.assertEquals(0, buf.refCnt()); + Assert.assertFalse(channel.releaseInbound()); + } + + @Test + public void channelRead_WrongObject() throws EOFException + { + channel.writeInbound("homer"); + Assert.assertEquals(0, buffers.available()); + Assert.assertFalse(channel.releaseInbound()); + } + + @Test + public void StreamDeserializingTask_deriveSession_StreamInitMessage() throws InterruptedException, IOException + { + StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR.getAddress(), 0, UUID.randomUUID(), StreamOperation.REPAIR, true, UUID.randomUUID(), PreviewKind.ALL); + StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel); + StreamSession session = task.deriveSession(msg); + Assert.assertNotNull(session); + } + + private StreamSession createSession(SessionIdentifier sid) + { + return new StreamSession(sid.from, sid.from, (connectionId, protocolVersion) -> null, sid.sessionIndex, true, UUID.randomUUID(), PreviewKind.ALL); + } + + @Test (expected = IllegalStateException.class) + public void StreamDeserializingTask_deriveSession_NoSession() throws InterruptedException, IOException + { + CompleteMessage msg = new CompleteMessage(); + StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel); + task.deriveSession(msg); + } + + @Test (expected = IllegalStateException.class) + public void StreamDeserializingTask_deriveSession_IFM_NoSession() throws InterruptedException, IOException + { + FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR.getAddress(), UUID.randomUUID(), 0, 0, + BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, UUID.randomUUID(), 0 , null); + IncomingFileMessage msg = new IncomingFileMessage(null, header); + StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel); + task.deriveSession(msg); + } + + @Test + public void StreamDeserializingTask_deriveSession_IFM_HasSession() throws InterruptedException, IOException + { + UUID planId = UUID.randomUUID(); + StreamResultFuture future = StreamResultFuture.initReceivingSide(0, planId, StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, UUID.randomUUID(), PreviewKind.ALL); + StreamManager.instance.register(future); + FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR.getAddress(), planId, 0, 0, + BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, UUID.randomUUID(), 0 , null); + IncomingFileMessage msg = new IncomingFileMessage(null, header); + StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel); + StreamSession session = task.deriveSession(msg); + Assert.assertNotNull(session); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java index 19e28fd..16b3a76 100644 --- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java @@ -28,12 +28,11 @@ import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.streaming.compress.CompressedInputStream; import org.apache.cassandra.streaming.compress.CompressionInfo; @@ -174,7 +173,7 @@ public class CompressedInputStreamTest testException(sections, info); return; } - CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32, () -> 1.0); + CompressedInputStream input = new CompressedInputStream(new DataInputStreamPlus(new ByteArrayInputStream(toRead)), info, ChecksumType.CRC32, () -> 1.0); try (DataInputStream in = new DataInputStream(input)) { @@ -189,14 +188,14 @@ public class CompressedInputStreamTest private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException { - CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(new byte[0]), info, ChecksumType.CRC32, () -> 1.0); + CompressedInputStream input = new CompressedInputStream(new DataInputStreamPlus(new ByteArrayInputStream(new byte[0])), info, ChecksumType.CRC32, () -> 1.0); try (DataInputStream in = new DataInputStream(input)) { for (int i = 0; i < sections.size(); i++) { - input.position(sections.get(i).left); try { + input.position(sections.get(i).left); in.readLong(); fail("Should have thrown IOException"); } @@ -208,3 +207,4 @@ public class CompressedInputStreamTest } } } + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org