Repository: cassandra
Updated Branches:
  refs/heads/trunk 6ba2fb939 -> 47a12c52a


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java 
b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
new file mode 100644
index 0000000..8256ac6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.KeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraOutgoingFileTest
+{
+    public static final String KEYSPACE = "CassandraOutgoingFileTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = 
"StandardLowIndexInterval";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, 
CF_STANDARD),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, 
CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE, 
CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                
.caching(CachingParams.CACHE_NOTHING));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void validateFullyContainedIn_SingleContiguousRange_Succeeds()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new 
Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
+
+        CassandraOutgoingFile cof = new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                                              
sstable.getPositionsForRanges(requestedRanges),
+                                                              requestedRanges, 
sstable.estimatedKeys());
+
+        assertTrue(cof.contained(requestedRanges, sstable));
+    }
+
+    @Test
+    public void validateFullyContainedIn_PartialOverlap_Fails()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new 
Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2)));
+
+        CassandraOutgoingFile cof = new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                                              
sstable.getPositionsForRanges(requestedRanges),
+                                                              requestedRanges, 
sstable.estimatedKeys());
+
+        assertFalse(cof.contained(requestedRanges, sstable));
+    }
+
+    @Test
+    public void validateFullyContainedIn_SplitRange_Succeeds()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new 
Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)),
+                                                         new 
Range<>(getTokenAtIndex(2), getTokenAtIndex(6)),
+                                                         new 
Range<>(getTokenAtIndex(5), sstable.last.getToken()));
+
+        CassandraOutgoingFile cof = new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                                              
sstable.getPositionsForRanges(requestedRanges),
+                                                              requestedRanges, 
sstable.estimatedKeys());
+
+        assertTrue(cof.contained(requestedRanges, sstable));
+    }
+
+    private DecoratedKey getKeyAtIndex(int i)
+    {
+        int count = 0;
+        DecoratedKey key;
+
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, 
sstable.metadata()))
+        {
+            do
+            {
+                key = iter.next();
+                count++;
+            } while (iter.hasNext() && count < i);
+        }
+        return key;
+    }
+
+    private Token getTokenAtIndex(int i)
+    {
+        return getKeyAtIndex(i).getToken();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java 
b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index 061a4b2..e48abf6 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -15,20 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.db.streaming;
 
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.io.compress.CompressionMetadata;
+import 
org.apache.cassandra.db.streaming.CassandraStreamHeader.CassandraStreamHeaderSerializer;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.SerializationUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CassandraStreamHeaderTest
 {
@@ -37,14 +42,51 @@ public class CassandraStreamHeaderTest
     {
         String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
         TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
-        CassandraStreamHeader header = new 
CassandraStreamHeader(BigFormat.latestVersion,
-                                                                 
SSTableFormat.Type.BIG,
-                                                                 0,
-                                                                 new 
ArrayList<>(),
-                                                                 
((CompressionMetadata) null),
-                                                                 0,
-                                                                 
SerializationHeader.makeWithoutStats(metadata).toComponent());
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(SSTableFormat.Type.BIG)
+                                 .withSSTableVersion(BigFormat.latestVersion)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(0)
+                                 .withSections(Collections.emptyList())
+                                 
.withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent())
+                                 .withTableId(metadata.id)
+                                 .build();
 
         SerializationUtils.assertSerializationCycle(header, 
CassandraStreamHeader.serializer);
     }
+
+    @Test
+    public void serializerTest_EntireSSTableTransfer()
+    {
+        String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+        TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
+
+        ComponentManifest manifest = new ComponentManifest(new 
LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }});
+
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(SSTableFormat.Type.BIG)
+                                 .withSSTableVersion(BigFormat.latestVersion)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(0)
+                                 .withSections(Collections.emptyList())
+                                 
.withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent())
+                                 .withComponentManifest(manifest)
+                                 .isEntireSSTable(true)
+                                 
.withFirstKey(Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+                                 .withTableId(metadata.id)
+                                 .build();
+
+        SerializationUtils.assertSerializationCycle(header, new 
TestableCassandraStreamHeaderSerializer());
+    }
+
    /**
     * Serializer variant that resolves every table id to
     * {@link Murmur3Partitioner#instance} during deserialization, so the
     * round-trip test needs no live schema to look partitioners up in.
     */
    private static class TestableCassandraStreamHeaderSerializer extends CassandraStreamHeaderSerializer
    {
        @Override
        public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException
        {
            return deserialize(in, version, tableId -> Murmur3Partitioner.instance);
        }
    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java 
b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
new file mode 100644
index 0000000..f478a00
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.ByteBufDataInputPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputPlus;
+import org.apache.cassandra.serializers.SerializationUtils;
+
+import static org.junit.Assert.assertNotEquals;
+
+public class ComponentManifestTest
+{
+    @Test
+    public void testSerialization()
+    {
+        ComponentManifest expected = new ComponentManifest(new 
LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }});
+        SerializationUtils.assertSerializationCycle(expected, 
ComponentManifest.serializer);
+    }
+
+    @Test(expected = EOFException.class)
+    public void testSerialization_FailsOnBadBytes() throws IOException
+    {
+        ByteBuf buf = Unpooled.buffer(512);
+        ComponentManifest expected = new ComponentManifest(new 
LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }});
+
+        DataOutputPlus output = new ByteBufDataOutputPlus(buf);
+        ComponentManifest.serializer.serialize(expected, output, 
MessagingService.VERSION_40);
+
+        buf.setInt(0, -100);
+
+        DataInputPlus input = new ByteBufDataInputPlus(buf);
+        ComponentManifest actual = 
ComponentManifest.serializer.deserialize(input, MessagingService.VERSION_40);
+
+        assertNotEquals(expected, actual);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index cee8802..fccb344 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,10 @@ public class BigTableWriterTest extends 
AbstractTransactionalTest
 
        private TestableBTW(Descriptor desc)
        {
            // Delegates to the two-arg constructor with a fresh transactional
            // writer whose serialization header carries no encoding stats.
            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null,
                                               new SerializationHeader(true, cfs.metadata(),
                                                                       cfs.metadata().regularAndStaticColumns(),
                                                                       EncodingStats.NO_STATS)));
        }
 
         private TestableBTW(Descriptor desc, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index fcc9191..c61ee1f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -318,6 +319,7 @@ public class LegacySSTableTest
         List<OutgoingStream> streams = Lists.newArrayList(new 
CassandraOutgoingFile(StreamOperation.OTHER,
                                                                                
     sstable.ref(),
                                                                                
     sstable.getPositionsForRanges(ranges),
+                                                                               
     ranges,
                                                                                
     sstable.estimatedKeysForRanges(ranges)));
         new 
StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(),
 streams).execute().get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
new file mode 100644
index 0000000..c3931e0
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class BigTableZeroCopyWriterTest
+{
+    public static final String KEYSPACE1 = "BigTableBlockWriterTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_STANDARD2 = "Standard2";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = 
"StandardLowIndexInterval";
+
+    public static SSTableReader sstable;
+    public static ColumnFamilyStore store;
+    private static int expectedRowCount;
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD2),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, 
CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                
.caching(CachingParams.CACHE_NOTHING));
+
+        String ks = KEYSPACE1;
+        String cf = "Standard1";
+
+        // clear and create just one sstable for this test
+        Keyspace keyspace = Keyspace.open(ks);
+        store = keyspace.getColumnFamilyStore(cf);
+        store.clearUnsafe();
+        store.disableAutoCompaction();
+
+        DecoratedKey firstKey = null, lastKey = null;
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            if (firstKey == null)
+                firstKey = key;
+            if (lastKey == null)
+                lastKey = key;
+            if (store.metadata().partitionKeyType.compare(lastKey.getKey(), 
key.getKey()) < 0)
+                lastKey = key;
+
+            new RowUpdateBuilder(store.metadata(), timestamp, key.getKey())
+            .clustering("col")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+            expectedRowCount++;
+        }
+        store.forceBlockingFlush();
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void writeDataFile_DataInputPlus()
+    {
+        writeDataTestCycle(buffer -> new DataInputStreamPlus(new 
ByteArrayInputStream(buffer.array())));
+    }
+
+    @Test
+    public void writeDataFile_RebufferingByteBufDataInputPlus()
+    {
+        writeDataTestCycle(buffer -> {
+            EmbeddedChannel channel = new EmbeddedChannel();
+            RebufferingByteBufDataInputPlus inputPlus = new 
RebufferingByteBufDataInputPlus(1 << 10, 1 << 20, channel.config());
+            inputPlus.append(Unpooled.wrappedBuffer(buffer));
+            return inputPlus;
+        });
+    }
+
+
+    private void writeDataTestCycle(Function<ByteBuffer, DataInputPlus> 
bufferMapper)
+    {
+        File dir = store.getDirectories().getDirectoryForNewSSTables();
+        Descriptor desc = store.newSSTableDescriptor(dir);
+        TableMetadataRef metadata = Schema.instance.getTableMetadataRef(desc);
+
+        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.STREAM);
+        Set<Component> componentsToWrite = ImmutableSet.of(Component.DATA, 
Component.PRIMARY_INDEX,
+                                                           Component.STATS);
+
+        BigTableZeroCopyWriter btzcw = new BigTableZeroCopyWriter(desc, 
metadata, txn, componentsToWrite);
+
+        for (Component component : componentsToWrite)
+        {
+            if (Files.exists(Paths.get(desc.filenameFor(component))))
+            {
+                Pair<DataInputPlus, Long> pair = 
getSSTableComponentData(sstable, component, bufferMapper);
+
+                btzcw.writeComponent(component.type, pair.left, pair.right);
+            }
+        }
+
+        Collection<SSTableReader> readers = btzcw.finish(true);
+
+        SSTableReader reader = readers.toArray(new SSTableReader[0])[0];
+
+        assertNotEquals(sstable.getFilename(), reader.getFilename());
+        assertEquals(sstable.estimatedKeys(), reader.estimatedKeys());
+        assertEquals(sstable.isPendingRepair(), reader.isPendingRepair());
+
+        assertRowCount(expectedRowCount);
+    }
+
+    private void assertRowCount(int expected)
+    {
+        int count = 0;
+        for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
+        {
+            DecoratedKey dk = Util.dk(String.valueOf(i));
+            UnfilteredRowIterator rowIter = sstable.iterator(dk,
+                                                             Slices.ALL,
+                                                             
ColumnFilter.all(store.metadata()),
+                                                             false,
+                                                             
SSTableReadsListener.NOOP_LISTENER);
+            while (rowIter.hasNext())
+            {
+                rowIter.next();
+                count++;
+            }
+        }
+        assertEquals(expected, count);
+    }
+
+    private Pair<DataInputPlus, Long> getSSTableComponentData(SSTableReader 
sstable, Component component,
+                                                              
Function<ByteBuffer, DataInputPlus> bufferMapper)
+    {
+        FileHandle componentFile = new 
FileHandle.Builder(sstable.descriptor.filenameFor(component))
+                                   .bufferSize(1024).complete();
+        ByteBuffer buffer = ByteBuffer.allocate((int) 
componentFile.channel.size());
+        componentFile.channel.read(buffer, 0);
+        buffer.flip();
+
+        DataInputPlus inputPlus = bufferMapper.apply(buffer);
+
+        return Pair.create(inputPlus, componentFile.channel.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
 
b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
index 2961d9a..69df040 100644
--- 
a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
+++ 
b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.net.async;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -28,7 +29,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 
 public class RebufferingByteBufDataInputPlusTest
 {
@@ -151,4 +154,99 @@ public class RebufferingByteBufDataInputPlusTest
         inputPlus.markClose();
         Assert.assertEquals(size, inputPlus.available());
     }
+
    @Test
    public void consumeUntil_SingleBuffer_Partial_HappyPath() throws IOException
    {
        // One 8-byte buffer, consume only the first 4 bytes.
        consumeUntilTestCycle(1, 8, 0, 4);
    }
+
    @Test
    public void consumeUntil_SingleBuffer_AllBytes_HappyPath() throws IOException
    {
        // One 8-byte buffer, consume all 8 bytes.
        consumeUntilTestCycle(1, 8, 0, 8);
    }
+
+    @Test
+    public void consumeUntil_MultipleBufferr_Partial_HappyPath() throws 
IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 13);
+    }
+
    @Test
    public void consumeUntil_MultipleBuffer_AllBytes_HappyPath() throws IOException
    {
        // Two 8-byte buffers, consume all 16 bytes.
        consumeUntilTestCycle(2, 8, 0, 16);
    }
+
    @Test(expected = EOFException.class)
    public void consumeUntil_SingleBuffer_Fails() throws IOException
    {
        // Requesting 9 bytes from a single 8-byte buffer must hit EOF.
        consumeUntilTestCycle(1, 8, 0, 9);
    }
+
    @Test(expected = EOFException.class)
    public void consumeUntil_MultipleBuffer_Fails() throws IOException
    {
        // Requesting 17 bytes when only 16 (2 x 8) are available must hit EOF.
        consumeUntilTestCycle(2, 8, 0, 17);
    }
+
+    private void consumeUntilTestCycle(int nBuffs, int buffSize, int 
startOffset, int len) throws IOException
+    {
+        byte[] expectedBytes = new byte[len];
+        int count = 0;
+        for (int j=0; j < nBuffs; j++)
+        {
+            ByteBuf buf = channel.alloc().buffer(buffSize);
+            for (int i = 0; i < buf.capacity(); i++)
+            {
+                buf.writeByte(j);
+                if (count >= startOffset && (count - startOffset) < len)
+                    expectedBytes[count - startOffset] = (byte)j;
+                count++;
+            }
+
+            inputPlus.append(buf);
+        }
+        inputPlus.append(channel.alloc().buffer(0));
+
+        TestableWritableByteChannel wbc = new TestableWritableByteChannel(len);
+
+        inputPlus.skipBytesFully(startOffset);
+        BufferedDataOutputStreamPlus writer = new 
BufferedDataOutputStreamPlus(wbc);
+        inputPlus.consumeUntil(writer, len);
+
+        Assert.assertEquals(String.format("Test with {} buffers starting at {} 
consuming {} bytes", nBuffs, startOffset,
+                                          len), len, 
wbc.writtenBytes.readableBytes());
+
+        Assert.assertArrayEquals(expectedBytes, wbc.writtenBytes.array());
+    }
+
+    private static class TestableWritableByteChannel implements 
WritableByteChannel
+    {
+        private boolean isOpen = true;
+        public ByteBuf writtenBytes;
+
+        public TestableWritableByteChannel(int initialCapacity)
+        {
+             writtenBytes = Unpooled.buffer(initialCapacity);
+        }
+
+        public int write(ByteBuffer src) throws IOException
+        {
+            int size = src.remaining();
+            writtenBytes.writeBytes(src);
+            return size;
+        }
+
+        public boolean isOpen()
+        {
+            return isOpen;
+        }
+
+        public void close() throws IOException
+        {
+            isOpen = false;
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java 
b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
index 7ce4ec5..b88b56f 100644
--- a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
+++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
@@ -63,6 +63,5 @@ public class SerializationUtils
    /**
     * Convenience overload: round-trips {@code src} through {@code serializer}
     * at the current messaging version.
     */
    public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer)
    {
        assertSerializationCycle(src, serializer, MessagingService.current_version);
    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 78b3094..8ebe333 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -92,7 +92,7 @@ public class StreamTransferTaskTest
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
-            task.addTransferStream(new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), 
sstable.getPositionsForRanges(ranges), 1));
+            task.addTransferStream(new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), 
sstable.getPositionsForRanges(ranges), ranges, 1));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 
@@ -144,7 +144,7 @@ public class StreamTransferTaskTest
             ranges.add(new Range<>(sstable.first.getToken(), 
sstable.last.getToken()));
             Ref<SSTableReader> ref = sstable.selfRef();
             refs.add(ref);
-            task.addTransferStream(new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, 
sstable.getPositionsForRanges(ranges), 1));
+            task.addTransferStream(new 
CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, 
sstable.getPositionsForRanges(ranges), ranges, 1));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 72b9cbe..bc501be 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -278,6 +278,7 @@ public class StreamingTransferTest
             streams.add(new CassandraOutgoingFile(operation,
                                                   sstables.get(sstable),
                                                   
sstable.getPositionsForRanges(ranges),
+                                                  ranges,
                                                   
sstable.estimatedKeysForRanges(ranges)));
         }
         return streams;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to