Repository: cassandra Updated Branches: refs/heads/trunk 6ba2fb939 -> 47a12c52a
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java new file mode 100644 index 0000000..8256ac6 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.util.Arrays; +import java.util.List; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.KeyIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CassandraOutgoingFileTest +{ + public static final String KEYSPACE = "CassandraOutgoingFileTest"; + public static final String CF_STANDARD = "Standard1"; + public static final String CF_INDEXED = "Indexed1"; + public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval"; + + private static SSTableReader sstable; + private static ColumnFamilyStore store; + + @BeforeClass + public static void defineSchemaAndPrepareSSTable() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD), + SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL) + .minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingParams.CACHE_NOTHING)); + + Keyspace keyspace = Keyspace.open(KEYSPACE); + store = keyspace.getColumnFamilyStore(CF_STANDARD); + + // insert data and compact to a single sstable + 
CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 10; j++) + { + new RowUpdateBuilder(store.metadata(), j, String.valueOf(j)) + .clustering("0") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + sstable = store.getLiveSSTables().iterator().next(); + } + + @Test + public void validateFullyContainedIn_SingleContiguousRange_Succeeds() + { + List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken())); + + CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), + sstable.getPositionsForRanges(requestedRanges), + requestedRanges, sstable.estimatedKeys()); + + assertTrue(cof.contained(requestedRanges, sstable)); + } + + @Test + public void validateFullyContainedIn_PartialOverlap_Fails() + { + List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2))); + + CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), + sstable.getPositionsForRanges(requestedRanges), + requestedRanges, sstable.estimatedKeys()); + + assertFalse(cof.contained(requestedRanges, sstable)); + } + + @Test + public void validateFullyContainedIn_SplitRange_Succeeds() + { + List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)), + new Range<>(getTokenAtIndex(2), getTokenAtIndex(6)), + new Range<>(getTokenAtIndex(5), sstable.last.getToken())); + + CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), + sstable.getPositionsForRanges(requestedRanges), + requestedRanges, sstable.estimatedKeys()); + + assertTrue(cof.contained(requestedRanges, sstable)); + } + + private DecoratedKey getKeyAtIndex(int i) + { + int count = 0; + DecoratedKey key; 
+ + try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) + { + do + { + key = iter.next(); + count++; + } while (iter.hasNext() && count < i); + } + return key; + } + + private Token getTokenAtIndex(int i) + { + return getKeyAtIndex(i).getToken(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java index 061a4b2..e48abf6 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java @@ -15,20 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.db.streaming; -import java.util.ArrayList; +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; import org.junit.Test; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.db.streaming.CassandraStreamHeader.CassandraStreamHeaderSerializer; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.serializers.SerializationUtils; +import org.apache.cassandra.utils.ByteBufferUtil; public class CassandraStreamHeaderTest { @@ -37,14 +42,51 @@ public class CassandraStreamHeaderTest { String 
ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); - CassandraStreamHeader header = new CassandraStreamHeader(BigFormat.latestVersion, - SSTableFormat.Type.BIG, - 0, - new ArrayList<>(), - ((CompressionMetadata) null), - 0, - SerializationHeader.makeWithoutStats(metadata).toComponent()); + CassandraStreamHeader header = + CassandraStreamHeader.builder() + .withSSTableFormat(SSTableFormat.Type.BIG) + .withSSTableVersion(BigFormat.latestVersion) + .withSSTableLevel(0) + .withEstimatedKeys(0) + .withSections(Collections.emptyList()) + .withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent()) + .withTableId(metadata.id) + .build(); SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer); } + + @Test + public void serializerTest_EntireSSTableTransfer() + { + String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; + TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); + + ComponentManifest manifest = new ComponentManifest(new LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }}); + + CassandraStreamHeader header = + CassandraStreamHeader.builder() + .withSSTableFormat(SSTableFormat.Type.BIG) + .withSSTableVersion(BigFormat.latestVersion) + .withSSTableLevel(0) + .withEstimatedKeys(0) + .withSections(Collections.emptyList()) + .withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent()) + .withComponentManifest(manifest) + .isEntireSSTable(true) + .withFirstKey(Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER)) + .withTableId(metadata.id) + .build(); + + SerializationUtils.assertSerializationCycle(header, new TestableCassandraStreamHeaderSerializer()); + } + + private static class TestableCassandraStreamHeaderSerializer extends CassandraStreamHeaderSerializer + { + @Override + public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws 
IOException + { + return deserialize(in, version, tableId -> Murmur3Partitioner.instance); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java new file mode 100644 index 0000000..f478a00 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.EOFException; +import java.io.IOException; +import java.util.LinkedHashMap; + +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.async.ByteBufDataInputPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputPlus; +import org.apache.cassandra.serializers.SerializationUtils; + +import static org.junit.Assert.assertNotEquals; + +public class ComponentManifestTest +{ + @Test + public void testSerialization() + { + ComponentManifest expected = new ComponentManifest(new LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }}); + SerializationUtils.assertSerializationCycle(expected, ComponentManifest.serializer); + } + + @Test(expected = EOFException.class) + public void testSerialization_FailsOnBadBytes() throws IOException + { + ByteBuf buf = Unpooled.buffer(512); + ComponentManifest expected = new ComponentManifest(new LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }}); + + DataOutputPlus output = new ByteBufDataOutputPlus(buf); + ComponentManifest.serializer.serialize(expected, output, MessagingService.VERSION_40); + + buf.setInt(0, -100); + + DataInputPlus input = new ByteBufDataInputPlus(buf); + ComponentManifest actual = ComponentManifest.serializer.deserialize(input, MessagingService.VERSION_40); + + assertNotEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index 
cee8802..fccb344 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -69,7 +69,10 @@ public class BigTableWriterTest extends AbstractTransactionalTest private TestableBTW(Descriptor desc) { - this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))); + this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, + new SerializationHeader(true, cfs.metadata(), + cfs.metadata().regularAndStaticColumns(), + EncodingStats.NO_STATS))); } private TestableBTW(Descriptor desc, SSTableTxnWriter sw) http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index fcc9191..c61ee1f 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -22,6 +22,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.UUID; @@ -318,6 +319,7 @@ public class LegacySSTableTest List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER, sstable.ref(), sstable.getPositionsForRanges(ranges), + ranges, sstable.estimatedKeysForRanges(ranges))); new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java new file mode 100644 index 0000000..c3931e0 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Set; +import java.util.function.Function; + +import com.google.common.collect.ImmutableSet; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class BigTableZeroCopyWriterTest +{ + public static final String KEYSPACE1 = 
"BigTableBlockWriterTest"; + public static final String CF_STANDARD = "Standard1"; + public static final String CF_STANDARD2 = "Standard2"; + public static final String CF_INDEXED = "Indexed1"; + public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval"; + + public static SSTableReader sstable; + public static ColumnFamilyStore store; + private static int expectedRowCount; + + @BeforeClass + public static void defineSchema() throws Exception + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2), + SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED, true), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL) + .minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingParams.CACHE_NOTHING)); + + String ks = KEYSPACE1; + String cf = "Standard1"; + + // clear and create just one sstable for this test + Keyspace keyspace = Keyspace.open(ks); + store = keyspace.getColumnFamilyStore(cf); + store.clearUnsafe(); + store.disableAutoCompaction(); + + DecoratedKey firstKey = null, lastKey = null; + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < store.metadata().params.minIndexInterval; i++) + { + DecoratedKey key = Util.dk(String.valueOf(i)); + if (firstKey == null) + firstKey = key; + if (lastKey == null) + lastKey = key; + if (store.metadata().partitionKeyType.compare(lastKey.getKey(), key.getKey()) < 0) + lastKey = key; + + new RowUpdateBuilder(store.metadata(), timestamp, key.getKey()) + .clustering("col") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + expectedRowCount++; + } + store.forceBlockingFlush(); + + sstable = store.getLiveSSTables().iterator().next(); + } + + @Test + public void writeDataFile_DataInputPlus() + { + writeDataTestCycle(buffer -> new DataInputStreamPlus(new ByteArrayInputStream(buffer.array()))); 
+ } + + @Test + public void writeDataFile_RebufferingByteBufDataInputPlus() + { + writeDataTestCycle(buffer -> { + EmbeddedChannel channel = new EmbeddedChannel(); + RebufferingByteBufDataInputPlus inputPlus = new RebufferingByteBufDataInputPlus(1 << 10, 1 << 20, channel.config()); + inputPlus.append(Unpooled.wrappedBuffer(buffer)); + return inputPlus; + }); + } + + + private void writeDataTestCycle(Function<ByteBuffer, DataInputPlus> bufferMapper) + { + File dir = store.getDirectories().getDirectoryForNewSSTables(); + Descriptor desc = store.newSSTableDescriptor(dir); + TableMetadataRef metadata = Schema.instance.getTableMetadataRef(desc); + + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM); + Set<Component> componentsToWrite = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, + Component.STATS); + + BigTableZeroCopyWriter btzcw = new BigTableZeroCopyWriter(desc, metadata, txn, componentsToWrite); + + for (Component component : componentsToWrite) + { + if (Files.exists(Paths.get(desc.filenameFor(component)))) + { + Pair<DataInputPlus, Long> pair = getSSTableComponentData(sstable, component, bufferMapper); + + btzcw.writeComponent(component.type, pair.left, pair.right); + } + } + + Collection<SSTableReader> readers = btzcw.finish(true); + + SSTableReader reader = readers.toArray(new SSTableReader[0])[0]; + + assertNotEquals(sstable.getFilename(), reader.getFilename()); + assertEquals(sstable.estimatedKeys(), reader.estimatedKeys()); + assertEquals(sstable.isPendingRepair(), reader.isPendingRepair()); + + assertRowCount(expectedRowCount); + } + + private void assertRowCount(int expected) + { + int count = 0; + for (int i = 0; i < store.metadata().params.minIndexInterval; i++) + { + DecoratedKey dk = Util.dk(String.valueOf(i)); + UnfilteredRowIterator rowIter = sstable.iterator(dk, + Slices.ALL, + ColumnFilter.all(store.metadata()), + false, + SSTableReadsListener.NOOP_LISTENER); + while (rowIter.hasNext()) + { + rowIter.next(); + 
count++; + } + } + assertEquals(expected, count); + } + + private Pair<DataInputPlus, Long> getSSTableComponentData(SSTableReader sstable, Component component, + Function<ByteBuffer, DataInputPlus> bufferMapper) + { + FileHandle componentFile = new FileHandle.Builder(sstable.descriptor.filenameFor(component)) + .bufferSize(1024).complete(); + ByteBuffer buffer = ByteBuffer.allocate((int) componentFile.channel.size()); + componentFile.channel.read(buffer, 0); + buffer.flip(); + + DataInputPlus inputPlus = bufferMapper.apply(buffer); + + return Pair.create(inputPlus, componentFile.channel.size()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java index 2961d9a..69df040 100644 --- a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java +++ b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.net.async; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; import org.junit.After; import org.junit.Assert; @@ -28,7 +29,9 @@ import org.junit.Before; import org.junit.Test; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; public class RebufferingByteBufDataInputPlusTest { @@ -151,4 +154,99 @@ public class RebufferingByteBufDataInputPlusTest inputPlus.markClose(); Assert.assertEquals(size, inputPlus.available()); } + + @Test + public void consumeUntil_SingleBuffer_Partial_HappyPath() throws IOException + { + 
consumeUntilTestCycle(1, 8, 0, 4);
+    }
+
+    @Test
+    public void consumeUntil_SingleBuffer_AllBytes_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(1, 8, 0, 8);
+    }
+
+    @Test
+    public void consumeUntil_MultipleBufferr_Partial_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 13);
+    }
+
+    @Test
+    public void consumeUntil_MultipleBuffer_AllBytes_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 16);
+    }
+
+    @Test(expected = EOFException.class)
+    public void consumeUntil_SingleBuffer_Fails() throws IOException
+    {
+        consumeUntilTestCycle(1, 8, 0, 9);
+    }
+
+    @Test(expected = EOFException.class)
+    public void consumeUntil_MultipleBuffer_Fails() throws IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 17);
+    }
+
+    /**
+     * Shared driver for the consumeUntil tests.
+     *
+     * Appends {@code nBuffs} buffers of {@code buffSize} bytes to {@code inputPlus}, where every
+     * byte of buffer {@code j} holds the value {@code j}, then skips {@code startOffset} bytes and
+     * asks {@code inputPlus.consumeUntil} to copy {@code len} bytes into a capturing channel.
+     * Finally asserts that exactly {@code len} bytes were captured and that they match the bytes
+     * that were appended in the window {@code [startOffset, startOffset + len)}.
+     *
+     * @param nBuffs      number of buffers appended to the input
+     * @param buffSize    requested size, in bytes, of each appended buffer
+     * @param startOffset number of leading bytes skipped before consuming
+     * @param len         number of bytes to consume
+     * @throws IOException propagated from the input; the tests annotated with
+     *                     {@code expected = EOFException.class} request more bytes than were appended
+     */
+    private void consumeUntilTestCycle(int nBuffs, int buffSize, int startOffset, int len) throws IOException
+    {
+        byte[] expectedBytes = new byte[len];
+        int count = 0;
+        for (int j=0; j < nBuffs; j++)
+        {
+            ByteBuf buf = channel.alloc().buffer(buffSize);
+            for (int i = 0; i < buf.capacity(); i++)
+            {
+                buf.writeByte(j);
+                // remember only the bytes that fall inside the window the test will consume
+                if (count >= startOffset && (count - startOffset) < len)
+                    expectedBytes[count - startOffset] = (byte)j;
+                count++;
+            }
+
+            inputPlus.append(buf);
+        }
+        // NOTE(review): the trailing zero-length buffer presumably signals end of input - confirm
+        inputPlus.append(channel.alloc().buffer(0));
+
+        TestableWritableByteChannel wbc = new TestableWritableByteChannel(len);
+
+        inputPlus.skipBytesFully(startOffset);
+        BufferedDataOutputStreamPlus writer = new BufferedDataOutputStreamPlus(wbc);
+        inputPlus.consumeUntil(writer, len);
+
+        // String.format uses %-style conversions; "{}" placeholders would be printed literally
+        Assert.assertEquals(String.format("Test with %d buffers starting at %d consuming %d bytes", nBuffs, startOffset,
+                                          len), len, wbc.writtenBytes.readableBytes());
+
+        Assert.assertArrayEquals(expectedBytes, wbc.writtenBytes.array());
+    }
+
+    private static class TestableWritableByteChannel implements WritableByteChannel
+    {
+        private boolean isOpen = true;
+        public ByteBuf writtenBytes;
+
+        public TestableWritableByteChannel(int initialCapacity)
+        {
+            writtenBytes =
Unpooled.buffer(initialCapacity); + } + + public int write(ByteBuffer src) throws IOException + { + int size = src.remaining(); + writtenBytes.writeBytes(src); + return size; + } + + public boolean isOpen() + { + return isOpen; + } + + public void close() throws IOException + { + isOpen = false; + } + }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/serializers/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java index 7ce4ec5..b88b56f 100644 --- a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java +++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java @@ -63,6 +63,5 @@ public class SerializationUtils public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer) { assertSerializationCycle(src, serializer, MessagingService.current_version); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 78b3094..8ebe333 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -92,7 +92,7 @@ public class StreamTransferTaskTest { List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), 1)); + task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, 
sstable.selfRef(), sstable.getPositionsForRanges(ranges), ranges, 1)); } assertEquals(2, task.getTotalNumberOfFiles()); @@ -144,7 +144,7 @@ public class StreamTransferTaskTest ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); Ref<SSTableReader> ref = sstable.selfRef(); refs.add(ref); - task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), 1)); + task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), ranges, 1)); } assertEquals(2, task.getTotalNumberOfFiles()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 72b9cbe..bc501be 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -278,6 +278,7 @@ public class StreamingTransferTest streams.add(new CassandraOutgoingFile(operation, sstables.get(sstable), sstable.getPositionsForRanges(ranges), + ranges, sstable.estimatedKeysForRanges(ranges))); } return streams; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
