http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 4085c43..adf5d76 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -17,27 +17,17 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.collect.*; import com.google.common.util.concurrent.Futures; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.View; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +35,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelId; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; @@ -54,19 +43,13 @@ import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.OutboundConnectionIdentifier; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.async.NettyStreamingMessageSender; import org.apache.cassandra.streaming.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.Refs; /** - * Handles the streaming a one or more section of one of more sstables to and from a specific - * remote node. The sending side performs a block-level transfer of the source sstable, while the receiver - * must deserilaize that data stream into an partitions and rows, and then write that out as an sstable. + * Handles the streaming a one or more streams to and from a specific remote node. * * Both this node and the remote one will create a similar symmetrical {@link StreamSession}. A streaming * session has the following life-cycle: @@ -98,15 +81,15 @@ import org.apache.cassandra.utils.concurrent.Refs; * * (a) The streaming phase is started at each node by calling {@link StreamSession#startStreamingFiles(boolean)}. * This will send, sequentially on each outbound streaming connection (see {@link NettyStreamingMessageSender}), - * an {@link OutgoingFileMessage} for each file in each of the {@link StreamTransferTask}. 
- * Each {@link OutgoingFileMessage} consists of a {@link FileMessageHeader} that contains metadata about the file - * being streamed, followed by the file content itself. Once all the files for a {@link StreamTransferTask} are sent, + * an {@link OutgoingStreamMessage} for each stream in each of the {@link StreamTransferTask}. + * Each {@link OutgoingStreamMessage} consists of a {@link StreamMessageHeader} that contains metadata about + * the stream, followed by the stream content itself. Once all the files for a {@link StreamTransferTask} are sent, * the task is marked complete {@link StreamTransferTask#complete(int)}. - * (b) On the receiving side, a SSTable will be written for the incoming file, and once the file is fully received, - * the file will be marked as complete ({@link StreamReceiveTask#received(SSTableMultiWriter)}). When all files - * for the {@link StreamReceiveTask} have been received, the sstables are added to the CFS (and 2ndary indexes/MV are built), + * (b) On the receiving side, the incoming data is written to disk, and once the stream is fully received, + * it will be marked as complete ({@link StreamReceiveTask#received(IncomingStream)}). When all streams + * for the {@link StreamReceiveTask} have been received, the data is added to the CFS (and 2ndary indexes/MV are built), * and the task is marked complete ({@link #taskCompleted(StreamReceiveTask)}). - * (b) If during the streaming of a particular file an error occurs on the receiving end of a stream + * (b) If during the streaming of a particular stream an error occurs on the receiving end of a stream * (it may be either the initiator or the follower), the node will send a {@link SessionFailedMessage} * to the sender and close the stream session. * (c) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase @@ -127,7 +110,7 @@ import org.apache.cassandra.utils.concurrent.Refs; * F: PrepareSynAckMessage * I: PrepareAckMessage * (stream - this can happen in both directions) - * I: OutgoingFileMessage + * I: OutgoingStreamMessage * F: ReceivedMessage * (completion) * I/F: CompleteMessage @@ -140,6 +123,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); + private final StreamOperation streamOperation; /** * Streaming endpoint. * @@ -170,7 +154,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>(); private final AtomicBoolean isAborted = new AtomicBoolean(false); - private final boolean keepSSTableLevel; private final UUID pendingRepair; private final PreviewKind previewKind; @@ -189,11 +172,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber /** * Create new streaming session with the peer. 
- * @param peer Address of streaming peer + * @param streamOperation + * @param peer Address of streaming peer * @param connecting Actual connecting address */ - public StreamSession(InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) + public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, UUID pendingRepair, PreviewKind previewKind) { + this.streamOperation = streamOperation; this.peer = peer; this.connecting = connecting; this.index = index; @@ -202,7 +187,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, MessagingService.instance().portFor(connecting))); this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview()); this.metrics = StreamingMetrics.get(connecting); - this.keepSSTableLevel = keepSSTableLevel; this.pendingRepair = pendingRepair; this.previewKind = previewKind; } @@ -222,9 +206,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber return streamResult == null ? null : streamResult.streamOperation; } - public boolean keepSSTableLevel() + public StreamOperation getStreamOperation() { - return keepSSTableLevel; + return streamOperation; } public UUID getPendingRepair() @@ -242,10 +226,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber return previewKind; } - public LifecycleTransaction getTransaction(TableId tableId) + public StreamReceiver getAggregator(TableId tableId) { assert receivers.containsKey(tableId); - return receivers.get(tableId).getTransaction(); + return receivers.get(tableId).getReceiver(); } /** @@ -309,8 +293,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber /** * Set up transfer for specific keyspace/ranges/CFs * - * Used in repair - a streamed sstable in repair will be marked with the given repairedAt time - * * @param keyspace Transfer keyspace * @param ranges Transfer ranges * @param columnFamilies Transfer ColumnFamilies @@ -324,23 +306,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair, previewKind); - try - { - addTransferFiles(sections); - Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace); - if (toBeUpdated == null) - { - toBeUpdated = new HashSet<>(); - } - toBeUpdated.addAll(ranges); - transferredRangesPerKeyspace.put(keyspace, toBeUpdated); - } - finally + List<OutgoingStream> streams = getOutgoingStreamsForRanges(normalizedRanges, stores, pendingRepair, previewKind); + addTransferStreams(streams); + Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace); + if (toBeUpdated == null) { - for (SSTableStreamingSections release : sections) - release.ref.release(); + toBeUpdated = new HashSet<>(); } + toBeUpdated.addAll(ranges); + transferredRangesPerKeyspace.put(keyspace, toBeUpdated); } private void failIfFinished() @@ -366,83 +340,30 @@ public class StreamSession implements IEndpointStateChangeSubscriber } @VisibleForTesting - public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, 
Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) + public List<OutgoingStream> getOutgoingStreamsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) { - Refs<SSTableReader> refs = new Refs<>(); + List<OutgoingStream> streams = new ArrayList<>(); try { - for (ColumnFamilyStore cfStore : stores) - { - final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size()); - for (Range<Token> range : ranges) - keyRanges.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(view -> { - Set<SSTableReader> sstables = Sets.newHashSet(); - SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL)); - Predicate<SSTableReader> predicate; - if (previewKind.isPreview()) - { - predicate = previewKind.getStreamingPredicate(); - } - else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR) - { - predicate = Predicates.alwaysTrue(); - } - else - { - predicate = s -> s.isPendingRepair() && s.getSSTableMetadata().pendingRepair.equals(pendingRepair); - } - - for (Range<PartitionPosition> keyRange : keyRanges) - { - // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). - // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). - // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that - // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even - // including keyRange.left will still exclude any key having the token of the original token range, and so we're - // still actually selecting what we wanted. 
- for (SSTableReader sstable : Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree), predicate)) - { - sstables.add(sstable); - } - } - - if (logger.isDebugEnabled()) - logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL))); - return sstables; - }).refs); - } - - List<SSTableStreamingSections> sections = new ArrayList<>(refs.size()); - for (SSTableReader sstable : refs) + for (ColumnFamilyStore cfs: stores) { - sections.add(new SSTableStreamingSections(refs.get(sstable), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges))); + streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, ranges, pendingRepair, previewKind)); } - return sections; } catch (Throwable t) { - refs.release(); + streams.forEach(OutgoingStream::finish); throw t; } + return streams; } - synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) + synchronized void addTransferStreams(Collection<OutgoingStream> streams) { failIfFinished(); - Iterator<SSTableStreamingSections> iter = sstableDetails.iterator(); - while (iter.hasNext()) + for (OutgoingStream stream: streams) { - SSTableStreamingSections details = iter.next(); - if (details.sections.isEmpty()) - { - // A reference was acquired on the sstable and we won't stream it - details.ref.release(); - iter.remove(); - continue; - } - - TableId tableId = details.ref.get().metadata().id; + TableId tableId = stream.getTableId(); StreamTransferTask task = transfers.get(tableId); if (task == null) { @@ -452,22 +373,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber if (task == null) task = newTask; } - task.addTransferFile(details.ref, details.estimatedKeys, details.sections); - iter.remove(); - } - } - - public static class SSTableStreamingSections - { - public final Ref<SSTableReader> ref; - public final List<Pair<Long, Long>> sections; - public final long estimatedKeys; - - public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys) - { - this.ref = ref; - this.sections = sections; - this.estimatedKeys = estimatedKeys; + task.addTransferStream(stream); } } @@ -554,8 +460,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber case PREPARE_ACK: prepareAck((PrepareAckMessage) message); break; - case FILE: - receive((IncomingFileMessage) message); + case STREAM: + receive((IncomingStreamMessage) message); break; case RECEIVED: ReceivedMessage received = (ReceivedMessage) message; @@ -681,41 +587,42 @@ public class StreamSession implements IEndpointStateChangeSubscriber } /** - * Call back after sending FileMessageHeader. + * Call back after sending StreamMessageHeader. * - * @param header sent header + * @param message sent stream message */ - public void fileSent(FileMessageHeader header) + public void streamSent(OutgoingStreamMessage message) { - long headerSize = header.size(); + long headerSize = message.stream.getSize(); StreamingMetrics.totalOutgoingBytes.inc(headerSize); metrics.outgoingBytes.inc(headerSize); // schedule timeout for receiving ACK - StreamTransferTask task = transfers.get(header.tableId); + StreamTransferTask task = transfers.get(message.header.tableId); if (task != null) { - task.scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS); + task.scheduleTimeout(message.header.sequenceNumber, 12, TimeUnit.HOURS); } } /** - * Call back after receiving a streamed file. + * Call back after receiving a stream. 
* - * @param message received file + * @param message received stream */ - public void receive(IncomingFileMessage message) + public void receive(IncomingStreamMessage message) { if (isPreview()) { throw new RuntimeException("Cannot receive files for preview session"); } - long headerSize = message.header.size(); + long headerSize = message.stream.getSize(); StreamingMetrics.totalIncomingBytes.inc(headerSize); metrics.incomingBytes.inc(headerSize); // send back file received message messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber)); - receivers.get(message.header.tableId).received(message.sstable); + StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber); + receivers.get(message.header.tableId).received(message.stream); } public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total) @@ -812,8 +719,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber } finally { - // aborting the tasks here needs to be the last thing we do so that we - // accurately report expected streaming, but don't leak any sstable refs + // aborting the tasks here needs to be the last thing we do so that we accurately report + // expected streaming, but don't leak any resources held by the task for (StreamTask task : Iterables.concat(receivers.values(), transfers.values())) task.abort(); } @@ -872,10 +779,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber for (StreamTransferTask task : transfers.values()) { - Collection<OutgoingFileMessage> messages = task.getFileMessages(); + Collection<OutgoingStreamMessage> messages = task.getFileMessages(); if (!messages.isEmpty()) { - for (OutgoingFileMessage ofm : messages) + for (OutgoingStreamMessage ofm : messages) { // pass the session planId/index to the OFM (which is only set at init(), after the transfers have already been created) ofm.header.addSessionInfo(this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 5e21712..802188a 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -24,20 +24,18 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.streaming.messages.OutgoingFileMessage; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; /** - * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. + * StreamTransferTask sends streams for a given table */ public class StreamTransferTask extends StreamTask { @@ -48,7 +46,7 @@ public class StreamTransferTask extends StreamTask private boolean aborted = false; @VisibleForTesting - protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); + protected final Map<Integer, OutgoingStreamMessage> streams = new HashMap<>(); private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>(); private long totalSize; @@ -58,19 +56,19 @@ public class StreamTransferTask extends StreamTask super(session, tableId); } - public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections) + public synchronized void addTransferStream(OutgoingStream stream) { - assert ref.get() != null && tableId.equals(ref.get().metadata().id); - OutgoingFileMessage message = new OutgoingFileMessage(ref, session, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel()); - message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message); - files.put(message.header.sequenceNumber, message); - totalSize += message.header.size(); + Preconditions.checkArgument(tableId.equals(stream.getTableId())); + OutgoingStreamMessage message = new OutgoingStreamMessage(tableId, session, stream, sequenceNumber.getAndIncrement()); + message = StreamHook.instance.reportOutgoingStream(session, stream, message); + streams.put(message.header.sequenceNumber, message); + totalSize += message.stream.getSize(); } /** - * Received ACK for file at {@code sequenceNumber}. + * Received ACK for stream at {@code sequenceNumber}. 
* - * @param sequenceNumber sequence number of file + * @param sequenceNumber sequence number of stream */ public void complete(int sequenceNumber) { @@ -81,12 +79,12 @@ public class StreamTransferTask extends StreamTask if (timeout != null) timeout.cancel(false); - OutgoingFileMessage file = files.remove(sequenceNumber); - if (file != null) - file.complete(); + OutgoingStreamMessage stream = streams.remove(sequenceNumber); + if (stream != null) + stream.complete(); - logger.debug("recevied sequenceNumber {}, remaining files {}", sequenceNumber, files.keySet()); - signalComplete = files.isEmpty(); + logger.debug("recevied sequenceNumber {}, remaining files {}", sequenceNumber, streams.keySet()); + signalComplete = streams.isEmpty(); } // all file sent, notify session this task is complete. @@ -105,11 +103,11 @@ public class StreamTransferTask extends StreamTask timeoutTasks.clear(); Throwable fail = null; - for (OutgoingFileMessage file : files.values()) + for (OutgoingStreamMessage stream : streams.values()) { try { - file.complete(); + stream.complete(); } catch (Throwable t) { @@ -117,14 +115,14 @@ public class StreamTransferTask extends StreamTask else fail.addSuppressed(t); } } - files.clear(); + streams.clear(); if (fail != null) Throwables.propagate(fail); } public synchronized int getTotalNumberOfFiles() { - return files.size(); + return streams.size(); } public long getTotalSize() @@ -132,35 +130,35 @@ public class StreamTransferTask extends StreamTask return totalSize; } - public synchronized Collection<OutgoingFileMessage> getFileMessages() + public synchronized Collection<OutgoingStreamMessage> getFileMessages() { // We may race between queuing all those messages and the completion of the completion of // the first ones. So copy the values to avoid a ConcurrentModificationException - return new ArrayList<>(files.values()); + return new ArrayList<>(streams.values()); } - public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) + public synchronized OutgoingStreamMessage createMessageForRetry(int sequenceNumber) { // remove previous time out task to be rescheduled later ScheduledFuture future = timeoutTasks.remove(sequenceNumber); if (future != null) future.cancel(false); - return files.get(sequenceNumber); + return streams.get(sequenceNumber); } /** - * Schedule timeout task to release reference for file sent. + * Schedule timeout task to release reference for stream sent. * When not receiving ACK after sending to receiver in given time, * the task will release reference. * - * @param sequenceNumber sequence number of file sent. + * @param sequenceNumber sequence number of stream sent. 
* @param time time to timeout * @param unit unit of given time * @return scheduled future for timeout task */ public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) { - if (!files.containsKey(sequenceNumber)) + if (!streams.containsKey(sequenceNumber)) return null; ScheduledFuture future = timeoutExecutor.schedule(new Runnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java deleted file mode 100644 index 81b3d8a..0000000 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.DataIntegrityMetadata; -import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; -import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; - -/** - * StreamWriter writes given section of the SSTable to given channel. - */ -public class StreamWriter -{ - private static final int DEFAULT_CHUNK_SIZE = 64 * 1024; - - private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class); - - protected final SSTableReader sstable; - protected final Collection<Pair<Long, Long>> sections; - protected final StreamRateLimiter limiter; - protected final StreamSession session; - - public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session) - { - this.session = session; - this.sstable = sstable; - this.sections = sections; - this.limiter = StreamManager.getRateLimiter(session.peer); - } - - /** - * Stream file of specified sections to given channel. - * - * StreamWriter uses LZF compression on wire to decrease size to transfer. 
- * - * @param output where this writes data to - * @throws IOException on any I/O error - */ - public void write(DataOutputStreamPlus output) throws IOException - { - long totalSize = totalSize(); - logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), - sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); - - try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy(); - ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists() - ? DataIntegrityMetadata.checksumValidator(sstable.descriptor) - : null) - { - int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE: validator.chunkSize; - - // setting up data compression stream - long progress = 0L; - - try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter)) - { - // stream each of the required sections of the file - for (Pair<Long, Long> section : sections) - { - long start = validator == null ? section.left : validator.chunkStart(section.left); - // if the transfer does not start on the valididator's chunk boundary, this is the number of bytes to offset by - int transferOffset = (int) (section.left - start); - if (validator != null) - validator.seek(start); - - // length of the section to read - long length = section.right - start; - // tracks write progress - long bytesRead = 0; - while (bytesRead < length) - { - int toTransfer = (int) Math.min(bufferSize, length - bytesRead); - long lastBytesRead = write(proxy, validator, compressedOutput, start, transferOffset, toTransfer, bufferSize); - start += lastBytesRead; - bytesRead += lastBytesRead; - progress += (lastBytesRead - transferOffset); - session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); - transferOffset = 0; - } - - // make sure that current section is sent - output.flush(); - } - logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", - session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); - } - } - } - - protected long totalSize() - { - long size = 0; - for (Pair<Long, Long> section : sections) - size += section.right - section.left; - return size; - } - - /** - * Sequentially read bytes from the file and write them to the output stream - * - * @param proxy The file reader to read from - * @param validator validator to verify data integrity - * @param start The readd offset from the beginning of the {@code proxy} file. - * @param transferOffset number of bytes to skip transfer, but include for validation. - * @param toTransfer The number of bytes to be transferred. - * - * @return Number of bytes transferred. - * - * @throws java.io.IOException on any I/O error - */ - protected long write(ChannelProxy proxy, ChecksumValidator validator, DataOutputStreamPlus output, long start, int transferOffset, int toTransfer, int bufferSize) throws IOException - { - // the count of bytes to read off disk - int minReadable = (int) Math.min(bufferSize, proxy.size() - start); - - // this buffer will hold the data from disk. as it will be compressed on the fly by - // ByteBufCompressionDataOutputStreamPlus.write(ByteBuffer), we can release this buffer as soon as we can. 
- ByteBuffer buffer = ByteBuffer.allocateDirect(minReadable); - try - { - int readCount = proxy.read(buffer, start); - assert readCount == minReadable : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", readCount, minReadable); - buffer.flip(); - - if (validator != null) - { - validator.validate(buffer); - buffer.flip(); - } - - buffer.position(transferOffset); - buffer.limit(transferOffset + (toTransfer - transferOffset)); - output.write(buffer); - } - finally - { - FileUtils.clean(buffer); - } - - return toTransfer; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/TableStreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/TableStreamManager.java b/src/java/org/apache/cassandra/streaming/TableStreamManager.java new file mode 100644 index 0000000..11512e9 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/TableStreamManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + +import java.util.Collection; +import java.util.UUID; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +/** + * The main streaming hook for a storage implementation. + * + * From here, the streaming system can get instances of {@link StreamReceiver}, {@link IncomingStream}, + * and {@link OutgoingStream}, which expose the interfaces into the the underlying storage implementation + * needed to make streaming work. + */ +public interface TableStreamManager +{ + /** + * Creates a {@link StreamReceiver} for the given session, expecting the given number of streams + */ + StreamReceiver createStreamReceiver(StreamSession session, int totalStreams); + + /** + * Creates an {@link IncomingStream} for the given header + */ + IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header); + + /** + * Returns a collection of {@link OutgoingStream}s that contains the data selected by the + * given ranges, pendingRepair, and preview. 
+ * + * There aren't any requirements on how data is divided between the outgoing streams + */ + Collection<OutgoingStream> createOutgoingStreams(StreamSession session, + Collection<Range<Token>> ranges, + UUID pendingRepair, + PreviewKind previewKind); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java index 20b7c87..bbc451d 100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java @@ -55,28 +55,28 @@ import org.apache.cassandra.net.async.OutboundConnectionIdentifier; import org.apache.cassandra.streaming.StreamConnectionFactory; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamingMessageSender; -import org.apache.cassandra.streaming.messages.IncomingFileMessage; +import org.apache.cassandra.streaming.messages.IncomingStreamMessage; import org.apache.cassandra.streaming.messages.KeepAliveMessage; -import org.apache.cassandra.streaming.messages.OutgoingFileMessage; +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.FBUtilities; /** * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s - * for sending {@link OutgoingFileMessage} instances; all other {@link StreamMessage} types are sent via + * for sending {@link OutgoingStreamMessage} instances; all other {@link StreamMessage} types are sent via * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck - * behind a file transfer. + * behind a stream transfer. * - * One of the challenges when sending files is we might need to delay shipping the file if: + * One of the challenges when sending streams is we might need to delay shipping the stream if: * * - we've exceeded our network I/O use due to rate limiting (at the cassandra level) * - the receiver isn't keeping up, which causes the local TCP socket buffer to not empty, which causes epoll writes to not * move any bytes to the socket, which causes buffers to stick around in user-land (a/k/a cassandra) memory. * - * When those conditions occur, it's easy enough to reschedule processing the file once the resources pick up + * When those conditions occur, it's easy enough to reschedule processing the stream once the resources pick up * (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that - * no other messages are submitted to the same channel while the current file is still being processed. + * no other messages are submitted to the same channel while the current stream is still being processed. 
*/ public class NettyStreamingMessageSender implements StreamingMessageSender { @@ -97,8 +97,8 @@ public class NettyStreamingMessageSender implements StreamingMessageSender private volatile boolean closed; /** - * A special {@link Channel} for sending non-file streaming messages, basically anything that isn't an - * {@link OutgoingFileMessage} (or an {@link IncomingFileMessage}, but a node doesn't send that, it's only received). + * A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an + * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received). */ private Channel controlMessageChannel; @@ -113,9 +113,9 @@ public class NettyStreamingMessageSender implements StreamingMessageSender private final ConcurrentMap<Thread, Channel> threadToChannelMap = new ConcurrentHashMap<>(); /** - * A netty channel attribute used to indicate if a channel is currently transferring a file. This is primarily used + * A netty channel attribute used to indicate if a channel is currently transferring a stream. This is primarily used * to indicate to the {@link KeepAliveTask} if it is safe to send a {@link KeepAliveMessage}, as sending the - * (application level) keep-alive in the middle of streaming a file would be bad news. + * (application level) keep-alive in the middle of a stream would be bad news. */ @VisibleForTesting static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR = AttributeKey.valueOf("transferringFile"); @@ -141,7 +141,6 @@ public class NettyStreamingMessageSender implements StreamingMessageSender session.sessionIndex(), session.planId(), session.streamOperation(), - session.keepSSTableLevel(), session.getPendingRepair(), session.getPreviewKind()); sendMessage(message); @@ -209,12 +208,12 @@ public class NettyStreamingMessageSender implements StreamingMessageSender if (closed) throw new RuntimeException("stream has been closed, cannot send " + message); - if (message instanceof OutgoingFileMessage) + if (message instanceof OutgoingStreamMessage) { if (isPreview) - throw new RuntimeException("Cannot send file messages for preview streaming sessions"); + throw new RuntimeException("Cannot send stream data messages for preview streaming sessions"); logger.debug("{} Sending {}", createLogTag(session, null), message); - fileTransferExecutor.submit(new FileStreamTask((OutgoingFileMessage)message)); + fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage)message)); return; } @@ -271,7 +270,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender return null; Channel channel = channelFuture.channel(); - logger.error("{} failed to send a stream message/file to peer {}: msg = {}", + logger.error("{} failed to send a stream message/data to peer {}: msg = {}", createLogTag(session, channel), connectionId, msg, future.cause()); // StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try @@ -288,12 +287,12 @@ public class NettyStreamingMessageSender implements StreamingMessageSender private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3; /** - * Even though we expect only an {@link OutgoingFileMessage} at runtime, the type here is {@link StreamMessage} + * Even though we expect only an {@link OutgoingStreamMessage} at runtime, the type here is {@link StreamMessage} * to facilitate simpler testing. 
*/ private final StreamMessage msg; - FileStreamTask(OutgoingFileMessage ofm) + FileStreamTask(OutgoingStreamMessage ofm) { this.msg = ofm; } @@ -357,9 +356,9 @@ public class NettyStreamingMessageSender implements StreamingMessageSender if (now - timeOfLastLogging > logIntervalNanos) { timeOfLastLogging = now; - OutgoingFileMessage ofm = (OutgoingFileMessage)msg; - logger.info("{} waiting to acquire a permit to begin streaming file {}. This message logs every {} minutes", - createLogTag(session, null), ofm.getFilename(), logInterval); + OutgoingStreamMessage ofm = (OutgoingStreamMessage)msg; + logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes", + createLogTag(session, null), ofm.getName(), logInterval); } } catch (InterruptedException ie) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java index 907572b..03f0640 100644 --- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java +++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java @@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.async; import java.io.EOFException; import java.io.IOException; -import java.net.InetAddress; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -43,8 +42,8 @@ import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamReceiveException; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.messages.FileMessageHeader; -import org.apache.cassandra.streaming.messages.IncomingFileMessage; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.streaming.messages.IncomingStreamMessage; import org.apache.cassandra.streaming.messages.KeepAliveMessage; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; @@ -53,8 +52,8 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag; /** - * Handles the inbound side of streaming messages and sstable data. From the incoming data, we derserialize the message - * and potentially reify partitions and rows and write those out to new sstable files. Because deserialization is a blocking affair, + * Handles the inbound side of streaming messages and stream data. From the incoming data, we derserialize the message + * including the actual stream data itself. Because the reading and deserialization of streams is a blocking affair, * we can't block the netty event loop. Thus we have a background thread perform all the blocking deserialization. 
*/ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter @@ -128,7 +127,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter if (cause instanceof IOException) logger.trace("connection problem while streaming", cause); else - logger.warn("exception occurred while in processing streaming file", cause); + logger.warn("exception occurred while in processing streaming data", cause); close(); } @@ -224,20 +223,20 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter StreamSession deriveSession(StreamMessage message) throws IOException { StreamSession streamSession = null; - // StreamInitMessage starts a new channel, and IncomingFileMessage potentially, as well. - // IncomingFileMessage needs a session to be established a priori, though + // StreamInitMessage starts a new channel, and IncomingStreamMessage potentially, as well. + // IncomingStreamMessage needs a session to be established a priori, though if (message instanceof StreamInitMessage) { assert session == null : "initiator of stream session received a StreamInitMessage"; StreamInitMessage init = (StreamInitMessage) message; - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.keepSSTableLevel, init.pendingRepair, init.previewKind); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind); streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex)); } - else if (message instanceof IncomingFileMessage) + else if (message instanceof IncomingStreamMessage) { - // TODO: it'd be great to check if the session actually exists before slurping in the entire sstable, + // TODO: it'd be great to check if the session actually exists before slurping in the entire stream, // but that's a refactoring for another day - FileMessageHeader header = ((IncomingFileMessage) message).header; + StreamMessageHeader header = ((IncomingStreamMessage) message).header; streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/async/package-info.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/package-info.java b/src/java/org/apache/cassandra/streaming/async/package-info.java index ecf5115..9455c7c 100644 --- a/src/java/org/apache/cassandra/streaming/async/package-info.java +++ b/src/java/org/apache/cassandra/streaming/async/package-info.java @@ -18,14 +18,14 @@ /** * <h1>Non-blocking streaming with netty</h1> - * This document describes the implementation details of streaming protocol. A listener for a streaming + * This document describes the implementation details of the streaming protocol. A listener for a streaming * session listens on the same socket as internode messaging, and participates in the same handshake protocol * That protocol is described in the package-level documentation for {@link org.apache.cassandra.net.async}, and * thus not here. * * Streaming 2.0 was implemented as CASSANDRA-5286. Streaming 2.0 used (the equivalent of) a single thread and * a single socket to transfer sstables sequentially to a peer (either as part of a repair, bootstrap, and so on). 
- * Part of the motivation for switching to netty and a non-blocking model as to enable file transfers to occur + * Part of the motivation for switching to netty and a non-blocking model as to enable stream transfers to occur * in parallel for a given session. * * Thus, a more detailed approach is required for stream session management. @@ -34,38 +34,6 @@ * * The full details of the session lifecycle are documented in {@link org.apache.cassandra.streaming.StreamSession}. * - * - * <h2>File transfer</h2> - * - * When tranferring whole or subsections of an sstable, only the DATA component is shipped. To that end, - * there are three "modes" of an sstable transfer that need to be handled somewhat differently: - * - * 1) uncompressed sstable - data needs to be read into user space so it can be manipulated: checksum validation, - * apply stream compression (see next section), and/or TLS encryption. - * - * 2) compressed sstable, transferred with SSL/TLS - data needs to be read into user space as that is where the TLS encryption - * needs to happen. Netty does not allow the pretense of doing zero-copy transfers when TLS is in the pipeline; - * data must explicitly be pulled into user-space memory for TLS encryption to work. - * - * 3) compressed sstable, transferred without SSL/TLS - data can be streamed via zero-copy transfer as the data does not - * need to be manipulated (it can be sent "as-is"). - * - * <h3>Compressing the data</h3> - * We always want to transfer as few bytes as possible of the wire when streaming a file. If the - * sstable is not already compressed via table compression options, we apply an on-the-fly stream compression - * to the data. The stream compression format is documented in - * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer} - * - * You may be wondering: why implement your own compression scheme? why not use netty's built-in compression codecs, - * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes complete sense if all the sstables - * to be streamed are non using sstable compression (and obviously you wouldn't use stream compression when the sstables - * are using sstable compression). The problem is when you have a mix of files, some using sstable compression - * and some not. You can either: - * - * - send the files of one type over one kind of socket, and the others over another socket - * - send them both over the same socket, but then auto-adjust per each file type. - * - * I've opted for the latter to keep socket/channel management simpler and cleaner. */ package org.apache.cassandra.streaming.async; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java deleted file mode 100644 index 290dd9e..0000000 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming.compress; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.Iterator; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.DoubleSupplier; - -import com.google.common.collect.Iterators; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.util.concurrent.FastThreadLocalThread; -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RebufferingInputStream; -import org.apache.cassandra.streaming.StreamReader.StreamDeserializer; -import org.apache.cassandra.utils.ChecksumType; -import org.apache.cassandra.utils.WrappedRunnable; - -/** - * InputStream which reads data from underlining source with given {@link CompressionInfo}. Uses {@link #buffer} as a buffer - * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case). - */ -public class CompressedInputStream extends RebufferingInputStream implements AutoCloseable -{ - - private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class); - - private final CompressionInfo info; - // chunk buffer - private final BlockingQueue<ByteBuffer> dataBuffer; - private final DoubleSupplier crcCheckChanceSupplier; - - /** - * The base offset of the current {@link #buffer} from the beginning of the stream. - */ - private long bufferOffset = 0; - - /** - * The current {@link CompressedStreamReader#sections} offset in the stream. - */ - private long current = 0; - - private final ChecksumType checksumType; - - private static final int CHECKSUM_LENGTH = 4; - - /** - * Indicates there was a problem when reading from source stream. - * When this is added to the <code>dataBuffer</code> by the stream Reader, - * it is expected that the <code>readException</code> variable is populated - * with the cause of the error when reading from source stream, so it is - * thrown to the consumer on subsequent read operation. 
- */ - private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]); - - private volatile IOException readException = null; - - private long totalCompressedBytesRead; - - /** - * @param source Input source to read compressed data from - * @param info Compression info - */ - public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier) - { - super(ByteBuffer.allocateDirect(info.parameters.chunkLength())); - buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read - this.info = info; - this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024)); - this.crcCheckChanceSupplier = crcCheckChanceSupplier; - this.checksumType = checksumType; - - new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start(); - } - - /** - * Invoked when crossing into the next stream boundary in {@link CompressedStreamReader#sections}. - */ - public void position(long position) throws IOException - { - if (readException != null) - throw readException; - - assert position >= current : "stream can only read forward."; - current = position; - - if (current > bufferOffset + buffer.limit()) - reBuffer(false); - - buffer.position((int)(current - bufferOffset)); - } - - protected void reBuffer() throws IOException - { - reBuffer(true); - } - - private void reBuffer(boolean updateCurrent) throws IOException - { - if (readException != null) - { - FileUtils.clean(buffer); - buffer = null; - throw readException; - } - - // increment the offset into the stream based on the current buffer's read count - if (updateCurrent) - current += buffer.position(); - - try - { - ByteBuffer compressedWithCRC = dataBuffer.take(); - if (compressedWithCRC == POISON_PILL) - { - assert readException != null; - throw readException; - } - - decompress(compressedWithCRC); - } - catch (InterruptedException e) - { - throw new EOFException("No chunk available"); - } - } - - private void decompress(ByteBuffer compressed) throws IOException - { - int length = compressed.remaining(); - - // uncompress if the buffer size is less than the max chunk size. else, if the buffer size is greater than or equal to the maxCompressedLength, - // we assume the buffer is not compressed. 
see CASSANDRA-10520 - final boolean releaseCompressedBuffer; - if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength()) - { - buffer.clear(); - compressed.limit(length - CHECKSUM_LENGTH); - info.parameters.getSstableCompressor().uncompress(compressed, buffer); - buffer.flip(); - releaseCompressedBuffer = true; - } - else - { - FileUtils.clean(buffer); - buffer = compressed; - buffer.limit(length - CHECKSUM_LENGTH); - releaseCompressedBuffer = false; - } - totalCompressedBytesRead += length; - - // validate crc randomly - double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble(); - if (crcCheckChance >= 1d || - (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble())) - { - ByteBuffer crcBuf = compressed.duplicate(); - crcBuf.limit(length - CHECKSUM_LENGTH).position(0); - int checksum = (int) checksumType.of(crcBuf); - - crcBuf.limit(length); - if (crcBuf.getInt() != checksum) - throw new IOException("CRC unmatched"); - } - - if (releaseCompressedBuffer) - FileUtils.clean(compressed); - - // buffer offset is always aligned - final int compressedChunkLength = info.parameters.chunkLength(); - bufferOffset = current & ~(compressedChunkLength - 1); - } - - public long getTotalCompressedBytesRead() - { - return totalCompressedBytesRead; - } - - /** - * {@inheritDoc} - * - * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}. - */ - @Override - public void close() - { - if (buffer != null) - { - FileUtils.clean(buffer); - buffer = null; - } - } - - class Reader extends WrappedRunnable - { - private final DataInputPlus source; - private final Iterator<CompressionMetadata.Chunk> chunks; - private final BlockingQueue<ByteBuffer> dataBuffer; - - Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer) - { - this.source = source; - this.chunks = Iterators.forArray(info.chunks); - this.dataBuffer = dataBuffer; - } - - protected void runMayThrow() throws Exception - { - byte[] tmp = null; - while (chunks.hasNext()) - { - CompressionMetadata.Chunk chunk = chunks.next(); - - int readLength = chunk.length + 4; // read with CRC - ByteBuffer compressedWithCRC = null; - try - { - final int r; - if (source instanceof ReadableByteChannel) - { - compressedWithCRC = ByteBuffer.allocateDirect(readLength); - r = ((ReadableByteChannel)source).read(compressedWithCRC); - compressedWithCRC.flip(); - } - else - { - // read into an on-heap araay, then copy over to an off-heap buffer. 
at a minimum snappy requires - // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer - if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH) - tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH]; - source.readFully(tmp, 0, readLength); - compressedWithCRC = ByteBuffer.allocateDirect(readLength); - compressedWithCRC.put(tmp, 0, readLength); - compressedWithCRC.position(0); - r = readLength; - } - - if (r < 0) - { - FileUtils.clean(compressedWithCRC); - readException = new EOFException("No chunk available"); - dataBuffer.put(POISON_PILL); - return; // throw exception where we consume dataBuffer - } - } - catch (IOException e) - { - if (!(e instanceof EOFException)) - logger.warn("Error while reading compressed input stream.", e); - if (compressedWithCRC != null) - FileUtils.clean(compressedWithCRC); - - readException = e; - dataBuffer.put(POISON_PILL); - return; // throw exception where we consume dataBuffer - } - dataBuffer.put(compressedWithCRC); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java deleted file mode 100644 index bd44209..0000000 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
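
The removed CompressedInputStream above decouples network reads from decompression: a background Reader thread fills a bounded queue with chunks, and a zero-length "poison pill" buffer tells the consumer to stop and surface whatever exception the producer recorded. What follows is a minimal, self-contained sketch of that hand-off pattern; the ChunkHandoff and ChunkSource names are illustrative and not part of the Cassandra API.

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ChunkHandoff
{
    /** Illustrative stand-in for whatever actually produces chunks (network, file, ...). */
    interface ChunkSource
    {
        /** @return the next chunk, or null when the source is exhausted */
        ByteBuffer next() throws IOException;
    }

    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);

    private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(64);
    private volatile IOException readException;

    /** Producer side: enqueue chunks until the source is drained or fails, then enqueue the pill. */
    public void produce(ChunkSource source)
    {
        try
        {
            try
            {
                ByteBuffer chunk;
                while ((chunk = source.next()) != null)
                    queue.put(chunk);
            }
            catch (IOException e)
            {
                readException = e; // remember why reading stopped
            }
            queue.put(POISON_PILL); // always tell the consumer that nothing else follows
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }

    /** Consumer side: block for the next chunk; null means a clean end of stream. */
    public ByteBuffer nextChunk() throws IOException
    {
        try
        {
            ByteBuffer chunk = queue.take();
            if (chunk != POISON_PILL)
                return chunk;
            if (readException != null)
                throw readException; // surface the producer's failure on the consumer thread
            return null; // a real implementation would remember that the stream has ended
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new EOFException("interrupted while waiting for a chunk");
        }
    }
}

Passing the failure through the queue, rather than interrupting the consumer, keeps the error ordered behind any chunks that were already queued, which mirrors the removed class's "throw exception where we consume dataBuffer" comment.
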
- */ -package org.apache.cassandra.streaming.compress; - -import java.io.IOException; - -import com.google.common.base.Throwables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.TrackedDataInputPlus; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamReader; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.messages.FileMessageHeader; -import org.apache.cassandra.utils.ChecksumType; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; - -import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; - -/** - * StreamReader that reads from streamed compressed SSTable - */ -public class CompressedStreamReader extends StreamReader -{ - private static final Logger logger = LoggerFactory.getLogger(CompressedStreamReader.class); - - protected final CompressionInfo compressionInfo; - - public CompressedStreamReader(FileMessageHeader header, StreamSession session) - { - super(header, session); - this.compressionInfo = header.compressionInfo; - } - - /** - * @return SSTable transferred - * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails. - */ - @Override - @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed - public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException - { - long totalSize = totalSize(); - - ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); - - if (cfs == null) - { - // schema was dropped during streaming - throw new IOException("CF " + tableId + " was dropped during streaming"); - } - - logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.", - session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair, - cfs.getTableName()); - - StreamDeserializer deserializer = null; - SSTableMultiWriter writer = null; - try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance)) - { - TrackedDataInputPlus in = new TrackedDataInputPlus(cis); - deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); - writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); - String filename = writer.getFilename(); - int sectionIdx = 0; - for (Pair<Long, Long> section : sections) - { - assert cis.getTotalCompressedBytesRead() <= totalSize; - long sectionLength = section.right - section.left; - - logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength); - // skip to beginning of section inside chunk - cis.position(section.left); - in.reset(0); - - while (in.getBytesRead() < sectionLength) - { - writePartition(deserializer, writer); - // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred - session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); - } - } - logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", 
session.planId(), fileSeqNum, - session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); - return writer; - } - catch (Throwable e) - { - Object partitionKey = deserializer != null ? deserializer.partitionKey() : ""; - logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName()); - if (writer != null) - { - writer.abort(e); - } - if (extractIOExceptionCause(e).isPresent()) - throw e; - throw Throwables.propagate(e); - } - } - - @Override - protected long totalSize() - { - long size = 0; - // calculate total length of transferring chunks - for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - size += chunk.length + 4; // 4 bytes for CRC - return size; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java deleted file mode 100644 index 0e78b7d..0000000 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming.compress; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.StreamWriter; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; - -/** - * StreamWriter for compressed SSTable. 
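
In the removed reader path above, each streamed chunk arrives as its payload followed by a 4-byte checksum, and CompressedInputStream verifies that trailer only with the table's crc_check_chance probability. Below is a rough, standalone sketch of that probabilistic validation, assuming a plain java.util.zip.CRC32 trailer; the ChunkChecksums name and maybeValidate signature are illustrative, not the project's ChecksumType API.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.DoubleSupplier;
import java.util.zip.CRC32;

public final class ChunkChecksums
{
    static final int CHECKSUM_LENGTH = 4;

    /** @param chunkWithCrc payload immediately followed by a 4-byte CRC32 trailer */
    static void maybeValidate(ByteBuffer chunkWithCrc, DoubleSupplier crcCheckChance) throws IOException
    {
        double chance = crcCheckChance.getAsDouble();
        if (chance < 1d && (chance <= 0d || chance <= ThreadLocalRandom.current().nextDouble()))
            return; // skip validation this time

        int payloadLength = chunkWithCrc.remaining() - CHECKSUM_LENGTH;

        ByteBuffer payload = chunkWithCrc.duplicate();
        payload.limit(payload.position() + payloadLength);

        CRC32 crc = new CRC32();
        crc.update(payload); // consumes the duplicate, not the original buffer

        int expected = chunkWithCrc.getInt(chunkWithCrc.position() + payloadLength);
        if ((int) crc.getValue() != expected)
            throw new IOException("CRC mismatch on streamed chunk");
    }
}

Setting the chance to 1.0 verifies every chunk, while 0.0 skips the trailer check entirely.
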
- */ -public class CompressedStreamWriter extends StreamWriter -{ - private static final int CHUNK_SIZE = 1 << 16; - - private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class); - - private final CompressionInfo compressionInfo; - - public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session) - { - super(sstable, sections, session); - this.compressionInfo = compressionInfo; - } - - @Override - public void write(DataOutputStreamPlus out) throws IOException - { - assert out instanceof ByteBufDataOutputStreamPlus; - ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out; - long totalSize = totalSize(); - logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), - sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); - try (ChannelProxy fc = sstable.getDataChannel().sharedCopy()) - { - long progress = 0L; - // calculate chunks to transfer. we want to send continuous chunks altogether. - List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks); - - int sectionIdx = 0; - - // stream each of the required sections of the file - for (final Pair<Long, Long> section : sections) - { - // length of the section to stream - long length = section.right - section.left; - - logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length); - - // tracks write progress - long bytesTransferred = 0; - while (bytesTransferred < length) - { - final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); - limiter.acquire(toTransfer); - - ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer); - long lastWrite; - try - { - lastWrite = fc.read(outBuffer, section.left + bytesTransferred); - assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer); - outBuffer.flip(); - output.writeToChannel(outBuffer); - } - catch (IOException e) - { - FileUtils.clean(outBuffer); - throw e; - } - - bytesTransferred += lastWrite; - progress += lastWrite; - session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); - } - } - logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", - session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); - } - } - - @Override - protected long totalSize() - { - long size = 0; - // calculate total length of transferring chunks - for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - size += chunk.length + 4; // 4 bytes for CRC - return size; - } - - // chunks are assumed to be sorted by offset - private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks) - { - List<Pair<Long, Long>> transferSections = new ArrayList<>(); - Pair<Long, Long> lastSection = null; - for (CompressionMetadata.Chunk chunk : chunks) - { - if (lastSection != null) - { - if (chunk.offset == lastSection.right) - { - // extend previous section to end of this chunk - lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC - } - else - { - transferSections.add(lastSection); - lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4); - } - } - 
else - { - lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4); - } - } - if (lastSection != null) - transferSections.add(lastSection); - return transferSections; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java b/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java deleted file mode 100644 index bd0c2d5..0000000 --- a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming.compress; - -import java.io.IOException; - -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.schema.CompressionParams; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; - -/** - * Container that carries compression parameters and chunks to decompress data from stream. 
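
The removed CompressedStreamWriter#getTransferSections above merges compressed chunks that are contiguous on disk into larger ranges so each range can be streamed with sequential reads; every chunk occupies its length plus a 4-byte CRC trailer, which is why adjacency is tested against offset + length + 4. A minimal sketch of that coalescing, using illustrative record types in place of CompressionMetadata.Chunk and Pair:

import java.util.ArrayList;
import java.util.List;

public final class TransferSections
{
    static final int CRC_TRAILER = 4;

    /** A compressed chunk occupies (length + CRC_TRAILER) bytes starting at its file offset. */
    record Chunk(long offset, int length)
    {
        long end() { return offset + length + CRC_TRAILER; }
    }

    /** A half-open byte range [start, end) to transfer. */
    record Section(long start, long end) {}

    /** Chunks are assumed to be sorted by file offset, as in the removed code. */
    static List<Section> coalesce(List<Chunk> sortedChunks)
    {
        List<Section> sections = new ArrayList<>();
        Section current = null;
        for (Chunk chunk : sortedChunks)
        {
            if (current != null && chunk.offset() == current.end())
            {
                current = new Section(current.start(), chunk.end()); // extend the open section
            }
            else
            {
                if (current != null)
                    sections.add(current); // close the previous section
                current = new Section(chunk.offset(), chunk.end()); // start a new one
            }
        }
        if (current != null)
            sections.add(current);
        return sections;
    }
}

For example, a chunk at offset 0 with length 96 occupies [0, 100), so a following chunk at offset 100 extends the open section to [0, 200) instead of starting a new one.
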
- */ -public class CompressionInfo -{ - public static final IVersionedSerializer<CompressionInfo> serializer = new CompressionInfoSerializer(); - - public final CompressionMetadata.Chunk[] chunks; - public final CompressionParams parameters; - - public CompressionInfo(CompressionMetadata.Chunk[] chunks, CompressionParams parameters) - { - assert chunks != null && parameters != null; - this.chunks = chunks; - this.parameters = parameters; - } - - static class CompressionInfoSerializer implements IVersionedSerializer<CompressionInfo> - { - public void serialize(CompressionInfo info, DataOutputPlus out, int version) throws IOException - { - if (info == null) - { - out.writeInt(-1); - return; - } - - int chunkCount = info.chunks.length; - out.writeInt(chunkCount); - for (int i = 0; i < chunkCount; i++) - CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], out, version); - // compression params - CompressionParams.serializer.serialize(info.parameters, out, version); - } - - public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException - { - // chunks - int chunkCount = in.readInt(); - if (chunkCount < 0) - return null; - - CompressionMetadata.Chunk[] chunks = new CompressionMetadata.Chunk[chunkCount]; - for (int i = 0; i < chunkCount; i++) - chunks[i] = CompressionMetadata.Chunk.serializer.deserialize(in, version); - - // compression params - CompressionParams parameters = CompressionParams.serializer.deserialize(in, version); - return new CompressionInfo(chunks, parameters); - } - - public long serializedSize(CompressionInfo info, int version) - { - if (info == null) - return TypeSizes.sizeof(-1); - - // chunks - int chunkCount = info.chunks.length; - long size = TypeSizes.sizeof(chunkCount); - for (int i = 0; i < chunkCount; i++) - size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version); - // compression params - size += CompressionParams.serializer.serializedSize(info.parameters, version); - return size; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java index 964fe10..a1fa19f 100644 --- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java @@ -42,7 +42,7 @@ public class ProgressInfoCompositeData "Session peer", "Session peer storage port", "Index of session", - "Name of the file", + "Name of the stream", "Direction('IN' or 'OUT')", "Current bytes transferred", "Total bytes to transfer"}; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
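
The removed CompressionInfo serializer above frames a null value as a chunk count of -1; otherwise it writes the chunk count, each chunk, and finally the compression parameters. Here is a simplified sketch of that framing, with the chunk reduced to an offset and a length and the parameters elided; it is illustrative only, not the project's IVersionedSerializer API.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class CompressionInfoFraming
{
    /** Simplified stand-in for CompressionMetadata.Chunk. */
    record Chunk(long offset, int length) {}

    static void serialize(List<Chunk> chunks, DataOutputStream out) throws IOException
    {
        if (chunks == null)
        {
            out.writeInt(-1); // "no compression info" marker
            return;
        }
        out.writeInt(chunks.size());
        for (Chunk c : chunks)
        {
            out.writeLong(c.offset());
            out.writeInt(c.length());
        }
        // the real serializer writes CompressionParams here
    }

    static List<Chunk> deserialize(DataInputStream in) throws IOException
    {
        int count = in.readInt();
        if (count < 0)
            return null; // peer sent no compression info
        List<Chunk> chunks = new ArrayList<>(count);
        for (int i = 0; i < count; i++)
            chunks.add(new Chunk(in.readLong(), in.readInt()));
        // the real serializer reads CompressionParams here
        return chunks;
    }
}
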
