http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java new file mode 100644 index 0000000..2f56786 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.streaming; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.DoubleSupplier; + +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.util.concurrent.FastThreadLocalThread; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RebufferingInputStream; +import org.apache.cassandra.db.streaming.CassandraStreamReader.StreamDeserializer; +import org.apache.cassandra.utils.ChecksumType; +import org.apache.cassandra.utils.WrappedRunnable; + +/** + * InputStream which reads data from the underlying source with the given {@link CompressionInfo}. Uses {@link #buffer} as a buffer + * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case). + */ +public class CompressedInputStream extends RebufferingInputStream implements AutoCloseable +{ + + private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class); + + private final CompressionInfo info; + // chunk buffer + private final BlockingQueue<ByteBuffer> dataBuffer; + private final DoubleSupplier crcCheckChanceSupplier; + + /** + * The base offset of the current {@link #buffer} from the beginning of the stream. + */ + private long bufferOffset = 0; + + /** + * The current {@link CompressedCassandraStreamReader#sections} offset in the stream. + */ + private long current = 0; + + private final ChecksumType checksumType; + + private static final int CHECKSUM_LENGTH = 4; + + /** + * Indicates there was a problem when reading from the source stream.
+ * When this is added to the <code>dataBuffer</code> by the stream Reader, + * it is expected that the <code>readException</code> variable is populated + * with the cause of the error when reading from the source stream, so it is + * thrown to the consumer on the subsequent read operation. + */ + private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]); + + private volatile IOException readException = null; + + private long totalCompressedBytesRead; + + /** + * @param source Input source to read compressed data from + * @param info Compression info + */ + public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier) + { + super(ByteBuffer.allocateDirect(info.parameters.chunkLength())); + buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read + this.info = info; + this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024)); + this.crcCheckChanceSupplier = crcCheckChanceSupplier; + this.checksumType = checksumType; + + new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start(); + } + + /** + * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}. + */ + public void position(long position) throws IOException + { + if (readException != null) + throw readException; + + assert position >= current : "stream can only read forward."; + current = position; + + if (current > bufferOffset + buffer.limit()) + reBuffer(false); + + buffer.position((int)(current - bufferOffset)); + } + + protected void reBuffer() throws IOException + { + reBuffer(true); + } + + private void reBuffer(boolean updateCurrent) throws IOException + { + if (readException != null) + { + FileUtils.clean(buffer); + buffer = null; + throw readException; + } + + // increment the offset into the stream based on the current buffer's read count + if (updateCurrent) + current += buffer.position(); + + try + { + ByteBuffer compressedWithCRC = dataBuffer.take(); + if (compressedWithCRC == POISON_PILL) + { + assert readException != null; + throw readException; + } + + decompress(compressedWithCRC); + } + catch (InterruptedException e) + { + throw new EOFException("No chunk available"); + } + } + + private void decompress(ByteBuffer compressed) throws IOException + { + int length = compressed.remaining(); + + // uncompress if the chunk payload (length minus the CRC) is smaller than maxCompressedLength. else, if the payload is greater than or equal to maxCompressedLength, + // we assume the buffer is not compressed.
see CASSANDRA-10520 + final boolean releaseCompressedBuffer; + if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength()) + { + buffer.clear(); + compressed.limit(length - CHECKSUM_LENGTH); + info.parameters.getSstableCompressor().uncompress(compressed, buffer); + buffer.flip(); + releaseCompressedBuffer = true; + } + else + { + FileUtils.clean(buffer); + buffer = compressed; + buffer.limit(length - CHECKSUM_LENGTH); + releaseCompressedBuffer = false; + } + totalCompressedBytesRead += length; + + // validate crc randomly + double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble(); + if (crcCheckChance >= 1d || + (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble())) + { + ByteBuffer crcBuf = compressed.duplicate(); + crcBuf.limit(length - CHECKSUM_LENGTH).position(0); + int checksum = (int) checksumType.of(crcBuf); + + crcBuf.limit(length); + if (crcBuf.getInt() != checksum) + throw new IOException("CRC unmatched"); + } + + if (releaseCompressedBuffer) + FileUtils.clean(compressed); + + // buffer offset is always aligned + final int compressedChunkLength = info.parameters.chunkLength(); + bufferOffset = current & ~(compressedChunkLength - 1); + } + + public long getTotalCompressedBytesRead() + { + return totalCompressedBytesRead; + } + + /** + * {@inheritDoc} + * + * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}. + */ + @Override + public void close() + { + if (buffer != null) + { + FileUtils.clean(buffer); + buffer = null; + } + } + + class Reader extends WrappedRunnable + { + private final DataInputPlus source; + private final Iterator<CompressionMetadata.Chunk> chunks; + private final BlockingQueue<ByteBuffer> dataBuffer; + + Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer) + { + this.source = source; + this.chunks = Iterators.forArray(info.chunks); + this.dataBuffer = dataBuffer; + } + + protected void runMayThrow() throws Exception + { + byte[] tmp = null; + while (chunks.hasNext()) + { + CompressionMetadata.Chunk chunk = chunks.next(); + + int readLength = chunk.length + 4; // read with CRC + ByteBuffer compressedWithCRC = null; + try + { + final int r; + if (source instanceof ReadableByteChannel) + { + compressedWithCRC = ByteBuffer.allocateDirect(readLength); + r = ((ReadableByteChannel)source).read(compressedWithCRC); + compressedWithCRC.flip(); + } + else + { + // read into an on-heap array, then copy over to an off-heap buffer.
at a minimum, snappy requires + // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer + if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH) + tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH]; + source.readFully(tmp, 0, readLength); + compressedWithCRC = ByteBuffer.allocateDirect(readLength); + compressedWithCRC.put(tmp, 0, readLength); + compressedWithCRC.position(0); + r = readLength; + } + + if (r < 0) + { + FileUtils.clean(compressedWithCRC); + readException = new EOFException("No chunk available"); + dataBuffer.put(POISON_PILL); + return; // throw exception where we consume dataBuffer + } + } + catch (IOException e) + { + if (!(e instanceof EOFException)) + logger.warn("Error while reading compressed input stream.", e); + if (compressedWithCRC != null) + FileUtils.clean(compressedWithCRC); + + readException = e; + dataBuffer.put(POISON_PILL); + return; // throw exception where we consume dataBuffer + } + dataBuffer.put(compressedWithCRC); + } + } + } +}
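The hand-off above between the Reader thread and the consuming CompressedInputStream is a bounded-queue/poison-pill pattern: the producer enqueues chunks, and on failure records the cause in a volatile field before enqueueing a zero-length sentinel, which tells the consumer to re-throw the recorded exception on its next read. Below is a minimal, self-contained sketch of just that pattern; the class and member names are illustrative only, not Cassandra APIs.

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the bounded-queue/poison-pill hand-off used by CompressedInputStream.
public class PoisonPillHandOff
{
    // Zero-length sentinel, compared by identity (==), so a legitimate empty buffer can never be mistaken for it.
    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);

    private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(16);

    // Written by the producer before it enqueues the pill; read by the consumer after it dequeues the pill.
    private volatile IOException failure;

    // Producer thread: hand the next chunk to the consumer, blocking while the queue is full.
    public void publish(ByteBuffer chunk) throws InterruptedException
    {
        queue.put(chunk);
    }

    // Producer thread: record the cause first, then poison the queue. The ordering (plus the
    // volatile write) guarantees the consumer sees a non-null failure whenever it sees the pill.
    public void fail(IOException cause) throws InterruptedException
    {
        failure = cause;
        queue.put(POISON_PILL);
    }

    // Consumer thread: block for the next chunk; the pill means "re-throw the producer's exception here".
    public ByteBuffer next() throws IOException
    {
        try
        {
            ByteBuffer chunk = queue.take();
            if (chunk == POISON_PILL)
                throw failure;
            return chunk;
        }
        catch (InterruptedException e)
        {
            // CompressedInputStream likewise reports an interrupted take() as end-of-data
            throw new EOFException("No chunk available");
        }
    }
}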
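Two smaller decisions in decompress() are easy to miss in the diff: checksum validation is sampled per chunk according to crcCheckChance (always verified at 1.0, never at 0.0, otherwise with that probability), and bufferOffset is realigned by masking the position down to a chunk boundary, which is only sound because chunk lengths are powers of two. The following sketch restates both in isolation; the class and method names are illustrative, not Cassandra APIs.

import java.util.concurrent.ThreadLocalRandom;

// Illustrative restatement of two details from CompressedInputStream.decompress().
public class ChunkMathSketch
{
    // Per-chunk sampling: always verify at 1.0, never at 0.0, otherwise verify this
    // particular chunk with probability crcCheckChance.
    static boolean shouldVerifyChecksum(double crcCheckChance)
    {
        return crcCheckChance >= 1d ||
               (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble());
    }

    // Round a stream position down to the start of its chunk, as in
    // "bufferOffset = current & ~(compressedChunkLength - 1)". Valid only when
    // chunkLength is a power of two, which Cassandra requires of chunk lengths.
    static long alignToChunkStart(long position, int chunkLength)
    {
        return position & ~(chunkLength - 1);
    }

    public static void main(String[] args)
    {
        // With 64 KiB chunks, position 70,000 lies in the second chunk, which starts at 65,536.
        System.out.println(alignToChunkStart(70_000, 65_536)); // 65536
        System.out.println(shouldVerifyChecksum(0.5)); // true about half the time
    }
}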
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java new file mode 100644 index 0000000..0f0d5c7 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.List; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.Pair; + +/** + * Container that carries the compression parameters and chunks needed to decompress data from a stream.
+ */ +public class CompressionInfo +{ + public static final IVersionedSerializer<CompressionInfo> serializer = new CompressionInfoSerializer(); + + public final CompressionMetadata.Chunk[] chunks; + public final CompressionParams parameters; + + public CompressionInfo(CompressionMetadata.Chunk[] chunks, CompressionParams parameters) + { + assert chunks != null && parameters != null; + this.chunks = chunks; + this.parameters = parameters; + } + + static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<Pair<Long, Long>> sections) + { + if (metadata == null) + { + return null; + } + else + { + return new CompressionInfo(metadata.getChunksForSections(sections), metadata.parameters); + } + + } + + static class CompressionInfoSerializer implements IVersionedSerializer<CompressionInfo> + { + public void serialize(CompressionInfo info, DataOutputPlus out, int version) throws IOException + { + if (info == null) + { + out.writeInt(-1); + return; + } + + int chunkCount = info.chunks.length; + out.writeInt(chunkCount); + for (int i = 0; i < chunkCount; i++) + CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], out, version); + // compression params + CompressionParams.serializer.serialize(info.parameters, out, version); + } + + public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException + { + // chunks + int chunkCount = in.readInt(); + if (chunkCount < 0) + return null; + + CompressionMetadata.Chunk[] chunks = new CompressionMetadata.Chunk[chunkCount]; + for (int i = 0; i < chunkCount; i++) + chunks[i] = CompressionMetadata.Chunk.serializer.deserialize(in, version); + + // compression params + CompressionParams parameters = CompressionParams.serializer.deserialize(in, version); + return new CompressionInfo(chunks, parameters); + } + + public long serializedSize(CompressionInfo info, int version) + { + if (info == null) + return TypeSizes.sizeof(-1); + + // chunks + int chunkCount = info.chunks.length; + long size = TypeSizes.sizeof(chunkCount); + for (int i = 0; i < chunkCount; i++) + size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version); + // compression params + size += CompressionParams.serializer.serializedSize(info.parameters, version); + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/package-info.java b/src/java/org/apache/cassandra/db/streaming/package-info.java new file mode 100644 index 0000000..1e117aa --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/package-info.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <h2>File transfer</h2> + * + * When transferring whole sstables or subsections of an sstable, only the DATA component is shipped. To that end, + * there are three "modes" of an sstable transfer that need to be handled somewhat differently: + * + * 1) uncompressed sstable - data needs to be read into user space so it can be manipulated: checksum validation, + * stream compression (see next section), and/or TLS encryption. + * + * 2) compressed sstable, transferred with SSL/TLS - data needs to be read into user space as that is where the TLS encryption + * needs to happen. Netty does not allow the pretense of doing zero-copy transfers when TLS is in the pipeline; + * data must explicitly be pulled into user-space memory for TLS encryption to work. + * + * 3) compressed sstable, transferred without SSL/TLS - data can be streamed via zero-copy transfer as the data does not + * need to be manipulated (it can be sent "as-is"). + * + * <h3>Compressing the data</h3> + * We always want to transfer as few bytes as possible over the wire when streaming a file. If the + * sstable is not already compressed via table compression options, we apply an on-the-fly stream compression + * to the data. The stream compression format is documented in + * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer}. + * + * You may be wondering: why implement your own compression scheme? Why not use netty's built-in compression codecs, + * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes complete sense if none of the sstables + * to be streamed are using sstable compression (and obviously you wouldn't use stream compression when the sstables + * are using sstable compression). The problem is when you have a mix of files, some using sstable compression + * and some not. You can either: + * + * - send the files of one type over one kind of socket, and the others over another socket + * - send them both over the same socket, but then auto-adjust per file type. + * + * I've opted for the latter to keep socket/channel management simpler and cleaner.
+ * + */ +package org.apache.cassandra.db.streaming; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 439ebc6..dfabac2 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -20,6 +20,7 @@ package org.apache.cassandra.dht; import java.util.*; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -155,11 +156,12 @@ public class RangeStreamer boolean connectSequentially, int connectionsPerHost) { + Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation); this.metadata = metadata; this.tokens = tokens; this.address = address; this.description = streamOperation.getDescription(); - this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, true, connectSequentially, null, PreviewKind.NONE); + this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, connectSequentially, null, PreviewKind.NONE); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; this.stateStore = stateStore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 7d77ad5..980fdf1 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -18,12 +18,12 @@ package org.apache.cassandra.io.sstable; import java.io.File; -import java.io.IOException; import java.util.*; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.io.FSError; import org.apache.cassandra.schema.TableMetadataRef; @@ -52,7 +52,7 @@ public class SSTableLoader implements StreamEventHandler private final Set<InetAddressAndPort> failedHosts = new HashSet<>(); private final List<SSTableReader> sstables = new ArrayList<>(); - private final Multimap<InetAddressAndPort, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create(); + private final Multimap<InetAddressAndPort, OutgoingStream> streamingDetails = HashMultimap.create(); public SSTableLoader(File directory, Client client, OutputHandler outputHandler) { @@ -131,8 +131,8 @@ public class SSTableLoader implements StreamEventHandler List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges); long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges); Ref<SSTableReader> ref = sstable.ref(); - StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys); - streamingDetails.put(endpoint, details); + OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys); + 
streamingDetails.put(endpoint, stream); } // to conserve heap space when bulk loading @@ -160,7 +160,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory()); Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); @@ -178,15 +178,15 @@ public class SSTableLoader implements StreamEventHandler if (toIgnore.contains(remote)) continue; - List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>(); + List<OutgoingStream> streams = new LinkedList<>(); // references are acquired when constructing the SSTableStreamingSections above - for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote)) + for (OutgoingStream stream : streamingDetails.get(remote)) { - endpointDetails.add(details); + streams.add(stream); } - plan.transferFiles(remote, endpointDetails); + plan.transferStreams(remote, streams); } plan.listeners(this, listeners); return plan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java index 2c4fae4..8d58673 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java @@ -52,7 +52,6 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom); StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, - false, pendingRepair, previewKind) .listeners(this) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 60d571b..3901c75 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -61,7 +61,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler @VisibleForTesting StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences) { - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 0122b31..725e84d 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -79,7 +79,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler @VisibleForTesting StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred) { - StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) + StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary .requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/IncomingStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java new file mode 100644 index 0000000..18bebf5 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + +import java.io.IOException; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; + +/** + * The counterpart of {@link OutgoingStream} on the receiving side. + * + * Data streamed in can (and should) be persisted, but must not be included in the table's + * live data set until added by {@link StreamReceiver}. If the stream fails, the stream receiver will + * delete the streamed data, but implementations still need to handle the case where its process dies + * during streaming and leaves data around on startup, in which case that data should be deleted. + */ +public interface IncomingStream +{ + + /** + * Read in the stream data.
+ */ + void read(DataInputPlus inputPlus, int version) throws IOException; + + String getName(); + long getSize(); + TableId getTableId(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/OutgoingStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java new file mode 100644 index 0000000..e71b985 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.schema.TableId; + +/** + * Some subset of data to be streamed. Implementations handle writing out their data via the write method. + * On the receiving end, {@link IncomingStream} streams the data in. + * + * All the data contained in a given stream needs to have the same repairedAt timestamp (or 0) and pendingRepair + * id (or null). 
+ */ +public interface OutgoingStream +{ + /** + * Write the stream's data into the socket + */ + void write(StreamSession session, DataOutputStreamPlus output, int version) throws IOException; + + /** + * Release any resources held by the stream + */ + void finish(); + + long getRepairedAt(); + UUID getPendingRepair(); + + String getName(); + long getSize(); + TableId getTableId(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/PreviewKind.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java index 3b4d2a0..51c5746 100644 --- a/src/java/org/apache/cassandra/streaming/PreviewKind.java +++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java @@ -21,26 +21,19 @@ package org.apache.cassandra.streaming; import java.util.UUID; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; - -import org.apache.cassandra.io.sstable.format.SSTableReader; - public enum PreviewKind { - NONE(0, null), - ALL(1, Predicates.alwaysTrue()), - UNREPAIRED(2, Predicates.not(SSTableReader::isRepaired)), - REPAIRED(3, SSTableReader::isRepaired); + NONE(0), + ALL(1), + UNREPAIRED(2), + REPAIRED(3); private final int serializationVal; - private final Predicate<SSTableReader> streamingPredicate; - PreviewKind(int serializationVal, Predicate<SSTableReader> streamingPredicate) + PreviewKind(int serializationVal) { assert ordinal() == serializationVal; this.serializationVal = serializationVal; - this.streamingPredicate = streamingPredicate; } public int getSerializationVal() @@ -53,10 +46,6 @@ public enum PreviewKind return values()[serializationVal]; } - public Predicate<SSTableReader> getStreamingPredicate() - { - return streamingPredicate; - } public boolean isPreview() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/ProgressInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java index 2334599..ac91855 100644 --- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java +++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -24,7 +24,7 @@ import com.google.common.base.Objects; import org.apache.cassandra.locator.InetAddressAndPort; /** - * ProgressInfo contains file transfer progress. + * ProgressInfo contains stream transfer progress. */ public class ProgressInfo implements Serializable { @@ -69,7 +69,7 @@ } /** - * @return true if file transfer is completed + * @return true if transfer is completed */ public boolean isCompleted() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/SessionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java index bbca753..4b4bbed 100644 --- a/src/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java @@ -70,7 +70,7 @@ public final class SessionInfo implements Serializable } /** - * Update progress of receiving/sending file. + * Update progress of receiving/sending stream.
* * @param newProgress new progress info */ @@ -157,11 +157,11 @@ public final class SessionInfo implements Serializable return getTotalSizes(sendingSummaries); } - private long getTotalSizeInProgress(Collection<ProgressInfo> files) + private long getTotalSizeInProgress(Collection<ProgressInfo> streams) { long total = 0; - for (ProgressInfo file : files) - total += file.currentBytes; + for (ProgressInfo stream : streams) + total += stream.currentBytes; return total; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index a22e07d..139488d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -46,18 +46,18 @@ public class StreamCoordinator private final boolean connectSequentially; private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>(); + private final StreamOperation streamOperation; private final int connectionsPerHost; private StreamConnectionFactory factory; - private final boolean keepSSTableLevel; private Iterator<StreamSession> sessionsToConnect = null; private final UUID pendingRepair; private final PreviewKind previewKind; - public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory, + public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory, boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) { + this.streamOperation = streamOperation; this.connectionsPerHost = connectionsPerHost; - this.keepSSTableLevel = keepSSTableLevel; this.factory = factory; this.connectSequentially = connectSequentially; this.pendingRepair = pendingRepair; @@ -191,51 +191,47 @@ public class StreamCoordinator return result; } - public synchronized void transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) + public synchronized void transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams) { HostStreamingData sessionList = getOrCreateHostData(to); if (connectionsPerHost > 1) { - List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails); + List<Collection<OutgoingStream>> buckets = bucketStreams(streams); - for (List<StreamSession.SSTableStreamingSections> subList : buckets) + for (Collection<OutgoingStream> bucket : buckets) { StreamSession session = sessionList.getOrCreateNextSession(to, to); - session.addTransferFiles(subList); + session.addTransferStreams(bucket); } } else { StreamSession session = sessionList.getOrCreateNextSession(to, to); - session.addTransferFiles(sstableDetails); + session.addTransferStreams(streams); } } - private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails) + private List<Collection<OutgoingStream>> bucketStreams(Collection<OutgoingStream> streams) { // There's no point in divvying things up into more buckets than we have sstableDetails - int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost); - int step = Math.round((float) sstableDetails.size() / (float) targetSlices); + int targetSlices = Math.min(streams.size(), connectionsPerHost); + int 
step = Math.round((float) streams.size() / (float) targetSlices); int index = 0; - List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>(); - List<StreamSession.SSTableStreamingSections> slice = null; - Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator(); - while (iter.hasNext()) - { - StreamSession.SSTableStreamingSections streamSession = iter.next(); + List<Collection<OutgoingStream>> result = new ArrayList<>(); + List<OutgoingStream> slice = null; + for (OutgoingStream stream: streams) + { if (index % step == 0) { slice = new ArrayList<>(); result.add(slice); } - slice.add(streamSession); + slice.add(stream); ++index; - iter.remove(); } - return result; } @@ -302,7 +298,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair, previewKind); + StreamSession session = new StreamSession(streamOperation, peer, connecting, factory, streamSessions.size(), pendingRepair, previewKind); streamSessions.put(++lastReturned, session); return session; } @@ -334,7 +330,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair, previewKind); + session = new StreamSession(streamOperation, peer, connecting, factory, id, pendingRepair, previewKind); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamHook.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamHook.java b/src/java/org/apache/cassandra/streaming/StreamHook.java index d610297..86b5182 100644 --- a/src/java/org/apache/cassandra/streaming/StreamHook.java +++ b/src/java/org/apache/cassandra/streaming/StreamHook.java @@ -18,19 +18,17 @@ package org.apache.cassandra.streaming; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.streaming.messages.OutgoingFileMessage; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; import org.apache.cassandra.utils.FBUtilities; public interface StreamHook { public static final StreamHook instance = createHook(); - public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message); + public OutgoingStreamMessage reportOutgoingStream(StreamSession session, OutgoingStream stream, OutgoingStreamMessage message); public void reportStreamFuture(StreamSession session, StreamResultFuture future); - public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber); + public void reportIncomingStream(TableId tableId, IncomingStream stream, StreamSession session, int sequenceNumber); static StreamHook createHook() { @@ -43,14 +41,14 @@ public interface StreamHook { return new StreamHook() { - public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message) + public OutgoingStreamMessage reportOutgoingStream(StreamSession session, OutgoingStream stream, OutgoingStreamMessage message) { return message; } public 
void reportStreamFuture(StreamSession session, StreamResultFuture future) {} - public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber) {} + public void reportIncomingStream(TableId tableId, IncomingStream stream, StreamSession session, int sequenceNumber) {} }; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 43e9068..98d68ce 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -48,19 +48,19 @@ public class StreamPlan */ public StreamPlan(StreamOperation streamOperation) { - this(streamOperation, 1, false, false, NO_PENDING_REPAIR, PreviewKind.NONE); + this(streamOperation, 1, false, NO_PENDING_REPAIR, PreviewKind.NONE); } - public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially) + public StreamPlan(StreamOperation streamOperation, boolean connectSequentially) { - this(streamOperation, 1, keepSSTableLevels, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE); + this(streamOperation, 1, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE); } - public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels, + public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) { this.streamOperation = streamOperation; - this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(), + this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, new DefaultConnectionFactory(), connectSequentially, pendingRepair, previewKind); } @@ -137,18 +137,16 @@ public class StreamPlan } /** - * Add transfer task to send given SSTable files. + * Add transfer task to send given streams * * @param to endpoint address of receiver - * @param sstableDetails sstables with file positions and estimated key count. - * this collection will be modified to remove those files that are successfully handed off + * @param streams streams to send * @return this object for chaining */ - public StreamPlan transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) + public StreamPlan transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams) { - coordinator.transferFiles(to, sstableDetails); + coordinator.transferStreams(to, streams); return this; - } public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java deleted file mode 100644 index f4eb9c4..0000000 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming; - -import java.io.*; -import java.util.Collection; -import java.util.UUID; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.UnmodifiableIterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.util.TrackedDataInputPlus; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.SSTableSimpleIterator; -import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; -import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.streaming.compress.StreamCompressionInputStream; -import org.apache.cassandra.streaming.messages.FileMessageHeader; -import org.apache.cassandra.streaming.messages.StreamMessage; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; - -/** - * StreamReader reads from stream and writes to SSTable. - */ -public class StreamReader -{ - private static final Logger logger = LoggerFactory.getLogger(StreamReader.class); - protected final TableId tableId; - protected final long estimatedKeys; - protected final Collection<Pair<Long, Long>> sections; - protected final StreamSession session; - protected final Version inputVersion; - protected final long repairedAt; - protected final UUID pendingRepair; - protected final SSTableFormat.Type format; - protected final int sstableLevel; - protected final SerializationHeader.Component header; - protected final int fileSeqNum; - - public StreamReader(FileMessageHeader header, StreamSession session) - { - if (session.getPendingRepair() != null) - { - // we should only ever be streaming pending repair - // sstables if the session has a pending repair id - assert session.getPendingRepair().equals(header.pendingRepair); - } - this.session = session; - this.tableId = header.tableId; - this.estimatedKeys = header.estimatedKeys; - this.sections = header.sections; - this.inputVersion = header.version; - this.repairedAt = header.repairedAt; - this.pendingRepair = header.pendingRepair; - this.format = header.format; - this.sstableLevel = header.sstableLevel; - this.header = header.header; - this.fileSeqNum = header.sequenceNumber; - } - - /** - * @param inputPlus where this reads data from - * @return SSTable transferred - * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
- */ - @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed - public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException - { - long totalSize = totalSize(); - - ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); - if (cfs == null) - { - // schema was dropped during streaming - throw new IOException("CF " + tableId + " was dropped during streaming"); - } - - logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.", - session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), - cfs.getTableName(), pendingRepair); - - StreamDeserializer deserializer = null; - SSTableMultiWriter writer = null; - try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION)) - { - TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); - deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); - writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); - while (in.getBytesRead() < totalSize) - { - writePartition(deserializer, writer); - // TODO move this to BytesReadTracker - session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); - } - logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", - session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); - return writer; - } - catch (Throwable e) - { - Object partitionKey = deserializer != null ? deserializer.partitionKey() : ""; - logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e); - if (writer != null) - { - writer.abort(e); - } - throw Throwables.propagate(e); - } - } - - protected SerializationHeader getHeader(TableMetadata metadata) - { - return header != null? 
header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader - } - - protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException - { - Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); - if (localDir == null) - throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); - - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata())); - StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum); - return writer; - } - - protected long totalSize() - { - long size = 0; - for (Pair<Long, Long> section : sections) - size += section.right - section.left; - return size; - } - - protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException - { - writer.append(deserializer.newPartition()); - deserializer.checkForExceptions(); - } - - public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator - { - private final TableMetadata metadata; - private final DataInputPlus in; - private final SerializationHeader header; - private final SerializationHelper helper; - - private DecoratedKey key; - private DeletionTime partitionLevelDeletion; - private SSTableSimpleIterator iterator; - private Row staticRow; - private IOException exception; - - public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException - { - this.metadata = metadata; - this.in = in; - this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); - this.header = header; - } - - public StreamDeserializer newPartition() throws IOException - { - key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); - partitionLevelDeletion = DeletionTime.serializer.deserialize(in); - iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion); - staticRow = iterator.readStaticRow(); - return this; - } - - public TableMetadata metadata() - { - return metadata; - } - - public RegularAndStaticColumns columns() - { - // We don't know which columns we'll get so assume it can be all of them - return metadata.regularAndStaticColumns(); - } - - public boolean isReverseOrder() - { - return false; - } - - public DecoratedKey partitionKey() - { - return key; - } - - public DeletionTime partitionLevelDeletion() - { - return partitionLevelDeletion; - } - - public Row staticRow() - { - return staticRow; - } - - public EncodingStats stats() - { - return header.stats(); - } - - public boolean hasNext() - { - try - { - return iterator.hasNext(); - } - catch (IOError e) - { - if (e.getCause() != null && e.getCause() instanceof IOException) - { - exception = (IOException)e.getCause(); - return false; - } - throw e; - } - } - - public Unfiltered next() - { - // Note that in practice we know that IOException will be thrown by hasNext(), because that's - // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily - // to what we do in hasNext) - Unfiltered unfiltered = iterator.next(); - return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW - ? 
maybeMarkLocalToBeCleared((Row) unfiltered) - : unfiltered; - } - - private Row maybeMarkLocalToBeCleared(Row row) - { - return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row; - } - - public void checkForExceptions() throws IOException - { - if (exception != null) - throw exception; - } - - public void close() - { - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index b823311..49beba1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -17,38 +17,16 @@ */ package org.apache.cassandra.streaming; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.view.View; -import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.concurrent.Refs; /** * Task that manages receiving files for the session for certain ColumnFamily. 
@@ -59,69 +37,51 @@ public class StreamReceiveTask extends StreamTask private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask")); - private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100); + private final StreamReceiver receiver; - // number of files to receive - private final int totalFiles; - // total size of files to receive - private final long totalSize; + // number of streams to receive + private final int totalStreams; - // Transaction tracking new files received - private final LifecycleTransaction txn; + // total size of streams to receive + private final long totalSize; // true if task is done (either completed or aborted) private volatile boolean done = false; - // holds references to SSTables received - protected Collection<SSTableReader> sstables; - - private int remoteSSTablesReceived = 0; + private int remoteStreamsReceived = 0; - public StreamReceiveTask(StreamSession session, TableId tableId, int totalFiles, long totalSize) + public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize) { super(session, tableId); - this.totalFiles = totalFiles; + this.receiver = ColumnFamilyStore.getIfExists(tableId).getStreamManager().createStreamReceiver(session, totalStreams); + this.totalStreams = totalStreams; this.totalSize = totalSize; - // this is an "offline" transaction, as we currently manually expose the sstables once done; - // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes - this.txn = LifecycleTransaction.offline(OperationType.STREAM); - this.sstables = new ArrayList<>(totalFiles); } /** - * Process received file. + * Process received stream. * - * @param sstable SSTable file received. + * @param stream Stream received. */ - public synchronized void received(SSTableMultiWriter sstable) + public synchronized void received(IncomingStream stream) { Preconditions.checkState(!session.isPreview(), "we should never receive sstables when previewing"); if (done) { - logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(), - sstable.getFilename()); - Throwables.maybeFail(sstable.abort(null)); + logger.warn("[{}] Received stream {} on already finished stream received task. Aborting stream.", session.planId(), + stream.getName()); + receiver.discardStream(stream); return; } - remoteSSTablesReceived++; - assert tableId.equals(sstable.getTableId()); - logger.debug("recevied {} of {} total files", remoteSSTablesReceived, totalFiles); + remoteStreamsReceived++; + Preconditions.checkArgument(tableId.equals(stream.getTableId())); + logger.debug("received {} of {} total streams", remoteStreamsReceived, totalStreams); - Collection<SSTableReader> finished = null; - try - { - finished = sstable.finish(true); - } - catch (Throwable t) - { - Throwables.maybeFail(sstable.abort(t)); - } - txn.update(finished, false); - sstables.addAll(finished); + receiver.received(stream); - if (remoteSSTablesReceived == totalFiles) + if (remoteStreamsReceived == totalStreams) { done = true; executor.submit(new OnCompletionRunnable(this)); @@ -130,7 +90,7 @@ public class StreamReceiveTask extends StreamTask public int getTotalNumberOfFiles() { - return totalFiles; + return totalStreams; } public long getTotalSize() @@ -138,11 +98,11 @@ public class StreamReceiveTask extends StreamTask return totalSize; } - public synchronized LifecycleTransaction getTransaction() + public synchronized StreamReceiver getReceiver() { if (done) throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), tableId)); - return txn; + return receiver; } private static class OnCompletionRunnable implements Runnable @@ -154,116 +114,19 @@ public class StreamReceiveTask extends StreamTask this.task = task; } - /* - * We have a special path for views and for CDC. - * - * For views, since the view requires cleaning up any pre-existing state, we must put all partitions - * through the same write path as normal mutations. This also ensures any 2is are also updated. - * - * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they - * can be archived by the CDC process on discard. - */ - private boolean requiresWritePath(ColumnFamilyStore cfs) { - return hasCDC(cfs) || (task.session.streamOperation().requiresViewBuild() && hasViews(cfs)); - } - - private boolean hasViews(ColumnFamilyStore cfs) - { - return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName())); - } - - private boolean hasCDC(ColumnFamilyStore cfs) - { - return cfs.metadata().params.cdc; - } - - private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) { - boolean hasCdc = hasCDC(cfs); - ColumnFilter filter = ColumnFilter.all(cfs.metadata()); - for (SSTableReader reader : readers) - { - Keyspace ks = Keyspace.open(reader.getKeyspaceName()); - // When doing mutation-based repair we split each partition into smaller batches - // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure - try (ISSTableScanner scanner = reader.getScanner(); - CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH)) - { - while (throttledPartitions.hasNext()) - { - // MV *can* be applied unsafe if there's no CDC on the CFS as we flush - // before transaction is done.
- // - // If the CFS has CDC, however, these updates need to be written to the CommitLog - // so they get archived into the cdc_raw folder - ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)), - hasCdc, - true, - false); - } - } - } - public void run() { - ColumnFamilyStore cfs = null; - boolean requiresWritePath = false; try { - cfs = ColumnFamilyStore.getIfExists(task.tableId); - if (cfs == null) + if (ColumnFamilyStore.getIfExists(task.tableId) == null) { // schema was dropped during streaming - task.sstables.clear(); - task.abortTransaction(); + task.receiver.abort(); task.session.taskCompleted(task); return; } - requiresWritePath = requiresWritePath(cfs); - Collection<SSTableReader> readers = task.sstables; - - try (Refs<SSTableReader> refs = Refs.ref(readers)) - { - if (requiresWritePath) - { - sendThroughWritePath(cfs, readers); - } - else - { - task.finishTransaction(); - - // add sstables (this will build secondary indexes too, see CASSANDRA-10130) - logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers); - cfs.addSSTables(readers); - - //invalidate row and counter cache - if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter()) - { - List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); - readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()))); - Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate); - - if (cfs.isRowCacheEnabled()) - { - int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); - if (invalidatedKeys > 0) - logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + - "receive task completed.", task.session.planId(), invalidatedKeys, - cfs.keyspace.getName(), cfs.getTableName()); - } - - if (cfs.metadata().isCounter()) - { - int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); - if (invalidatedKeys > 0) - logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + - "receive task completed.", task.session.planId(), invalidatedKeys, - cfs.keyspace.getName(), cfs.getTableName()); - } - } - } - } + task.receiver.finished(); task.session.taskCompleted(task); } catch (Throwable t) @@ -273,14 +136,7 @@ public class StreamReceiveTask extends StreamTask } finally { - // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete - // the streamed sstables.
- if (requiresWritePath) - { - if (cfs != null) - cfs.forceBlockingFlush(); - task.abortTransaction(); - } + task.receiver.cleanup(); } } } @@ -297,17 +153,6 @@ public class StreamReceiveTask extends StreamTask return; done = true; - abortTransaction(); - sstables.clear(); - } - - private synchronized void abortTransaction() - { - txn.abort(); - } - - private synchronized void finishTransaction() - { - txn.finish(); + receiver.abort(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReceiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiver.java b/src/java/org/apache/cassandra/streaming/StreamReceiver.java new file mode 100644 index 0000000..bc357ef --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/StreamReceiver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + +/** + * StreamReceiver acts as a staging area for incoming data. Received data + * ends up here, and is kept separate from the live data until all streams + * for a session have been received successfully. + */ +public interface StreamReceiver +{ + /** + * Called after we've finished receiving stream data. The data covered by the given stream should + * be kept isolated from the live dataset for its table. + */ + void received(IncomingStream stream); + + /** + * This is called when we've received stream data we can't add to the received set for some reason, + * usually when we've received data for a session which has been closed. The data backing this stream + * should be deleted, and any resources associated with the given stream should be released. + */ + void discardStream(IncomingStream stream); + + /** + * Called when something went wrong with a stream session. All data associated with this receiver + * should be deleted, and any associated resources should be cleaned up. + */ + void abort(); + + /** + * Called when a stream session has successfully completed. All stream data being held by this receiver + * should be added to the live data sets for their respective tables before this method returns. + */ + void finished(); + + /** + * Called after finished has returned and we've sent any messages to other nodes. Mainly for + * signaling that MVs and CDC should clean up.
+ */ + void cleanup(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 544f37f..3b11fb6 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -73,9 +73,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair, PreviewKind previewKind) + private StreamResultFuture(UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind) { - this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair, previewKind)); + this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind)); } static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, @@ -106,7 +106,6 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> StreamOperation streamOperation, InetAddressAndPort from, Channel channel, - boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) { @@ -116,7 +115,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind); + future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind); StreamManager.instance.registerReceiving(future); } future.attachConnection(from, sessionIndex, channel);
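
The StreamReceiver contract introduced above boils down to: stage incoming data, then either promote it or delete it. A minimal sketch of an implementation follows. It is illustrative only and not part of this patch: the class name InMemoryStreamReceiver and the deleteBackingData/addToLiveDataSet helpers are hypothetical, whereas the real receiver obtained via ColumnFamilyStore.getStreamManager().createStreamReceiver(...) also has to manage SSTable lifecycle, materialized views, and CDC. The essential shape is the same, though: stage in received(), promote in finished(), delete in discardStream()/abort().

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch; everything except the StreamReceiver/IncomingStream
    // types (which come from the patch above) is hypothetical.
    public class InMemoryStreamReceiver implements StreamReceiver
    {
        // streams staged here stay isolated from the live dataset until finished()
        private final List<IncomingStream> staged = new ArrayList<>();

        @Override
        public synchronized void received(IncomingStream stream)
        {
            staged.add(stream); // stage only; nothing is published yet
        }

        @Override
        public synchronized void discardStream(IncomingStream stream)
        {
            deleteBackingData(stream); // hypothetical helper: drop this stream's data
        }

        @Override
        public synchronized void abort()
        {
            // session failed: delete everything staged and release resources
            staged.forEach(this::deleteBackingData);
            staged.clear();
        }

        @Override
        public synchronized void finished()
        {
            // session succeeded: promote all staged data to the live dataset
            // before returning, as the interface contract requires
            staged.forEach(this::addToLiveDataSet); // hypothetical helper
        }

        @Override
        public synchronized void cleanup()
        {
            // post-completion housekeeping (the real receiver handles MV/CDC here)
            staged.clear();
        }

        private void deleteBackingData(IncomingStream stream) { /* hypothetical */ }
        private void addToLiveDataSet(IncomingStream stream) { /* hypothetical */ }
    }

Keeping the staging step explicit is what lets StreamReceiveTask discard a late stream via discardStream() without touching data that has already been promoted.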

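To make the calling order concrete: StreamReceiveTask above invokes received() once per incoming stream, hands off to OnCompletionRunnable when the last stream arrives, and that runnable calls finished() and then, in a finally block, cleanup(); abort() is reserved for failures and cancelled sessions. The sketch below compresses that sequencing and is not the task's actual code (assumed simplifications: no executor hand-off, no preview checks, failure handling reduced to a single abort()).

    // Illustrative sequencing only; StreamReceiveTask performs the real version
    // of this bookkeeping, mirroring remoteStreamsReceived/totalStreams above.
    class ReceiveLifecycle
    {
        private final StreamReceiver receiver;
        private final int totalStreams;
        private int receivedStreams = 0;

        ReceiveLifecycle(StreamReceiver receiver, int totalStreams)
        {
            this.receiver = receiver;
            this.totalStreams = totalStreams;
        }

        synchronized void onStream(IncomingStream stream)
        {
            receiver.received(stream);          // stage the stream
            if (++receivedStreams == totalStreams)
                complete();
        }

        private void complete()
        {
            try
            {
                receiver.finished();            // publish staged data to the live dataset
            }
            catch (RuntimeException e)
            {
                receiver.abort();               // failure: delete all staged data
                throw e;
            }
            finally
            {
                receiver.cleanup();             // always runs (MV/CDC housekeeping)
            }
        }
    }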