http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java deleted file mode 100644 index c5b0c53..0000000 --- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.streaming; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.utils.FBUtilities; - -/** - * CassandraStreamWriter for compressed SSTable. - */ -public class CompressedCassandraStreamWriter extends CassandraStreamWriter -{ - private static final int CHUNK_SIZE = 1 << 16; - - private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamWriter.class); - - private final CompressionInfo compressionInfo; - - public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session) - { - super(sstable, sections, session); - this.compressionInfo = compressionInfo; - } - - @Override - public void write(DataOutputStreamPlus out) throws IOException - { - assert out instanceof ByteBufDataOutputStreamPlus; - ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out; - long totalSize = totalSize(); - logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), - sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); - try (ChannelProxy fc = sstable.getDataChannel().sharedCopy()) - { - long progress = 0L; - // calculate chunks to transfer. we want to send continuous chunks altogether. 
- List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks); - - int sectionIdx = 0; - - // stream each of the required sections of the file - for (final SSTableReader.PartitionPositionBounds section : sections) - { - // length of the section to stream - long length = section.upperPosition - section.lowerPosition; - - logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length); - - // tracks write progress - long bytesTransferred = 0; - while (bytesTransferred < length) - { - final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); - limiter.acquire(toTransfer); - - ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer); - long lastWrite; - try - { - lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred); - assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer); - outBuffer.flip(); - output.writeToChannel(outBuffer); - } - catch (IOException e) - { - FileUtils.clean(outBuffer); - throw e; - } - - bytesTransferred += lastWrite; - progress += lastWrite; - session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); - } - } - logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", - session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); - } - } - - @Override - protected long totalSize() - { - long size = 0; - // calculate total length of transferring chunks - for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - size += chunk.length + 4; // 4 bytes for CRC - return size; - } - - // chunks are assumed to be sorted by offset - private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks) - { - List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>(); - SSTableReader.PartitionPositionBounds lastSection = null; - for (CompressionMetadata.Chunk chunk : chunks) - { - if (lastSection != null) - { - if (chunk.offset == lastSection.upperPosition) - { - // extend previous section to end of this chunk - lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC - } - else - { - transferSections.add(lastSection); - lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4); - } - } - else - { - lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4); - } - } - if (lastSection != null) - transferSections.add(lastSection); - return transferSections; - } -}
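The writer deleted above carried the section-coalescing logic for compressed SSTable streaming: adjacent compressed chunks (each followed by a 4-byte CRC on disk) are merged into contiguous transfer sections, so a run of chunks can be sent together as one section rather than chunk by chunk. Below is a minimal, standalone Java sketch of that coalescing step; Chunk and Section are simplified stand-ins for CompressionMetadata.Chunk and SSTableReader.PartitionPositionBounds, and the class and method names are illustrative only, not part of this patch.

import java.util.ArrayList;
import java.util.List;

class TransferSectionsSketch
{
    // each compressed chunk is followed by a 4-byte CRC in the data file
    static final int CRC_BYTES = 4;

    record Chunk(long offset, int length) {}
    record Section(long lower, long upper) {}

    // chunks are assumed to be sorted by offset, as in getTransferSections() above
    static List<Section> coalesce(List<Chunk> chunks)
    {
        List<Section> sections = new ArrayList<>();
        Section last = null;
        for (Chunk chunk : chunks)
        {
            long end = chunk.offset() + chunk.length() + CRC_BYTES;
            if (last != null && chunk.offset() == last.upper())
            {
                // chunk starts exactly where the open section ends: extend it
                last = new Section(last.lower(), end);
            }
            else
            {
                // gap before this chunk: close the open section and start a new one
                if (last != null)
                    sections.add(last);
                last = new Section(chunk.offset(), end);
            }
        }
        if (last != null)
            sections.add(last);
        return sections;
    }

    public static void main(String[] args)
    {
        // the first two chunks are contiguous and merge into [0, 200);
        // the third starts at offset 300 and opens a new section [300, 400)
        System.out.println(coalesce(List.of(new Chunk(0, 96),
                                            new Chunk(100, 96),
                                            new Chunk(300, 96))));
    }
}

Fewer, larger sections mean fewer positioned reads and fewer writes to the channel, which is the point of the "we want to send continuous chunks altogether" comment above.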
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index 2f56786..c0278e8 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -60,7 +60,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
 private long bufferOffset = 0;
 /**
- * The current {@link CompressedCassandraStreamReader#sections} offset in the stream.
+ * The current {@link CassandraCompressedStreamReader#sections} offset in the stream.
 */
 private long current = 0;
@@ -98,7 +98,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
 }
 /**
- * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}.
+ * Invoked when crossing into the next stream boundary in {@link CassandraCompressedStreamReader#sections}.
 */
 public void position(long position) throws IOException
 {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/IStreamReader.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
new file mode 100644
index 0000000..cf93bc2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+/**
+ * This is the interface used by the streaming code to read an SSTable stream off a channel.
+ */
+public interface IStreamReader
+{
+ public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/Component.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 9daac7c..a81db85 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -33,6 +33,10 @@ public class Component
 final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
+ /**
+ * WARNING: Be careful when changing the names or string representation of the enum
+ * members. Streaming code depends on the names (see CASSANDRA-14556).
+ */
 public enum Type
 {
 // the base data for an sstable: the remaining components can be regenerated
@@ -60,6 +64,7 @@
 CUSTOM(null);
 final String repr;
+
 Type(String repr)
 {
 this.repr = repr;
 }
@@ -120,7 +125,7 @@
 * @return the component corresponding to {@code name}. Note that this always return a component as an unrecognized
 * name is parsed into a CUSTOM component.
 */
- static Component parse(String name)
+ public static Component parse(String name)
 {
 Type type = Type.fromRepresentation(name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index ebc35e7..4ba0533 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,7 +131,7 @@ public class SSTableLoader implements StreamEventHandler
 List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
 long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
 Ref<SSTableReader> ref = sstable.ref();
- OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);
+ OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, tokenRanges, estimatedKeys);
 streamingDetails.put(endpoint, stream);
 }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
new file mode 100644
index 0000000..400f119
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class); + + private final TableMetadataRef metadata; + private volatile SSTableReader finalReader; + private final Map<Component.Type, SequentialWriter> componentWriters; + + private static final SequentialWriterOption WRITER_OPTION = + SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 << 20) + .bufferType(BufferType.OFF_HEAP) + .build(); + + private static final ImmutableSet<Component> SUPPORTED_COMPONENTS = + ImmutableSet.of(Component.DATA, + Component.PRIMARY_INDEX, + Component.SUMMARY, + Component.STATS, + Component.COMPRESSION_INFO, + Component.FILTER, + Component.DIGEST, + Component.CRC); + + public BigTableZeroCopyWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Collection<Component> components) + { + super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); + + txn.trackNew(this); + this.metadata = metadata; + this.componentWriters = new EnumMap<>(Component.Type.class); + + if (!SUPPORTED_COMPONENTS.containsAll(components)) + throw new AssertionError(format("Unsupported streaming component detected %s", + Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS))); + + for (Component c : components) + componentWriters.put(c.type, makeWriter(descriptor, c)); + } + + private static SequentialWriter makeWriter(Descriptor descriptor, Component component) + { + return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false); + } + + private void write(DataInputPlus 
in, long size, SequentialWriter out) throws FSWriteError
+ {
+ final int BUFFER_SIZE = 1 << 20;
+ long bytesRead = 0;
+ byte[] buff = new byte[BUFFER_SIZE];
+ try
+ {
+ while (bytesRead < size)
+ {
+ int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+ in.readFully(buff, 0, toRead);
+ int count = Math.min(toRead, BUFFER_SIZE);
+ out.write(buff, 0, count);
+ bytesRead += count;
+ }
+ out.sync();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
+ }
+
+ @Override
+ public boolean append(UnfilteredRowIterator partition)
+ {
+ throw new UnsupportedOperationException("Operation not supported by BigTableZeroCopyWriter");
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+ {
+ return finish(openResult);
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ setOpenResult(openResult);
+ return finished();
+ }
+
+ @Override
+ public Collection<SSTableReader> finished()
+ {
+ if (finalReader == null)
+ finalReader = SSTableReader.open(descriptor, components, metadata);
+
+ return ImmutableList.of(finalReader);
+ }
+
+ @Override
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ return null;
+ }
+
+ @Override
+ public long getFilePointer()
+ {
+ return 0;
+ }
+
+ @Override
+ public TableId getTableId()
+ {
+ return metadata.id;
+ }
+
+ @Override
+ public Throwable commit(Throwable accumulate)
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ accumulate = writer.commit(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public Throwable abort(Throwable accumulate)
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ accumulate = writer.abort(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public void prepareToCommit()
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ writer.prepareToCommit();
+ }
+
+ @Override
+ public void close()
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ writer.close();
+ }
+
+ public void writeComponent(Component.Type type, DataInputPlus in, long size)
+ {
+ logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+
+ if (in instanceof RebufferingByteBufDataInputPlus)
+ write((RebufferingByteBufDataInputPlus) in, size, componentWriters.get(type));
+ else
+ write(in, size, componentWriters.get(type));
+ }
+
+ private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
+ {
+ logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
+
+ try
+ {
+ long bytesWritten = in.consumeUntil(writer, size);
+
+ if (bytesWritten != size)
+ throw new IOException(format("Failed to read correct number of bytes from channel %s", writer));
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, writer.getPath());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 54122ee..56d88f7 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -24,11 +24,9 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import net.nicoulaj.compilecommand.annotations.DontInline; - import org.apache.cassandra.config.Config; import org.apache.cassandra.utils.memory.MemoryUtil; import org.apache.cassandra.utils.vint.VIntCoding; @@ -341,7 +339,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus } @Override - public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException + public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException { if (strictFlushing) throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/CheckedFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CheckedFunction.java b/src/java/org/apache/cassandra/io/util/CheckedFunction.java new file mode 100644 index 0000000..ec1ce9f --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/CheckedFunction.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +@FunctionalInterface +public interface CheckedFunction<T, R, E extends Exception> +{ + R apply(T t) throws E; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/DataOutputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java index a9dbb68..16be42f 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import com.google.common.base.Function; - import org.apache.cassandra.utils.vint.VIntCoding; /** @@ -41,7 +39,7 @@ public interface DataOutputPlus extends DataOutput * Safe way to operate against the underlying channel. 
Impossible to stash a reference to the channel * and forget to flush */ - <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException; + <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException; default void writeVInt(long i) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java index 086f5c9..ef51888 100644 --- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java +++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java @@ -25,12 +25,12 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import com.google.common.base.Preconditions; + import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.utils.FastByteOperations; import org.apache.cassandra.utils.vint.VIntCoding; -import com.google.common.base.Preconditions; - /** * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled * via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc). http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index e71f2fa..3eb1a7d 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -138,11 +138,22 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr */ public SequentialWriter(File file, SequentialWriterOption option) { + this(file, option, true); + } + + /** + * Create SequentialWriter for given file with specific writer option. 
+ * @param file + * @param option + * @param strictFlushing + */ + public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing) + { super(openChannel(file), option.allocateBuffer()); - strictFlushing = true; - fchannel = (FileChannel)channel; + this.strictFlushing = strictFlushing; + this.fchannel = (FileChannel)channel; - filePath = file.getAbsolutePath(); + this.filePath = file.getAbsolutePath(); this.option = option; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java index 54b4cb1..d9ef010 100644 --- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java @@ -378,7 +378,7 @@ public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlu } @Override - public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException + public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException { return f.apply(channel); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java index 0473465..a77cb07 100644 --- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java @@ -22,10 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import com.google.common.base.Function; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; +import org.apache.cassandra.io.util.CheckedFunction; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus; @@ -82,7 +81,7 @@ public class ByteBufDataOutputPlus extends ByteBufOutputStream implements DataOu } @Override - public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException + public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java index 3a544e4..777bc3e 100644 --- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java @@ -20,11 +20,14 @@ package org.apache.cassandra.net.async; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import 
com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -35,6 +38,7 @@ import io.netty.util.concurrent.Future; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import org.apache.cassandra.streaming.StreamSession; /** @@ -49,6 +53,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus private final StreamSession session; private final Channel channel; private final int bufferSize; + private final Logger logger = LoggerFactory.getLogger(ByteBufDataOutputStreamPlus.class); /** * Tracks how many bytes we've written to the netty channel. This more or less follows the channel's @@ -70,7 +75,6 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus this.channel = channel; this.currentBuf = buffer; this.bufferSize = bufferSize; - channelRateLimiter = new Semaphore(channel.config().getWriteBufferHighWaterMark(), true); } @@ -114,8 +118,9 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus doFlush(buffer.position()); int byteCount = buf.readableBytes(); + if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 5, TimeUnit.MINUTES)) - throw new IOException("outbound channel was not writable"); + throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", byteCount)); // the (possibly naive) assumption that we should always flush after each incoming buf ChannelFuture channelFuture = channel.writeAndFlush(buf); @@ -135,6 +140,53 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus return channelFuture; } + /** + * Writes all data in file channel to stream BUFFER_SIZE at a time. + * Closes file channel when done + * + * @param f + * @return number of bytes transferred + * @throws IOException + */ + public long writeToChannel(FileChannel f, StreamRateLimiter limiter) throws IOException + { + final long length = f.size(); + long bytesTransferred = 0; + + try + { + while (bytesTransferred < length) + { + int toRead = (int) Math.min(bufferSize, length - bytesTransferred); + NonClosingDefaultFileRegion fileRegion = new NonClosingDefaultFileRegion(f, bytesTransferred, toRead); + + if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, toRead, 5, TimeUnit.MINUTES)) + throw new IOException(String.format("outbound channel was not writable. 
Failed to acquire sufficient permits %d", toRead)); + + limiter.acquire(toRead); + + bytesTransferred += toRead; + final boolean shouldClose = (bytesTransferred == length); // this is the last buffer, can safely close channel + + channel.writeAndFlush(fileRegion).addListener(future -> { + handleBuffer(future, toRead); + + if ((shouldClose || !future.isSuccess()) && f.isOpen()) + f.close(); + }); + logger.trace("{} of {} (toRead {} cs {})", bytesTransferred, length, toRead, f.isOpen()); + } + + return bytesTransferred; + } catch (Exception e) + { + if (f.isOpen()) + f.close(); + + throw e; + } + } + @Override protected void doFlush(int count) throws IOException { @@ -145,7 +197,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus currentBuf.writerIndex(byteCount); if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES)) - throw new IOException("outbound channel was not writable"); + throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", byteCount)); channel.writeAndFlush(currentBuf).addListener(future -> handleBuffer(future, byteCount)); currentBuf = channel.alloc().directBuffer(bufferSize, bufferSize); @@ -161,7 +213,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus private void handleBuffer(Future<? super Void> future, int bytesWritten) { channelRateLimiter.release(bytesWritten); - + logger.trace("bytesWritten {} {} because {}", bytesWritten, (future.isSuccess() == true) ? "Succeeded" : "Failed", future.cause()); if (!future.isSuccess() && channel.isOpen()) session.onError(future.cause()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java new file mode 100644 index 0000000..46f0ce1 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.io.File; +import java.nio.channels.FileChannel; + +import io.netty.channel.DefaultFileRegion; + +/** + * Netty's DefaultFileRegion closes the underlying FileChannel as soon as + * the refCnt() for the region drops to zero, this is an implementation of + * the DefaultFileRegion that doesn't close the FileChannel. + * + * See {@link ByteBufDataOutputStreamPlus} for its usage. 
+ */ +public class NonClosingDefaultFileRegion extends DefaultFileRegion +{ + + public NonClosingDefaultFileRegion(FileChannel file, long position, long count) + { + super(file, position, count); + } + + public NonClosingDefaultFileRegion(File f, long position, long count) + { + super(f, position, count); + } + + @Override + protected void deallocate() + { + // Overridden to avoid closing the file + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java index 1f32aa8..4e667da 100644 --- a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java +++ b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java @@ -31,6 +31,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelConfig; import io.netty.util.ReferenceCountUtil; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.RebufferingInputStream; public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel @@ -249,4 +250,42 @@ public class RebufferingByteBufDataInputPlus extends RebufferingInputStream impl { return channelConfig.getAllocator(); } + + /** + * Consumes bytes in the stream until the given length + * + * @param writer + * @param len + * @return + * @throws IOException + */ + public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException + { + long copied = 0; // number of bytes copied + while (copied < len) + { + if (buffer.remaining() == 0) + { + try + { + reBuffer(); + } + catch (EOFException e) + { + throw new EOFException("EOF after " + copied + " bytes out of " + len); + } + if (buffer.remaining() == 0 && copied < len) + throw new AssertionError("reBuffer() failed to return data"); + } + + int originalLimit = buffer.limit(); + int toCopy = (int) Math.min(len - copied, buffer.remaining()); + buffer.limit(buffer.position() + toCopy); + int written = writer.applyToChannel(c -> c.write(buffer)); + buffer.limit(originalLimit); + copied += written; + } + + return copied; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 34b0bbd..7eada28 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -282,7 +282,8 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), pendingRepair, previewKind); + StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), + pendingRepair, previewKind); streamSessions.put(++lastReturned, session); return session; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 49beba1..0a96f4c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -49,6 +49,7 @@ public class StreamReceiveTask extends StreamTask
 private volatile boolean done = false;
 private int remoteStreamsReceived = 0;
+ private long bytesReceived = 0;
 public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize)
 {
@@ -76,8 +77,10 @@
 }
 remoteStreamsReceived++;
+ bytesReceived += stream.getSize();
 Preconditions.checkArgument(tableId.equals(stream.getTableId()));
- logger.debug("recevied {} of {} total files", remoteStreamsReceived, totalStreams);
+ logger.debug("received {} of {} total files, {} of {} total bytes", remoteStreamsReceived, totalStreams,
+ bytesReceived, totalSize);
 receiver.received(stream);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ef8976d..4de63be 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,10 +17,8 @@
 */
 package org.apache.cassandra.streaming;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.*;
+import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import com.google.common.util.concurrent.AbstractFuture;
@@ -78,7 +76,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
 }
- static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
+ public static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
 StreamCoordinator coordinator)
 {
 StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator);
@@ -112,8 +110,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
 if (future == null)
 {
- logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}",
- planId, sessionIndex, streamOperation.getDescription(), from, channel.remoteAddress(), channel.localAddress(), channel.id());
+ logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {}" +
+ " channel.id {}", planId, sessionIndex, streamOperation.getDescription(),
+ from, channel.remoteAddress(), channel.localAddress(), channel.id());
 // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind); http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index c56616e..393cd24 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -202,7 +202,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber this.pendingRepair = pendingRepair; this.previewKind = previewKind; - logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer, preferredPeerInetAddressAndPort); + logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer, + preferredPeerInetAddressAndPort); } public UUID planId() @@ -777,7 +778,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber FBUtilities.waitOnFutures(flushes); } - private synchronized void prepareReceiving(StreamSummary summary) + @VisibleForTesting + public synchronized void prepareReceiving(StreamSummary summary) { failIfFinished(); if (summary.files > 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java index bff77cf..3fa80f5 100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java @@ -322,7 +322,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender throw new IllegalStateException("channel's transferring state is currently set to true. 
refusing to start new stream"); // close the DataOutputStreamPlus as we're done with it - but don't close the channel - try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 16)) + try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 20)) { StreamMessage.serialize(msg, outPlus, protocolVersion, session); channel.flush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java index 81fe8cd..7c10ef9 100644 --- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java +++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java @@ -62,7 +62,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex); private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15; - private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16; + private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 20; private final InetAddressAndPort remoteAddress; private final int protocolVersion; http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index fbd3e21..a591a43 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -47,7 +47,8 @@ public class StreamInitMessage extends StreamMessage public final UUID pendingRepair; public final PreviewKind previewKind; - public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind) + public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, + UUID pendingRepair, PreviewKind previewKind) { super(Type.STREAM_INIT); this.from = from; @@ -93,7 +94,8 @@ public class StreamInitMessage extends StreamMessage UUID pendingRepair = in.readBoolean() ? 
UUIDSerializer.serializer.deserialize(in, version) : null; PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); - return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind); + return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), + pendingRepair, previewKind); } public long serializedSize(StreamInitMessage message, int version) @@ -108,6 +110,7 @@ public class StreamInitMessage extends StreamMessage size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version); } size += TypeSizes.sizeof(message.previewKind.getSerializationVal()); + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/utils/Collectors3.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Collectors3.java b/src/java/org/apache/cassandra/utils/Collectors3.java new file mode 100644 index 0000000..f8f262e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Collectors3.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collector; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +/** + * Some extra Collector implementations. 
+ * + * Named Collectors3 just in case Guava ever makes a Collectors2 + */ +public class Collectors3 +{ + private static final Collector.Characteristics[] LIST_CHARACTERISTICS = new Collector.Characteristics[] { }; + public static <T> Collector<T, ?, List<T>> toImmutableList() + { + return Collector.of(ImmutableList.Builder<T>::new, + ImmutableList.Builder<T>::add, + (l, r) -> l.addAll(r.build()), + ImmutableList.Builder<T>::build, + LIST_CHARACTERISTICS); + } + + private static final Collector.Characteristics[] SET_CHARACTERISTICS = new Collector.Characteristics[] { Collector.Characteristics.UNORDERED }; + public static <T> Collector<T, ?, Set<T>> toImmutableSet() + { + return Collector.of(ImmutableSet.Builder<T>::new, + ImmutableSet.Builder<T>::add, + (l, r) -> l.addAll(r.build()), + ImmutableSet.Builder<T>::build, + SET_CHARACTERISTICS); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 5893bab..3c09637 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -46,3 +46,5 @@ enable_user_defined_functions: true enable_scripted_user_defined_functions: true prepared_statements_cache_size_mb: 1 corrupted_tombstone_strategy: exception +stream_entire_sstables: true +stream_throughput_outbound_megabits_per_sec: 200000000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/long/org/apache/cassandra/streaming/LongStreamingTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java index bd7ef20..01e67f0 100644 --- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -29,10 +29,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.schema.CompressionParams; -import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.Keyspace; @@ -41,6 +38,9 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java new file mode 100644 index 0000000..3192bcc --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader; +import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; +import org.apache.cassandra.db.streaming.CassandraStreamHeader; +import org.apache.cassandra.db.streaming.CassandraStreamReader; +import org.apache.cassandra.db.streaming.CassandraStreamWriter; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamCoordinator; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamSummary; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import 
org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec set to a + * really high value otherwise, throttling will kick in and the results will not be meaningful. + */ +@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@Threads(1) +public class ZeroCopyStreamingBenchmark +{ + static final int STREAM_SIZE = 50 * 1024 * 1024; + + @State(Scope.Thread) + public static class BenchmarkState + { + public static final String KEYSPACE = "ZeroCopyStreamingBenchmark"; + public static final String CF_STANDARD = "Standard1"; + public static final String CF_INDEXED = "Indexed1"; + public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval"; + + private static SSTableReader sstable; + private static ColumnFamilyStore store; + private StreamSession session; + private CassandraEntireSSTableStreamWriter blockStreamWriter; + private ByteBuf serializedBlockStream; + private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); + private CassandraEntireSSTableStreamReader blockStreamReader; + private CassandraStreamWriter partialStreamWriter; + private CassandraStreamReader partialStreamReader; + private ByteBuf serializedPartialStream; + + @Setup + public void setupBenchmark() throws IOException + { + Keyspace keyspace = setupSchemaAndKeySpace(); + store = keyspace.getColumnFamilyStore("Standard1"); + generateData(); + + sstable = store.getLiveSSTables().iterator().next(); + session = setupStreamingSessionForTest(); + blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable)); + + CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE); + ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, blockStreamCaptureChannel, 1024 * 1024); + blockStreamWriter.write(out); + serializedBlockStream = blockStreamCaptureChannel.getSerializedStream(); + out.close(); + + session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, serializedBlockStream.readableBytes())); + + CassandraStreamHeader entireSSTableStreamHeader = + CassandraStreamHeader.builder() + .withSSTableFormat(sstable.descriptor.formatType) + .withSSTableVersion(sstable.descriptor.version) + .withSSTableLevel(0) + .withEstimatedKeys(sstable.estimatedKeys()) + .withSections(Collections.emptyList()) + .withSerializationHeader(sstable.header.toComponent()) + .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable)) + .isEntireSSTable(true) + .withFirstKey(sstable.first) + .withTableId(sstable.metadata().id) + .build(); + + blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, + peer, session.planId(), + 0, 0, 0, + null), entireSSTableStreamHeader, session); + + List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken())); + partialStreamWriter = new CassandraStreamWriter(sstable, sstable.getPositionsForRanges(requestedRanges), session); + + CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE); + 
partialStreamWriter.write(ByteBufDataOutputStreamPlus.create(session, partialStreamChannel, 1024 * 1024)); + serializedPartialStream = partialStreamChannel.getSerializedStream(); + + CassandraStreamHeader partialSSTableStreamHeader = + CassandraStreamHeader.builder() + .withSSTableFormat(sstable.descriptor.formatType) + .withSSTableVersion(sstable.descriptor.version) + .withSSTableLevel(0) + .withEstimatedKeys(sstable.estimatedKeys()) + .withSections(sstable.getPositionsForRanges(requestedRanges)) + .withSerializationHeader(sstable.header.toComponent()) + .withTableId(sstable.metadata().id) + .build(); + + partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id, + peer, session.planId(), + 0, 0, 0, + null), + partialSSTableStreamHeader, session); + } + + private Keyspace setupSchemaAndKeySpace() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD), + SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL) + .minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingParams.CACHE_NOTHING)); + + return Keyspace.open(KEYSPACE); + } + + private void generateData() + { + // insert data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 1_000_000; j++) + { + new RowUpdateBuilder(store.metadata(), j, String.valueOf(j)) + .clustering("0") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + } + + @TearDown + public void tearDown() throws IOException + { + SchemaLoader.cleanupAndLeaveDirs(); + CommitLog.instance.stopUnsafe(true); + } + + private StreamSession setupStreamingSessionForTest() + { + StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator); + + InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); + streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED)); + + StreamSession session = streamCoordinator.getOrCreateNextSession(peer); + session.init(future); + return session; + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void blockStreamWriter(BenchmarkState state) throws Exception + { + EmbeddedChannel channel = createMockNettyChannel(); + ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024); + state.blockStreamWriter.write(out); + out.close(); + channel.finishAndReleaseAll(); + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void blockStreamReader(BenchmarkState state) throws Exception + { + EmbeddedChannel channel = createMockNettyChannel(); + RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config()); + in.append(state.serializedBlockStream.retainedDuplicate()); + SSTableMultiWriter sstableWriter = state.blockStreamReader.read(in); + Collection<SSTableReader> newSstables = sstableWriter.finished(); + in.close(); + channel.finishAndReleaseAll(); + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + 
public void partialStreamWriter(BenchmarkState state) throws Exception + { + EmbeddedChannel channel = createMockNettyChannel(); + ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024); + state.partialStreamWriter.write(out); + out.close(); + channel.finishAndReleaseAll(); + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void partialStreamReader(BenchmarkState state) throws Exception + { + EmbeddedChannel channel = createMockNettyChannel(); + RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config()); + in.append(state.serializedPartialStream.retainedDuplicate()); + SSTableMultiWriter sstableWriter = state.partialStreamReader.read(in); + Collection<SSTableReader> newSstables = sstableWriter.finished(); + in.close(); + channel.finishAndReleaseAll(); + } + + private EmbeddedChannel createMockNettyChannel() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setWriteBufferHighWaterMark(STREAM_SIZE); // avoid blocking + return channel; + } + + private static class CapturingNettyChannel extends EmbeddedChannel + { + private final ByteBuf serializedStream; + private final WritableByteChannel proxyWBC = new WritableByteChannel() + { + public int write(ByteBuffer src) throws IOException + { + int rem = src.remaining(); + serializedStream.writeBytes(src); + return rem; + } + + public boolean isOpen() + { + return true; + } + + public void close() throws IOException + { + } + }; + + public CapturingNettyChannel(int capacity) + { + this.serializedStream = alloc().buffer(capacity); + this.pipeline().addLast(new ChannelOutboundHandlerAdapter() + { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception + { + if (msg instanceof ByteBuf) + serializedStream.writeBytes((ByteBuf) msg); + else if (msg instanceof ByteBuffer) + serializedStream.writeBytes((ByteBuffer) msg); + else if (msg instanceof DefaultFileRegion) + ((DefaultFileRegion) msg).transferTo(proxyWBC, 0); + } + }); + config().setWriteBufferHighWaterMark(capacity); + } + + public ByteBuf getSerializedStream() + { + return serializedStream.copy(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/VerifyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java index c9dbe14..0632274 100644 --- a/test/unit/org/apache/cassandra/db/VerifyTest.java +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java @@ -612,15 +612,15 @@ public class VerifyTest Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); - roh.check(dk(1)); - roh.check(dk(10)); - roh.check(dk(11)); - roh.check(dk(21)); - roh.check(dk(25)); + roh.validate(dk(1)); + roh.validate(dk(10)); + roh.validate(dk(11)); + roh.validate(dk(21)); + roh.validate(dk(25)); boolean gotException = false; try { - roh.check(dk(26)); + roh.validate(dk(26)); } catch (Throwable t) { @@ -635,9 +635,9 @@ public class VerifyTest List<Range<Token>> normalized = new ArrayList<>(); normalized.add(r(0,10)); Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); - roh.check(dk(1)); + roh.validate(dk(1)); // call with smaller token to get exception - roh.check(dk(0)); + roh.validate(dk(0)); } @@ -646,9 +646,9 @@ public class VerifyTest { List<Range<Token>> normalized = 
Range.normalize(Collections.singletonList(r(0,0))); Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); - roh.check(dk(Long.MIN_VALUE)); - roh.check(dk(0)); - roh.check(dk(Long.MAX_VALUE)); + roh.validate(dk(Long.MIN_VALUE)); + roh.validate(dk(0)); + roh.validate(dk(Long.MAX_VALUE)); } @Test @@ -656,12 +656,12 @@ public class VerifyTest { List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(Long.MAX_VALUE - 1000,Long.MIN_VALUE + 1000))); Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); - roh.check(dk(Long.MIN_VALUE)); - roh.check(dk(Long.MAX_VALUE)); + roh.validate(dk(Long.MIN_VALUE)); + roh.validate(dk(Long.MAX_VALUE)); boolean gotException = false; try { - roh.check(dk(26)); + roh.validate(dk(26)); } catch (Throwable t) { @@ -673,7 +673,7 @@ public class VerifyTest @Test public void testEmptyRanges() { - new Verifier.RangeOwnHelper(Collections.emptyList()).check(dk(1)); + new Verifier.RangeOwnHelper(Collections.emptyList()).validate(dk(1)); } private DecoratedKey dk(long l) http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java new file mode 100644 index 0000000..947f968 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.Collections; +import java.util.Queue; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.async.ByteBufDataInputPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.net.async.NonClosingDefaultFileRegion; +import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamCoordinator; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamSummary; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CassandraEntireSSTableStreamWriterTest +{ + public static final String KEYSPACE = "CassandraEntireSSTableStreamWriterTest"; + public static final String CF_STANDARD = "Standard1"; + public static final String CF_INDEXED = "Indexed1"; + public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval"; + + private static SSTableReader sstable; + private static ColumnFamilyStore store; + + @BeforeClass + public static void defineSchemaAndPrepareSSTable() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD), + SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL) + .minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingParams.CACHE_NOTHING)); + + Keyspace keyspace = Keyspace.open(KEYSPACE); + store = keyspace.getColumnFamilyStore("Standard1"); + + // insert data and compact to a single sstable + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 10; j++) + { + new RowUpdateBuilder(store.metadata(), j, String.valueOf(j)) + .clustering("0") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + store.forceBlockingFlush(); + CompactionManager.instance.performMaximal(store, false); + + sstable = store.getLiveSSTables().iterator().next(); + } + + 
@Test + public void testBlockWriterOverWire() throws IOException + { + StreamSession session = setupStreamingSessionForTest(); + + CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable)); + + EmbeddedChannel channel = new EmbeddedChannel(); + ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024); + writer.write(out); + + Queue<Object> msgs = channel.outboundMessages(); + + assertTrue(msgs.peek() instanceof DefaultFileRegion); + } + + @Test + public void testBlockReadingAndWritingOverWire() throws Exception + { + StreamSession session = setupStreamingSessionForTest(); + InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); + + CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable)); + + // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed + ByteBuf serializedFile = Unpooled.buffer(8192); + EmbeddedChannel channel = createMockNettyChannel(serializedFile); + ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024); + + writer.write(out); + + session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104)); + + CassandraStreamHeader header = + CassandraStreamHeader.builder() + .withSSTableFormat(sstable.descriptor.formatType) + .withSSTableVersion(sstable.descriptor.version) + .withSSTableLevel(0) + .withEstimatedKeys(sstable.estimatedKeys()) + .withSections(Collections.emptyList()) + .withSerializationHeader(sstable.header.toComponent()) + .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable)) + .isEntireSSTable(true) + .withFirstKey(sstable.first) + .withTableId(sstable.metadata().id) + .build(); + + CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), 0, 0, 0, null), header, session); + + SSTableMultiWriter sstableWriter = reader.read(new ByteBufDataInputPlus(serializedFile)); + Collection<SSTableReader> newSstables = sstableWriter.finished(); + + assertEquals(1, newSstables.size()); + } + + private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) throws Exception + { + WritableByteChannel wbc = new WritableByteChannel() + { + private boolean isOpen = true; + public int write(ByteBuffer src) throws IOException + { + int size = src.remaining(); // report the bytes actually consumed below; src.limit() would over-count once position > 0 + serializedFile.writeBytes(src); + return size; + } + + public boolean isOpen() + { + return isOpen; + } + + public void close() throws IOException + { + isOpen = false; + } + }; + + return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception + { + ((NonClosingDefaultFileRegion) msg).transferTo(wbc, 0); + super.write(ctx, msg, promise); + } + }); + } + + private StreamSession setupStreamingSessionForTest() + { + StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator); + + InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); + streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer,
Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED)); + + StreamSession session = streamCoordinator.getOrCreateNextSession(peer); + session.init(future); + return session; + } +}
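The class javadoc on ZeroCopyStreamingBenchmark above asks for stream_throughput_outbound_megabits_per_sec to be set very high so the stream limiter does not distort the measurements; at runtime the same effect can be had with "nodetool setstreamthroughput 0", which disables throttling. For running the benchmark, a minimal JMH runner could look like the sketch below. The runner class itself is a hypothetical helper and not part of this commit; only the standard org.openjdk.jmh.runner API and the benchmark class from the patch are used.

    package org.apache.cassandra.test.microbench;

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    // Hypothetical convenience runner, for illustration only.
    public class ZeroCopyStreamingBenchmarkRunner
    {
        public static void main(String[] args) throws RunnerException
        {
            // Selects the four @Benchmark methods defined above:
            // blockStreamWriter, blockStreamReader, partialStreamWriter, partialStreamReader.
            Options options = new OptionsBuilder()
                              .include(ZeroCopyStreamingBenchmark.class.getSimpleName())
                              .build();
            new Runner(options).run();
        }
    }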
