Abstract streaming for pluggable storage

Patch by Blake Eggleston; Reviewed by Jason Brown for CASSANDRA-14115
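Context for readers of the diff below: the patch routes table-level streaming through a per-table streaming manager, exposed via ColumnFamilyStore#getStreamManager() and implemented for the native storage engine by CassandraStreamManager. The following sketch approximates that pluggable contract; it is inferred from the @Override methods of CassandraStreamManager in this commit rather than copied from the new TableStreamManager.java (which the patch adds but which is not reproduced in this excerpt), so treat the exact names and signatures as assumptions.

// Sketch of the pluggable streaming contract introduced by this patch, inferred from the
// @Override methods of CassandraStreamManager below; not the committed TableStreamManager.java.
import java.util.Collection;
import java.util.UUID;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;

public interface TableStreamManager
{
    // Builds the object that consumes a single incoming stream (for the native engine: one sstable).
    IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header);

    // Builds the per-table receiver that finalizes, or discards, everything received in a session.
    StreamReceiver createStreamReceiver(StreamSession session, int totalStreams);

    // Selects and wraps the local data to send for the requested token ranges.
    Collection<OutgoingStream> createOutgoingStreams(StreamSession session,
                                                     Collection<Range<Token>> ranges,
                                                     UUID pendingRepair,
                                                     PreviewKind previewKind);
}

Any alternative storage engine would plug in by returning its own implementation from getStreamManager(), which is the point of the abstraction.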
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9714a7c8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9714a7c8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9714a7c8 Branch: refs/heads/trunk Commit: 9714a7c817b64a3358f69e536535c756c5c6df48 Parents: 253c003 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Sun Mar 4 14:44:30 2018 -0800 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Thu Mar 15 16:40:05 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 10 + .../db/streaming/CassandraIncomingFile.java | 117 +++++++ .../db/streaming/CassandraOutgoingFile.java | 151 +++++++++ .../db/streaming/CassandraStreamHeader.java | 212 +++++++++++++ .../db/streaming/CassandraStreamManager.java | 166 ++++++++++ .../db/streaming/CassandraStreamReader.java | 285 +++++++++++++++++ .../db/streaming/CassandraStreamReceiver.java | 248 +++++++++++++++ .../db/streaming/CassandraStreamWriter.java | 176 +++++++++++ .../CompressedCassandraStreamReader.java | 131 ++++++++ .../CompressedCassandraStreamWriter.java | 153 ++++++++++ .../db/streaming/CompressedInputStream.java | 285 +++++++++++++++++ .../cassandra/db/streaming/CompressionInfo.java | 110 +++++++ .../cassandra/db/streaming/package-info.java | 53 ++++ .../org/apache/cassandra/dht/RangeStreamer.java | 4 +- .../cassandra/io/sstable/SSTableLoader.java | 18 +- .../repair/AsymmetricLocalSyncTask.java | 1 - .../apache/cassandra/repair/LocalSyncTask.java | 2 +- .../cassandra/repair/StreamingRepairTask.java | 2 +- .../cassandra/streaming/IncomingStream.java | 45 +++ .../cassandra/streaming/OutgoingStream.java | 52 ++++ .../apache/cassandra/streaming/PreviewKind.java | 21 +- .../cassandra/streaming/ProgressInfo.java | 4 +- .../apache/cassandra/streaming/SessionInfo.java | 8 +- .../cassandra/streaming/StreamCoordinator.java | 40 ++- .../apache/cassandra/streaming/StreamHook.java | 14 +- .../apache/cassandra/streaming/StreamPlan.java | 20 +- .../cassandra/streaming/StreamReader.java | 278 ----------------- .../cassandra/streaming/StreamReceiveTask.java | 211 ++----------- .../cassandra/streaming/StreamReceiver.java | 58 ++++ .../cassandra/streaming/StreamResultFuture.java | 7 +- .../cassandra/streaming/StreamSession.java | 199 ++++-------- .../cassandra/streaming/StreamTransferTask.java | 58 ++-- .../cassandra/streaming/StreamWriter.java | 173 ----------- .../cassandra/streaming/TableStreamManager.java | 57 ++++ .../async/NettyStreamingMessageSender.java | 41 ++- .../async/StreamingInboundHandler.java | 23 +- .../cassandra/streaming/async/package-info.java | 36 +-- .../compress/CompressedInputStream.java | 285 ----------------- .../compress/CompressedStreamReader.java | 132 -------- .../compress/CompressedStreamWriter.java | 154 ---------- .../streaming/compress/CompressionInfo.java | 95 ------ .../management/ProgressInfoCompositeData.java | 2 +- .../streaming/messages/FileMessageHeader.java | 304 ------------------- .../streaming/messages/IncomingFileMessage.java | 95 ------ .../messages/IncomingStreamMessage.java | 107 +++++++ .../streaming/messages/OutgoingFileMessage.java | 158 ---------- .../messages/OutgoingStreamMessage.java | 138 +++++++++ .../streaming/messages/StreamInitMessage.java | 9 +- .../streaming/messages/StreamMessage.java | 2 +- .../streaming/messages/StreamMessageHeader.java | 144 +++++++++ 
.../db/streaming/CassandraStreamHeaderTest.java | 50 +++ .../streaming/CassandraStreamManagerTest.java | 237 +++++++++++++++ .../cassandra/dht/StreamStateStoreTest.java | 5 +- .../cassandra/io/sstable/LegacySSTableTest.java | 14 +- .../io/sstable/SSTableRewriterTest.java | 59 ---- .../serializers/SerializationUtils.java | 68 +++++ .../cassandra/streaming/StreamSessionTest.java | 132 -------- .../streaming/StreamTransferTaskTest.java | 19 +- .../streaming/StreamingTransferTest.java | 27 +- .../async/NettyStreamingMessageSenderTest.java | 5 +- .../async/StreamingInboundHandlerTest.java | 23 +- .../compression/CompressedInputStreamTest.java | 4 +- 63 files changed, 3314 insertions(+), 2424 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 42add69..9458f19 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Abstract streaming for pluggable storage (CASSANDRA-14115) * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294) * Use Murmur3 for validation compactions (CASSANDRA-14002) * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 007bf2d..1be0083 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -50,6 +50,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.streaming.CassandraStreamManager; import org.apache.cassandra.db.view.TableViews; import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.partitions.CachedPartition; @@ -78,6 +79,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.TableStreamManager; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.TopKSampler.SamplerResult; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -213,6 +215,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public volatile long sampleLatencyNanos; private final ScheduledFuture<?> latencyCalculator; + private final CassandraStreamManager streamManager; + private volatile boolean compactionSpaceCheck = true; @VisibleForTesting @@ -465,6 +469,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean mbeanName = null; oldMBeanName= null; } + streamManager = new CassandraStreamManager(this); + } + + public TableStreamManager getStreamManager() + { + return streamManager; } public TableMetadata metadata() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java new file mode 100644 index 0000000..16698e5 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.Objects; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.IncomingStream; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +/** + * used to receive the part(or whole) of a SSTable data file. + * + * This class deserializes the data stream into partitions and rows, and writes that out as an sstable + */ +public class CassandraIncomingFile implements IncomingStream +{ + private final ColumnFamilyStore cfs; + private final StreamSession session; + private final StreamMessageHeader header; + + private volatile SSTableMultiWriter sstable; + private volatile long size = -1; + + public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, StreamMessageHeader header) + { + this.cfs = cfs; + this.session = session; + this.header = header; + } + + @Override + public synchronized void read(DataInputPlus in, int version) throws IOException + { + CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version); + CassandraStreamReader reader = !streamHeader.isCompressed() + ? new CassandraStreamReader(header, streamHeader, session) + : new CompressedCassandraStreamReader(header, streamHeader, session); + size = streamHeader.size(); + sstable = reader.read(in); + } + + @Override + public synchronized String getName() + { + return sstable == null ? "null" : sstable.getFilename(); + } + + @Override + public synchronized long getSize() + { + Preconditions.checkState(size > 0, "Stream hasn't been read yet"); + return size; + } + + @Override + public TableId getTableId() + { + Preconditions.checkState(sstable != null, "Stream hasn't been read yet"); + return sstable.getTableId(); + } + + @Override + public String toString() + { + SSTableMultiWriter sst = sstable; + return "CassandraIncomingFile{" + + "sstable=" + (sst == null ? 
"null" : sst.getFilename()) + + '}'; + } + + public SSTableMultiWriter getSSTable() + { + Preconditions.checkState(sstable != null, "Stream hasn't been read yet"); + return sstable; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CassandraIncomingFile that = (CassandraIncomingFile) o; + return Objects.equals(cfs, that.cfs) && + Objects.equals(session, that.session) && + Objects.equals(header, that.header) && + Objects.equals(sstable, that.sstable); + } + + public int hashCode() + { + + return Objects.hash(cfs, session, header, sstable); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java new file mode 100644 index 0000000..3fd3f9d --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.OutgoingStream; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.Ref; + +/** + * used to transfer the part(or whole) of a SSTable data file + */ +public class CassandraOutgoingFile implements OutgoingStream +{ + private final Ref<SSTableReader> ref; + private final long estimatedKeys; + private final List<Pair<Long, Long>> sections; + private final String filename; + private final CassandraStreamHeader header; + private final boolean keepSSTableLevel; + + public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys) + { + Preconditions.checkNotNull(ref.get()); + this.ref = ref; + this.estimatedKeys = estimatedKeys; + this.sections = sections; + this.filename = ref.get().getFilename(); + + SSTableReader sstable = ref.get(); + keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD; + this.header = new CassandraStreamHeader(sstable.descriptor.version, + sstable.descriptor.formatType, + estimatedKeys, + sections, + sstable.compression ? sstable.getCompressionMetadata() : null, + keepSSTableLevel ? sstable.getSSTableLevel() : 0, + sstable.header.toComponent()); + } + + public static CassandraOutgoingFile fromStream(OutgoingStream stream) + { + Preconditions.checkArgument(stream instanceof CassandraOutgoingFile); + return (CassandraOutgoingFile) stream; + } + + @VisibleForTesting + public Ref<SSTableReader> getRef() + { + return ref; + } + + @Override + public String getName() + { + return filename; + } + + @Override + public long getSize() + { + return header.size(); + } + + @Override + public TableId getTableId() + { + return ref.get().metadata().id; + } + + @Override + public long getRepairedAt() + { + return ref.get().getRepairedAt(); + } + + @Override + public UUID getPendingRepair() + { + return ref.get().getPendingRepair(); + } + + @Override + public void write(StreamSession session, DataOutputStreamPlus out, int version) throws IOException + { + SSTableReader sstable = ref.get(); + CassandraStreamHeader.serializer.serialize(header, out, version); + out.flush(); + + CassandraStreamWriter writer = header.compressionInfo == null ? 
+ new CassandraStreamWriter(sstable, header.sections, session) : + new CompressedCassandraStreamWriter(sstable, header.sections, + header.compressionInfo, session); + writer.write(out); + } + + @Override + public void finish() + { + ref.release(); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CassandraOutgoingFile that = (CassandraOutgoingFile) o; + return estimatedKeys == that.estimatedKeys && + Objects.equals(ref, that.ref) && + Objects.equals(sections, that.sections); + } + + public int hashCode() + { + return Objects.hash(ref, estimatedKeys, sections); + } + + @Override + public String toString() + { + return "CassandraOutgoingFile{" + ref.get().getFilename() + '}'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java new file mode 100644 index 0000000..2603da1 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.Pair; + +public class CassandraStreamHeader +{ + /** SSTable version */ + public final Version version; + + /** SSTable format **/ + public final SSTableFormat.Type format; + public final long estimatedKeys; + public final List<Pair<Long, Long>> sections; + /** + * Compression info for SSTable to send. Can be null if SSTable is not compressed. + * On sender, this field is always null to avoid holding large number of Chunks. + * Use compressionMetadata instead. 
+ */ + private final CompressionMetadata compressionMetadata; + public volatile CompressionInfo compressionInfo; + public final int sstableLevel; + public final SerializationHeader.Component header; + + /* cached size value */ + private transient final long size; + + private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) + { + this.version = version; + this.format = format; + this.estimatedKeys = estimatedKeys; + this.sections = sections; + this.compressionMetadata = compressionMetadata; + this.compressionInfo = compressionInfo; + this.sstableLevel = sstableLevel; + this.header = header; + + this.size = calculateSize(); + } + + public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) + { + this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header); + } + + public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<Pair<Long, Long>> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) + { + this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header); + } + + public boolean isCompressed() + { + return compressionInfo != null; + } + + /** + * @return total file size to transfer in bytes + */ + public long size() + { + return size; + } + + private long calculateSize() + { + long transferSize = 0; + if (compressionInfo != null) + { + // calculate total length of transferring chunks + for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) + transferSize += chunk.length + 4; // 4 bytes for CRC + } + else + { + for (Pair<Long, Long> section : sections) + transferSize += section.right - section.left; + } + return transferSize; + } + + public synchronized void calculateCompressionInfo() + { + if (compressionMetadata != null && compressionInfo == null) + { + compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections); + } + } + + @Override + public String toString() + { + return "CassandraStreamHeader{" + + "version=" + version + + ", format=" + format + + ", estimatedKeys=" + estimatedKeys + + ", sections=" + sections + + ", compressionInfo=" + compressionInfo + + ", sstableLevel=" + sstableLevel + + ", header=" + header + + '}'; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CassandraStreamHeader that = (CassandraStreamHeader) o; + return estimatedKeys == that.estimatedKeys && + sstableLevel == that.sstableLevel && + Objects.equals(version, that.version) && + format == that.format && + Objects.equals(sections, that.sections) && + Objects.equals(compressionInfo, that.compressionInfo) && + Objects.equals(header, that.header); + } + + public int hashCode() + { + return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); + } + + + public static final IVersionedSerializer<CassandraStreamHeader> serializer = new IVersionedSerializer<CassandraStreamHeader>() + { + public void serialize(CassandraStreamHeader header, DataOutputPlus out, int version) throws IOException + { + out.writeUTF(header.version.toString()); + 
out.writeUTF(header.format.name); + + out.writeLong(header.estimatedKeys); + out.writeInt(header.sections.size()); + for (Pair<Long, Long> section : header.sections) + { + out.writeLong(section.left); + out.writeLong(section.right); + } + header.calculateCompressionInfo(); + CompressionInfo.serializer.serialize(header.compressionInfo, out, version); + out.writeInt(header.sstableLevel); + SerializationHeader.serializer.serialize(header.version, header.header, out); + } + + public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException + { + Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF()); + SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF()); + + long estimatedKeys = in.readLong(); + int count = in.readInt(); + List<Pair<Long, Long>> sections = new ArrayList<>(count); + for (int k = 0; k < count; k++) + sections.add(Pair.create(in.readLong(), in.readLong())); + CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); + int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); + + return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); + } + + public long serializedSize(CassandraStreamHeader header, int version) + { + long size = 0; + size += TypeSizes.sizeof(header.version.toString()); + size += TypeSizes.sizeof(header.format.name); + size += TypeSizes.sizeof(header.estimatedKeys); + + size += TypeSizes.sizeof(header.sections.size()); + for (Pair<Long, Long> section : header.sections) + { + size += TypeSizes.sizeof(section.left); + size += TypeSizes.sizeof(section.right); + } + size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); + size += TypeSizes.sizeof(header.sstableLevel); + + size += SerializationHeader.serializer.serializedSize(header.version, header.header); + + return size; + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java new file mode 100644 index 0000000..466fa36 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.IncomingStream; +import org.apache.cassandra.streaming.OutgoingStream; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.TableStreamManager; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.Refs; + +/** + * Implements the streaming interface for the native Cassandra storage engine. + * + * Handles the streaming of one or more sections of one or more sstables to and from a specific + * remote node. The sending side performs a block-level transfer of the source stream, while the receiver + * must deserialize that data stream into partitions and rows, and then write those out as an sstable.
+ */ +public class CassandraStreamManager implements TableStreamManager +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamManager.class); + + private final ColumnFamilyStore cfs; + + public CassandraStreamManager(ColumnFamilyStore cfs) + { + this.cfs = cfs; + } + + @Override + public IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header) + { + return new CassandraIncomingFile(cfs, session, header); + } + + @Override + public StreamReceiver createStreamReceiver(StreamSession session, int totalStreams) + { + return new CassandraStreamReceiver(cfs, session, totalStreams); + } + + private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind kind) + { + switch (kind) + { + case ALL: + return Predicates.alwaysTrue(); + case UNREPAIRED: + return Predicates.not(SSTableReader::isRepaired); + case REPAIRED: + return SSTableReader::isRepaired; + default: + throw new IllegalArgumentException("Unsupported kind: " + kind); + } + } + + @Override + public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind) + { + Refs<SSTableReader> refs = new Refs<>(); + try + { + final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size()); + for (Range<Token> range : ranges) + keyRanges.add(Range.makeRowRange(range)); + refs.addAll(cfs.selectAndReference(view -> { + Set<SSTableReader> sstables = Sets.newHashSet(); + SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL)); + Predicate<SSTableReader> predicate; + if (previewKind.isPreview()) + { + predicate = getPreviewPredicate(previewKind); + } + else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR) + { + predicate = Predicates.alwaysTrue(); + } + else + { + predicate = s -> s.isPendingRepair() && s.getSSTableMetadata().pendingRepair.equals(pendingRepair); + } + + for (Range<PartitionPosition> keyRange : keyRanges) + { + // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). + // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above). + // And that latter method uses the Token.maxKeyBound() method to create the range, which returns a "fake" key that + // sorts after all keys having the token. That "fake" key cannot however be equal to any real key, so that even + // including keyRange.left will still exclude any key having the token of the original token range, and so we're + // still actually selecting what we wanted.
+ for (SSTableReader sstable : Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree), predicate)) + { + sstables.add(sstable); + } + } + + if (logger.isDebugEnabled()) + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL))); + return sstables; + }).refs); + + + List<OutgoingStream> streams = new ArrayList<>(refs.size()); + for (SSTableReader sstable: refs) + { + Ref<SSTableReader> ref = refs.get(sstable); + List<Pair<Long, Long>> sections = sstable.getPositionsForRanges(ranges); + if (sections.isEmpty()) + { + ref.release(); + continue; + } + streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, sstable.estimatedKeysForRanges(ranges))); + } + + return streams; + } + catch (Throwable t) + { + refs.release(); + throw t; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java new file mode 100644 index 0000000..26ef5ed --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.streaming; + +import java.io.*; +import java.util.Collection; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.UnmodifiableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.util.TrackedDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.compress.StreamCompressionInputStream; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.streaming.messages.StreamMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +/** + * CassandraStreamReader reads from stream and writes to SSTable. + */ +public class CassandraStreamReader +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class); + protected final TableId tableId; + protected final long estimatedKeys; + protected final Collection<Pair<Long, Long>> sections; + protected final StreamSession session; + protected final Version inputVersion; + protected final long repairedAt; + protected final UUID pendingRepair; + protected final SSTableFormat.Type format; + protected final int sstableLevel; + protected final SerializationHeader.Component header; + protected final int fileSeqNum; + + public CassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) + { + if (session.getPendingRepair() != null) + { + // we should only ever be streaming pending repair + // sstables if the session has a pending repair id + assert session.getPendingRepair().equals(header.pendingRepair); + } + this.session = session; + this.tableId = header.tableId; + this.estimatedKeys = streamHeader.estimatedKeys; + this.sections = streamHeader.sections; + this.inputVersion = streamHeader.version; + this.repairedAt = header.repairedAt; + this.pendingRepair = header.pendingRepair; + this.format = streamHeader.format; + this.sstableLevel = streamHeader.sstableLevel; + this.header = streamHeader.header; + this.fileSeqNum = header.sequenceNumber; + } + + /** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
+ */ + @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException + { + long totalSize = totalSize(); + + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) + { + // schema was dropped during streaming + throw new IOException("CF " + tableId + " was dropped during streaming"); + } + + logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.", + session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), + cfs.getTableName(), pendingRepair); + + StreamDeserializer deserializer = null; + SSTableMultiWriter writer = null; + try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION)) + { + TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); + deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); + writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + while (in.getBytesRead() < totalSize) + { + writePartition(deserializer, writer); + // TODO move this to BytesReadTracker + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + } + logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", + session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); + return writer; + } + catch (Throwable e) + { + Object partitionKey = deserializer != null ? deserializer.partitionKey() : ""; + logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", + session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e); + if (writer != null) + { + writer.abort(e); + } + throw Throwables.propagate(e); + } + } + + protected SerializationHeader getHeader(TableMetadata metadata) + { + return header != null? 
header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader + } + + protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); + + StreamReceiver streamReceiver = session.getAggregator(tableId); + Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver); + LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction(); + + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); + return writer; + } + + protected long totalSize() + { + long size = 0; + for (Pair<Long, Long> section : sections) + size += section.right - section.left; + return size; + } + + protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException + { + writer.append(deserializer.newPartition()); + deserializer.checkForExceptions(); + } + + public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator + { + private final TableMetadata metadata; + private final DataInputPlus in; + private final SerializationHeader header; + private final SerializationHelper helper; + + private DecoratedKey key; + private DeletionTime partitionLevelDeletion; + private SSTableSimpleIterator iterator; + private Row staticRow; + private IOException exception; + + public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException + { + this.metadata = metadata; + this.in = in; + this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); + this.header = header; + } + + public StreamDeserializer newPartition() throws IOException + { + key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); + partitionLevelDeletion = DeletionTime.serializer.deserialize(in); + iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion); + staticRow = iterator.readStaticRow(); + return this; + } + + public TableMetadata metadata() + { + return metadata; + } + + public RegularAndStaticColumns columns() + { + // We don't know which columns we'll get so assume it can be all of them + return metadata.regularAndStaticColumns(); + } + + public boolean isReverseOrder() + { + return false; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public Row staticRow() + { + return staticRow; + } + + public EncodingStats stats() + { + return header.stats(); + } + + public boolean hasNext() + { + try + { + return iterator.hasNext(); + } + catch (IOError e) + { + if (e.getCause() != null && e.getCause() instanceof IOException) + { + exception = (IOException)e.getCause(); + return false; + } + throw e; + } + } + + public Unfiltered next() + { + // Note that in practice we know that IOException will be thrown by hasNext(), because that's + // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily + // to what 
we do in hasNext) + Unfiltered unfiltered = iterator.next(); + return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW + ? maybeMarkLocalToBeCleared((Row) unfiltered) + : unfiltered; + } + + private Row maybeMarkLocalToBeCleared(Row row) + { + return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row; + } + + public void checkForExceptions() throws IOException + { + if (exception != null) + throw exception; + } + + public void close() + { + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java new file mode 100644 index 0000000..6a57e49 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.view.View; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.IncomingStream; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Refs; + +public class CassandraStreamReceiver implements StreamReceiver +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReceiver.class); + + private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100); + + private final ColumnFamilyStore cfs; + private final StreamSession session; + + // Transaction tracking new files received + private final LifecycleTransaction txn; + + // holds references to SSTables received + protected Collection<SSTableReader> sstables; + + private final boolean requiresWritePath; + + + public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession session, int totalFiles) + { + this.cfs = cfs; + this.session = session; + // this is an "offline" transaction, as we currently manually expose the sstables once done; + // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes + this.txn = LifecycleTransaction.offline(OperationType.STREAM); + this.sstables = new ArrayList<>(totalFiles); + this.requiresWritePath = requiresWritePath(cfs); + } + + public LifecycleTransaction getTransaction() + { + return txn; + } + + public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver) + { + Preconditions.checkArgument(receiver instanceof CassandraStreamReceiver); + return (CassandraStreamReceiver) receiver; + } + + private static CassandraIncomingFile getFile(IncomingStream stream) + { + Preconditions.checkArgument(stream instanceof CassandraIncomingFile, "Wrong stream type: {}", stream); + return (CassandraIncomingFile) stream; + } + + @Override + public void received(IncomingStream stream) + { + CassandraIncomingFile file = getFile(stream); + + Collection<SSTableReader> finished = null; + SSTableMultiWriter sstable = file.getSSTable(); + try + { + finished = sstable.finish(true); + } + catch (Throwable t) + { + Throwables.maybeFail(sstable.abort(t)); + } + txn.update(finished, false); + sstables.addAll(finished); + } + + @Override + public void discardStream(IncomingStream stream) + { + CassandraIncomingFile file = 
getFile(stream); + Throwables.maybeFail(file.getSSTable().abort(null)); + } + + @Override + public void abort() + { + sstables.clear(); + txn.abort(); + } + + private boolean hasViews(ColumnFamilyStore cfs) + { + return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName())); + } + + private boolean hasCDC(ColumnFamilyStore cfs) + { + return cfs.metadata().params.cdc; + } + + /* + * We have a special path for views and for CDC. + * + * For views, since the view requires cleaning up any pre-existing state, we must put all partitions + * through the same write path as normal mutations. This also ensures any 2is are also updated. + * + * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they + * can be archived by the CDC process on discard. + */ + private boolean requiresWritePath(ColumnFamilyStore cfs) { + return hasCDC(cfs) || (session.streamOperation().requiresViewBuild() && hasViews(cfs)); + } + + private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) { + boolean hasCdc = hasCDC(cfs); + ColumnFilter filter = ColumnFilter.all(cfs.metadata()); + for (SSTableReader reader : readers) + { + Keyspace ks = Keyspace.open(reader.getKeyspaceName()); + // When doing mutation-based repair we split each partition into smaller batches + // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH)) + { + while (throttledPartitions.hasNext()) + { + // MV *can* be applied unsafe if there's no CDC on the CFS as we flush + // before transaction is done. + // + // If the CFS has CDC, however, these updates need to be written to the CommitLog + // so they get archived into the cdc_raw folder + ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)), + hasCdc, + true, + false); + } + } + } + } + + private synchronized void finishTransaction() + { + txn.finish(); + } + + @Override + public void finished() + { + boolean requiresWritePath = requiresWritePath(cfs); + Collection<SSTableReader> readers = sstables; + + try (Refs<SSTableReader> refs = Refs.ref(readers)) + { + if (requiresWritePath) + { + sendThroughWritePath(cfs, readers); + } + else + { + finishTransaction(); + + // add sstables (this will build secondary indexes too, see CASSANDRA-10130) + logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers); + cfs.addSSTables(readers); + + //invalidate row and counter cache + if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter()) + { + List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); + readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()))); + Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate); + + if (cfs.isRowCacheEnabled()) + { + int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); + if (invalidatedKeys > 0) + logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + + "receive task completed.", session.planId(), invalidatedKeys, + cfs.keyspace.getName(), cfs.getTableName()); + } + + if (cfs.metadata().isCounter()) + { + int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); + if (invalidatedKeys > 0) + 
logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + + "receive task completed.", session.planId(), invalidatedKeys, + cfs.keyspace.getName(), cfs.getTableName()); + } + } + } + } + } + + @Override + public void cleanup() + { + // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete + // the streamed sstables. + if (requiresWritePath) + { + cfs.forceBlockingFlush(); + abort(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java new file mode 100644 index 0000000..b86f99a --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.DataIntegrityMetadata; +import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +/** + * CassandraStreamWriter writes given section of the SSTable to given channel. 
+ */ +public class CassandraStreamWriter +{ + private static final int DEFAULT_CHUNK_SIZE = 64 * 1024; + + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamWriter.class); + + protected final SSTableReader sstable; + protected final Collection<Pair<Long, Long>> sections; + protected final StreamRateLimiter limiter; + protected final StreamSession session; + + public CassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session) + { + this.session = session; + this.sstable = sstable; + this.sections = sections; + this.limiter = StreamManager.getRateLimiter(session.peer); + } + + /** + * Stream file of specified sections to given channel. + * + * CassandraStreamWriter uses LZF compression on wire to decrease size to transfer. + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ + public void write(DataOutputStreamPlus output) throws IOException + { + long totalSize = totalSize(); + logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + + try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy(); + ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists() + ? DataIntegrityMetadata.checksumValidator(sstable.descriptor) + : null) + { + int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE: validator.chunkSize; + + // setting up data compression stream + long progress = 0L; + + try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter)) + { + // stream each of the required sections of the file + for (Pair<Long, Long> section : sections) + { + long start = validator == null ? section.left : validator.chunkStart(section.left); + // if the transfer does not start on the valididator's chunk boundary, this is the number of bytes to offset by + int transferOffset = (int) (section.left - start); + if (validator != null) + validator.seek(start); + + // length of the section to read + long length = section.right - start; + // tracks write progress + long bytesRead = 0; + while (bytesRead < length) + { + int toTransfer = (int) Math.min(bufferSize, length - bytesRead); + long lastBytesRead = write(proxy, validator, compressedOutput, start, transferOffset, toTransfer, bufferSize); + start += lastBytesRead; + bytesRead += lastBytesRead; + progress += (lastBytesRead - transferOffset); + session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); + transferOffset = 0; + } + + // make sure that current section is sent + output.flush(); + } + logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", + session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); + } + } + } + + protected long totalSize() + { + long size = 0; + for (Pair<Long, Long> section : sections) + size += section.right - section.left; + return size; + } + + /** + * Sequentially read bytes from the file and write them to the output stream + * + * @param proxy The file reader to read from + * @param validator validator to verify data integrity + * @param start The readd offset from the beginning of the {@code proxy} file. + * @param transferOffset number of bytes to skip transfer, but include for validation. 
+ * @param toTransfer The number of bytes to be transferred. + * + * @return Number of bytes transferred. + * + * @throws java.io.IOException on any I/O error + */ + protected long write(ChannelProxy proxy, ChecksumValidator validator, DataOutputStreamPlus output, long start, int transferOffset, int toTransfer, int bufferSize) throws IOException + { + // the count of bytes to read off disk + int minReadable = (int) Math.min(bufferSize, proxy.size() - start); + + // this buffer will hold the data from disk. as it will be compressed on the fly by + // ByteBufCompressionDataOutputStreamPlus.write(ByteBuffer), we can release this buffer as soon as we can. + ByteBuffer buffer = ByteBuffer.allocateDirect(minReadable); + try + { + int readCount = proxy.read(buffer, start); + assert readCount == minReadable : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", readCount, minReadable); + buffer.flip(); + + if (validator != null) + { + validator.validate(buffer); + buffer.flip(); + } + + buffer.position(transferOffset); + buffer.limit(transferOffset + (toTransfer - transferOffset)); + output.write(buffer); + } + finally + { + FileUtils.clean(buffer); + } + + return toTransfer; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java new file mode 100644 index 0000000..343d7ed --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
+/**
+ * CassandraStreamReader that reads a compressed SSTable from the stream
+ */
+public class CompressedCassandraStreamReader extends CassandraStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamReader.class);
+
+    protected final CompressionInfo compressionInfo;
+
+    public CompressedCassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        super(header, streamHeader, session);
+        this.compressionInfo = streamHeader.compressionInfo;
+    }
+
+    /**
+     * @return SSTable transferred
+     * @throws java.io.IOException if reading the remote sstable fails. A RuntimeException is thrown if the local write fails.
+     */
+    @Override
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    {
+        long totalSize = totalSize();
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("CF " + tableId + " was dropped during streaming");
+        }
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
+                     cfs.getTableName());
+
+        StreamDeserializer deserializer = null;
+        SSTableMultiWriter writer = null;
+        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
+        {
+            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
+            String filename = writer.getFilename();
+            int sectionIdx = 0;
+            for (Pair<Long, Long> section : sections)
+            {
+                assert cis.getTotalCompressedBytesRead() <= totalSize;
+                long sectionLength = section.right - section.left;
+
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
+                // skip to beginning of section inside chunk
+                cis.position(section.left);
+                in.reset(0);
+
+                while (in.getBytesRead() < sectionLength)
+                {
+                    writePartition(deserializer, writer);
+                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
+                    session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {}, readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
+            logger.warn("[Stream #{}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
+            if (writer != null)
+            {
+                writer.abort(e);
+            }
+            if (extractIOExceptionCause(e).isPresent())
+                throw e;
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate the total length of the chunks to transfer
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
new file mode 100644
index 0000000..3fcbc38
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * CassandraStreamWriter for compressed SSTables.
+ */
+public class CompressedCassandraStreamWriter extends CassandraStreamWriter
+{
+    private static final int CHUNK_SIZE = 1 << 16;
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamWriter.class);
+
+    private final CompressionInfo compressionInfo;
+
+    public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
+    {
+        super(sstable, sections, session);
+        this.compressionInfo = compressionInfo;
+    }
+
+    @Override
+    public void write(DataOutputStreamPlus out) throws IOException
+    {
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
+        long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
+        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+        {
+            long progress = 0L;
+            // calculate the chunks to transfer; we want to send contiguous chunks together as a single section.
+            List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
+            // stream each of the required sections of the file
+            for (final Pair<Long, Long> section : sections)
+            {
+                // length of the section to stream
+                long length = section.right - section.left;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
+                // tracks write progress
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                {
+                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    limiter.acquire(toTransfer);
+
+                    ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
+                    long lastWrite;
+                    try
+                    {
+                        lastWrite = fc.read(outBuffer, section.left + bytesTransferred);
+                        assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
+                        outBuffer.flip();
+                        output.writeToChannel(outBuffer);
+                    }
+                    catch (IOException e)
+                    {
+                        FileUtils.clean(outBuffer);
+                        throw e;
+                    }
+
+                    bytesTransferred += lastWrite;
+                    progress += lastWrite;
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate the total length of the chunks to transfer
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+
+    // chunks are assumed to be sorted by offset
+    private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
+    {
+        List<Pair<Long, Long>> transferSections = new ArrayList<>();
+        Pair<Long, Long> lastSection = null;
+        for (CompressionMetadata.Chunk chunk : chunks)
+        {
+            if (lastSection != null)
+            {
+                if (chunk.offset == lastSection.right)
+                {
+                    // extend previous section to end of this chunk
+                    lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+                }
+                else
+                {
+                    transferSections.add(lastSection);
+                    lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+                }
+            }
+            else
+            {
+                lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+            }
+        }
+        if (lastSection != null)
+            transferSections.add(lastSection);
+        return transferSections;
+    }
+}
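
Two details of the transfer logic above are easier to follow in isolation. First, the uncompressed path in CassandraStreamWriter.write(): when a ChecksumValidator is present, each requested section is rewound to the start of its containing checksum chunk so that whole chunks can be validated, and transferOffset counts the rewound bytes that are read and validated but not re-sent. The sketch below is not part of the patch; the chunkAlignedStart helper is an illustrative stand-in for ChecksumValidator.chunkStart (assumed to round a position down to its chunk boundary), and the 64 KiB chunk size mirrors DEFAULT_CHUNK_SIZE.

public class ChunkAlignmentSketch
{
    // Stand-in for ChecksumValidator.chunkStart(): round a position down to its chunk boundary.
    static long chunkAlignedStart(long position, int chunkSize)
    {
        return (position / chunkSize) * chunkSize;
    }

    public static void main(String[] args)
    {
        int chunkSize = 64 * 1024;      // assumed checksum chunk size
        long sectionLeft = 200_000;     // requested start of the section to stream
        long sectionRight = 400_000;    // requested end of the section to stream

        long start = chunkAlignedStart(sectionLeft, chunkSize);   // 196608, rewound to the chunk boundary
        int transferOffset = (int) (sectionLeft - start);         // 3392 bytes are read and validated but skipped on the wire
        long length = sectionRight - start;                       // 203392 bytes to read from disk in total

        System.out.printf("read from %d, skip the first %d bytes when writing, %d bytes to read%n",
                          start, transferOffset, length);
    }
}

The offset only applies to the first buffer of a section, which is what the transferOffset = 0 reset inside the streaming loop expresses.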
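
Second, the getTransferSections() merging in CompressedCassandraStreamWriter: compressed chunks are laid out back to back in the data file, each followed by a 4-byte CRC, so chunks that are adjacent on disk can be coalesced into one transfer section. This is a minimal sketch, not the patch's code; the nested Chunk class and the long[] section representation are simplified stand-ins for CompressionMetadata.Chunk and Pair<Long, Long>.

import java.util.ArrayList;
import java.util.List;

public class TransferSectionsSketch
{
    // Simplified stand-in for CompressionMetadata.Chunk: offset and length of a compressed chunk on disk.
    static final class Chunk
    {
        final long offset;
        final int length;
        Chunk(long offset, int length) { this.offset = offset; this.length = length; }
    }

    private static final int CRC_LENGTH = 4; // each chunk is followed by a 4-byte CRC on disk

    // Coalesce chunks that are adjacent on disk into [start, end) sections; chunks are assumed sorted by offset.
    static List<long[]> getTransferSections(Chunk[] chunks)
    {
        List<long[]> sections = new ArrayList<>();
        long[] last = null;
        for (Chunk chunk : chunks)
        {
            long end = chunk.offset + chunk.length + CRC_LENGTH;
            if (last != null && chunk.offset == last[1])
                last[1] = end;                                         // contiguous: extend the previous section
            else
                sections.add(last = new long[]{ chunk.offset, end });  // first chunk, or a gap: start a new section
        }
        return sections;
    }

    public static void main(String[] args)
    {
        // the first two chunks are contiguous (100 bytes of data plus a 4-byte CRC each); the third is not
        Chunk[] chunks = { new Chunk(0, 100), new Chunk(104, 100), new Chunk(300, 50) };
        for (long[] section : getTransferSections(chunks))
            System.out.println(section[0] + " .. " + section[1]);      // prints "0 .. 208" then "300 .. 354"
    }
}

Coalescing keeps the section count small when most of a file is transferred, and the section bounds line up with the totalSize() accounting above, where each chunk contributes its length plus 4 CRC bytes.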