[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555691#comment-16555691 ]
ASF GitHub Bot commented on CASSANDRA-14556:
--------------------------------------------
GitHub user iamaleksey commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/239#discussion_r205108073
--- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
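+/**
+ * An SSTableMultiWriter that streams whole sstable components to disk as raw
+ * blocks, one SequentialWriter per component, rather than appending
+ * deserialized partitions.
+ */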
+public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter
+{
+    private final TableMetadataRef metadata;
+    private final LifecycleTransaction txn;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+    private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                              .trickleFsync(false)
+                                                                              .bufferSize(2 * 1024 * 1024)
+                                                                              .bufferType(BufferType.OFF_HEAP)
+                                                                              .build();
+    public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                      Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                      Component.DIGEST, Component.CRC);
+
+    public BigTableBlockWriter(Descriptor descriptor,
+                               TableMetadataRef metadata,
+                               LifecycleTransaction txn,
+                               final Set<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata,
+              DatabaseDescriptor.getDiskOptimizationStrategy());
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.txn = txn;
+        this.componentWriters = new HashMap<>(components.size());
+
+        assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s",
+                                                                           Sets.difference(components, supportedComponents));
+
+        for (Component c : components)
+            componentWriters.put(c.type, makeWriter(descriptor, c, writerOption));
+    }
+
+    private static SequentialWriter makeWriter(Descriptor descriptor, Component component, SequentialWriterOption writerOption)
+    {
+        return new SequentialWriter(new File(descriptor.filenameFor(component)), writerOption, false);
+    }
+
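+    // Copies exactly 'size' bytes from the incoming stream into the component
+    // file in 1 MiB chunks, syncing the file once the copy completes.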
+    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    {
+        final int BUFFER_SIZE = 1 * 1024 * 1024;
+        long bytesRead = 0;
+        byte[] buff = new byte[BUFFER_SIZE];
+        try
+        {
+            while (bytesRead < size)
+            {
+                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+                in.readFully(buff, 0, toRead);
+                out.write(buff, 0, toRead);
+                bytesRead += toRead;
+            }
+            out.sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
+    }
+
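+    // Partition-level appends don't apply here; incoming data arrives as whole
+    // component blocks, so this writer never builds partitions.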
+    @Override
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        throw new UnsupportedOperationException("Operation not supported by BigTableBlockWriter");
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return finish(openResult);
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        setOpenResult(openResult);
+        return finished();
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        if (finalReader == null)
+            finalReader = SSTableReader.open(descriptor,
+                                             components,
+                                             metadata);
+
+        return ImmutableList.of(finalReader);
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        return null;
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return 0;
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return metadata.id;
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.commit(accumulate);
--- End diff --
We shouldn't be ignoring the result of writer.commit(), and should return
it instead of accumulate.
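
In Cassandra's Transactional pattern, commit(Throwable) threads an error accumulator through each resource and hands the (possibly updated) accumulator back to the caller. A minimal sketch of the fix the comment asks for, based only on the snippet above (the reviewer's suggestion applied, not necessarily the patch as eventually committed):

    @Override
    public Throwable commit(Throwable accumulate)
    {
        for (SequentialWriter writer : componentWriters.values())
            accumulate = writer.commit(accumulate);
        return accumulate;
    }

Reassigning accumulate on each iteration means a throwable raised by one writer's commit is carried into the next call and ultimately returned, instead of being silently dropped.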
> Optimize streaming path in Cassandra
> ------------------------------------
>
> Key: CASSANDRA-14556
> URL: https://issues.apache.org/jira/browse/CASSANDRA-14556
> Project: Cassandra
> Issue Type: Improvement
> Components: Streaming and Messaging
> Reporter: Dinesh Joshi
> Assignee: Dinesh Joshi
> Priority: Major
> Labels: Performance
> Fix For: 4.x
>
>
> During streaming, Cassandra reifies sstables into objects. This creates
> unnecessary garbage and slows down the whole streaming process, since some
> sstables can be transferred as whole files rather than as individual
> partitions. The objective of this ticket is to detect when a whole sstable
> can be transferred and to skip the object reification. We can also use a
> zero-copy path to avoid bringing data into user space on both the sending
> and receiving sides.
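
On the sending side, the zero-copy path described above typically means FileChannel.transferTo, which lets the kernel move file bytes directly to the socket (sendfile on Linux) without staging them in user space. A minimal illustrative sketch, not code from the patch (the class and method names here are hypothetical):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public final class ZeroCopySend
    {
        // Streams an entire sstable component file to the peer without copying
        // its bytes through user space. transferTo may transfer fewer bytes
        // than requested, so loop until the whole file has been sent.
        public static void send(String path, WritableByteChannel out) throws IOException
        {
            try (FileChannel file = FileChannel.open(Paths.get(path), StandardOpenOption.READ))
            {
                long position = 0;
                long size = file.size();
                while (position < size)
                    position += file.transferTo(position, size - position, out);
            }
        }
    }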