Repository: cassandra
Updated Branches:
  refs/heads/trunk 594183b03 -> 1e92ce43a
Faster Streaming

Patch by tjake; reviewed by Stefania Alborghetti for CASSANDRA-9766


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e92ce43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e92ce43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e92ce43

Branch: refs/heads/trunk
Commit: 1e92ce43a5a730f81d3f6cfd72e7f4b126db788a
Parents: 594183b
Author: T Jake Luciani <[email protected]>
Authored: Sat Apr 9 10:03:32 2016 -0400
Committer: T Jake Luciani <[email protected]>
Committed: Wed May 4 17:04:12 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../concurrent/NamedThreadFactory.java          |   4 +-
 .../apache/cassandra/concurrent/SEPWorker.java  |   3 +-
 .../org/apache/cassandra/db/ColumnIndex.java    |  69 +++++---
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../db/commitlog/FileDirectSegment.java         |   3 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  30 ++--
 .../cassandra/db/rows/ComplexColumnData.java    |  10 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |  40 ++++-
 .../io/sstable/format/big/BigTableWriter.java   |  50 +++---
 .../cassandra/io/util/DataOutputBuffer.java     |  38 ++++-
 .../io/util/DataOutputBufferFixed.java          |   4 +-
 .../cassandra/io/util/DataOutputStreamPlus.java |   3 +-
 .../cassandra/io/util/SafeMemoryWriter.java     |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |   4 +-
 .../compress/CompressedInputStream.java         |  59 +++++--
 .../compress/CompressedStreamReader.java        |  11 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |  10 +-
 .../org/apache/cassandra/utils/BloomFilter.java |   7 +-
 .../apache/cassandra/utils/FilterFactory.java   |   6 +-
 .../cassandra/utils/StreamingHistogram.java     |  84 +++++----
 .../org/apache/cassandra/utils/btree/BTree.java |  67 ++++----
 .../apache/cassandra/utils/btree/BTreeSet.java  |   3 -
 .../cassandra/utils/btree/TreeBuilder.java      |  30 +++-
 .../concurrent/WrappedSharedCloseable.java      |   4 +-
 .../cassandra/utils/memory/BufferPool.java      |  16 +-
 .../apache/cassandra/utils/vint/VIntCoding.java |   3 +-
 .../cassandra/streaming/LongStreamingTest.java  | 171 +++++++++++++++++++
 .../apache/cassandra/db/RowIndexEntryTest.java  |   9 +-
 .../org/apache/cassandra/utils/BTreeTest.java   |  20 ++-
 .../cassandra/utils/StreamingHistogramTest.java |  48 +++++-
 31 files changed, 597 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fe57b2..c2165db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.8
+ * Faster streaming (CASSANDRA-9766)
 3.6
  * Enhanced Compaction Logging (CASSANDRA-10805)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..85edf74 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import
io.netty.util.concurrent.FastThreadLocalThread; + /** * This class is an implementation of the <i>ThreadFactory</i> interface. This * is useful to give Java threads meaningful names which is useful when using @@ -55,7 +57,7 @@ public class NamedThreadFactory implements ThreadFactory public Thread newThread(Runnable runnable) { String name = id + ":" + n.getAndIncrement(); - Thread thread = new Thread(threadGroup, runnable, name); + Thread thread = new FastThreadLocalThread(threadGroup, runnable, name); thread.setPriority(priority); thread.setDaemon(true); if (contextClassLoader != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/concurrent/SEPWorker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 3b3e7ad..d7c21bc 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.LockSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.utils.JVMStabilityInspector; final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable @@ -45,7 +46,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl { this.pool = pool; this.workerId = workerId; - thread = new Thread(this, pool.poolName + "-Worker-" + workerId); + thread = new FastThreadLocalThread(this, pool.poolName + "-Worker-" + workerId); thread.setDaemon(true); set(initialState); thread.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 4dcceff..2e7a2ee 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -41,13 +41,12 @@ import org.apache.cassandra.utils.ByteBufferUtil; */ public class ColumnIndex { - // used, if the row-index-entry reaches config column_index_cache_size_in_kb private DataOutputBuffer buffer; // used to track the size of the serialized size of row-index-entry (unused for buffer) private int indexSamplesSerializedSize; // used, until the row-index-entry reaches config column_index_cache_size_in_kb - public List<IndexInfo> indexSamples = new ArrayList<>(); + private final List<IndexInfo> indexSamples = new ArrayList<>(); public int columnIndexCount; private int[] indexOffsets; @@ -55,11 +54,10 @@ public class ColumnIndex private final SerializationHeader header; private final int version; private final SequentialWriter writer; - private final long initialPosition; - private final ISerializer<IndexInfo> idxSerializer; - public long headerLength = -1; - - private long startPosition = -1; + private long initialPosition; + private final ISerializer<IndexInfo> idxSerializer; + public long headerLength; + private long startPosition; private int written; private long previousRowStart; @@ -72,17 +70,32 @@ public class ColumnIndex private final Collection<SSTableFlushObserver> observers; public ColumnIndex(SerializationHeader header, - SequentialWriter writer, - Version version, - Collection<SSTableFlushObserver> observers, - 
ISerializer<IndexInfo> indexInfoSerializer)
+                       SequentialWriter writer,
+                       Version version,
+                       Collection<SSTableFlushObserver> observers,
+                       ISerializer<IndexInfo> indexInfoSerializer)
     {
         this.header = header;
-        this.idxSerializer = indexInfoSerializer;
         this.writer = writer;
         this.version = version.correspondingMessagingVersion();
         this.observers = observers;
+        this.idxSerializer = indexInfoSerializer;
+    }
+
+    public void reset()
+    {
         this.initialPosition = writer.position();
+        this.headerLength = -1;
+        this.startPosition = -1;
+        this.previousRowStart = 0;
+        this.columnIndexCount = 0;
+        this.written = 0;
+        this.indexSamplesSerializedSize = 0;
+        this.indexSamples.clear();
+        this.firstClustering = null;
+        this.lastClustering = null;
+        this.openMarker = null;
+        this.buffer = null;
     }
 
     public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
@@ -93,7 +106,7 @@ public class ColumnIndex
         while (iterator.hasNext())
             add(iterator.next());
 
-        close();
+        finish();
     }
 
     private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
@@ -120,6 +133,16 @@ public class ColumnIndex
         return buffer != null ? buffer.buffer() : null;
     }
 
+    public List<IndexInfo> indexSamples()
+    {
+        if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) <= DatabaseDescriptor.getColumnIndexCacheSize())
+        {
+            return indexSamples;
+        }
+
+        return null;
+    }
+
     public int[] offsets()
     {
         return indexOffsets != null
@@ -136,7 +159,7 @@ public class ColumnIndex
                             openMarker);
 
         // indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry.
-        // For shallow ones, we need it to serialize the offsets in close().
+        // For shallow ones, we need it to serialize the offsets in finish().
         // For non-shallow ones, the offsets are passed into IndexedEntry, so we don't have to
         // calculate the offsets again.
@@ -149,10 +172,19 @@
     {
         if (columnIndexCount >= indexOffsets.length)
             indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);
-            indexOffsets[columnIndexCount] =
+
+            //the 0th element is always 0
+            if (columnIndexCount == 0)
+            {
+                indexOffsets[columnIndexCount] = 0;
+            }
+            else
+            {
+                indexOffsets[columnIndexCount] =
                 buffer != null
-                ?
Ints.checkedCast(buffer.position()) + : indexSamplesSerializedSize; + } } columnIndexCount++; @@ -168,7 +200,6 @@ public class ColumnIndex { idxSerializer.serialize(indexSample, buffer); } - indexSamples = null; } else { @@ -216,7 +247,7 @@ public class ColumnIndex addIndexBlock(); } - private void close() throws IOException + private void finish() throws IOException { UnfilteredSerializer.serializer.writeEndOfPartition(writer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index 7f6b462..6fef70f 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -353,7 +353,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator public void reset() { built = null; - rowBuilder.reuse(); + rowBuilder = BTree.builder(metadata.comparator); deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java index ec4aa91..50f9efd 100644 --- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java @@ -23,6 +23,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.BufferType; @@ -34,7 +35,7 @@ import org.apache.cassandra.io.util.FileUtils; */ public abstract class FileDirectSegment extends CommitLogSegment { - protected static final ThreadLocal<ByteBuffer> reusableBufferHolder = new ThreadLocal<ByteBuffer>() + protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index 47cfd58..63aa157 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -330,9 +330,10 @@ public class BTreeRow extends AbstractRow if (cd.column().isSimple()) return false; - if (!((ComplexColumnData)cd).complexDeletion().isLive()) + if (!((ComplexColumnData) cd).complexDeletion().isLive()) return true; } + return false; } @@ -620,7 +621,7 @@ public class BTreeRow extends AbstractRow protected Deletion deletion = Deletion.LIVE; private final boolean isSorted; - private final BTree.Builder<Cell> cells; + private BTree.Builder<Cell> cells_; private final CellResolver resolver; private boolean hasComplex 
= false; @@ -633,10 +634,19 @@ public class BTreeRow extends AbstractRow protected Builder(boolean isSorted, int nowInSecs) { - this.cells = BTree.builder(ColumnData.comparator); + cells_ = null; resolver = new CellResolver(nowInSecs); this.isSorted = isSorted; - this.cells.auto(false); + } + + private BTree.Builder<Cell> getCells() + { + if (cells_ == null) + { + cells_ = BTree.builder(ColumnData.comparator); + cells_.auto(false); + } + return cells_; } public boolean isSorted() @@ -660,7 +670,7 @@ public class BTreeRow extends AbstractRow this.clustering = null; this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; this.deletion = Deletion.LIVE; - this.cells.reuse(); + this.cells_ = null; } public void addPrimaryKeyLivenessInfo(LivenessInfo info) @@ -676,25 +686,25 @@ public class BTreeRow extends AbstractRow public void addCell(Cell cell) { assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering; - cells.add(cell); + getCells().add(cell); hasComplex |= cell.column.isComplex(); } public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) { - cells.add(new ComplexColumnDeletion(column, complexDeletion)); + getCells().add(new ComplexColumnDeletion(column, complexDeletion)); hasComplex = true; } public Row build() { if (!isSorted) - cells.sort(); + getCells().sort(); // we can avoid resolving if we're sorted and have no complex values // (because we'll only have unique simple cells, which are already in their final condition) if (!isSorted | hasComplex) - cells.resolve(resolver); - Object[] btree = cells.build(); + getCells().resolve(resolver); + Object[] btree = getCells().build(); if (deletion.isShadowedBy(primaryKeyLivenessInfo)) deletion = Deletion.LIVE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index f4678b7..154fc77 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -19,23 +19,22 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.util.*; +import java.util.Iterator; +import java.util.Objects; import java.util.function.BiFunction; import com.google.common.base.Function; -import com.google.common.collect.Iterables; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.DeletionPurger; +import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.ByteType; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.btree.BTree; -import org.apache.cassandra.utils.btree.UpdateFunction; /** * The data for a complex column, that is it's cells and potential complex @@ -240,8 +239,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> { this.column = column; this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called - if (builder == null) builder = 
BTree.builder(column.cellComparator()); - else builder.reuse(column.cellComparator()); + this.builder = BTree.builder(column.cellComparator()); } public void addComplexDeletion(DeletionTime complexDeletion) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 8157bbb..bcc65aa 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -21,9 +21,11 @@ import java.io.IOException; import com.google.common.collect.Collections2; +import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -178,11 +180,37 @@ public class UnfilteredSerializer if (header.isForSSTable()) { - out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version)); - // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler). - // This is currently not used however and using it is tbd. - out.writeUnsignedVInt(previousUnfilteredSize); + DataOutputBuffer dob = DataOutputBuffer.RECYCLER.get(); + try + { + serializeRowBody(row, flags, header, dob); + + out.writeUnsignedVInt(dob.position() + TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize)); + // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler). + // This is currently not used however and using it is tbd. 
+ out.writeUnsignedVInt(previousUnfilteredSize); + out.write(dob.buffer()); + } + finally + { + dob.recycle(); + } } + else + { + serializeRowBody(row, flags, header, out); + } + } + + @Inline + private void serializeRowBody(Row row, int flags, SerializationHeader header, DataOutputPlus out) + throws IOException + { + boolean isStatic = row.isStatic(); + + Columns headerColumns = header.columns(isStatic); + LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); + Row.Deletion deletion = row.deletion(); if ((flags & HAS_TIMESTAMP) != 0) header.writeTimestamp(pkLiveness.timestamp(), out); @@ -194,7 +222,7 @@ public class UnfilteredSerializer if ((flags & HAS_DELETION) != 0) header.writeDeletionTime(deletion.time(), out); - if (!hasAllColumns) + if ((flags & HAS_ALL_COLUMNS) == 0) Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out); for (ColumnData data : row) @@ -202,7 +230,7 @@ public class UnfilteredSerializer if (data.column.isSimple()) Cell.serializer.serialize((Cell) data, out, pkLiveness, header); else - writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out); + writeComplexColumn((ComplexColumnData) data, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index b6ea1d9..44b1c3a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -17,50 +17,46 @@ */ package org.apache.cassandra.io.sstable.format.big; -import java.io.*; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.transform.Transformation; -import org.apache.cassandra.io.sstable.*; -import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.ChunkCache; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.*; -import 
org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.FilterFactory; -import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.Transactional; -import org.apache.cassandra.utils.SyncUtil; - public class BigTableWriter extends SSTableWriter { private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class); + private final ColumnIndex columnIndexWriter; private final IndexWriter iwriter; private final SegmentedFile.Builder dbuilder; protected final SequentialWriter dataFile; private DecoratedKey lastWrittenKey; - private DataPosition dataMark; private long lastEarlyOpenLength = 0; @@ -90,6 +86,8 @@ public class BigTableWriter extends SSTableWriter dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false); } iwriter = new IndexWriter(keyCount, dataFile); + + columnIndexWriter = new ColumnIndex(header, dataFile, descriptor.version, observers, getRowIndexEntrySerializer().indexInfoSerializer()); } public void mark() @@ -153,29 +151,31 @@ public class BigTableWriter extends SSTableWriter long startPosition = beforeAppend(key); observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position())); + //Reuse the writer for each row + columnIndexWriter.reset(); + try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector))) { - ColumnIndex columnIndex = new ColumnIndex(header, dataFile, descriptor.version, observers, - getRowIndexEntrySerializer().indexInfoSerializer()); - - columnIndex.buildRowIndex(collecting); + columnIndexWriter.buildRowIndex(collecting); // afterAppend() writes the partition key before the first RowIndexEntry - so we have to add it's // serialized size to the index-writer position long indexFilePosition = ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + iwriter.indexFile.position(); RowIndexEntry entry = RowIndexEntry.create(startPosition, indexFilePosition, - collecting.partitionLevelDeletion(), columnIndex.headerLength, columnIndex.columnIndexCount, - columnIndex.indexInfoSerializedSize(), - columnIndex.indexSamples, - columnIndex.offsets(), + collecting.partitionLevelDeletion(), + columnIndexWriter.headerLength, + columnIndexWriter.columnIndexCount, + columnIndexWriter.indexInfoSerializedSize(), + columnIndexWriter.indexSamples(), + columnIndexWriter.offsets(), getRowIndexEntrySerializer().indexInfoSerializer()); long endPosition = dataFile.position(); long rowSize = endPosition - startPosition; maybeLogLargePartitionWarning(key, rowSize); metadataCollector.addPartitionSizeInBytes(rowSize); - afterAppend(key, endPosition, entry, columnIndex.buffer()); + afterAppend(key, endPosition, entry, columnIndexWriter.buffer()); return entry; } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java index 91242b8..8dbad8c 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java @@ -21,11 +21,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import org.apache.cassandra.config.Config; - import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.netty.util.Recycler; +import org.apache.cassandra.config.Config; + /** * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing * its buffer so copies can be avoided. @@ -39,6 +40,21 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus */ private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64); + public static final Recycler<DataOutputBuffer> RECYCLER = new Recycler<DataOutputBuffer>() + { + protected DataOutputBuffer newObject(Handle handle) + { + return new DataOutputBuffer(handle); + } + }; + + private final Recycler.Handle handle; + + private DataOutputBuffer(Recycler.Handle handle) + { + this(128, handle); + } + public DataOutputBuffer() { this(128); @@ -46,12 +62,26 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus public DataOutputBuffer(int size) { - super(ByteBuffer.allocate(size)); + this(size, null); + } + + protected DataOutputBuffer(int size, Recycler.Handle handle) + { + this(ByteBuffer.allocate(size), handle); } - protected DataOutputBuffer(ByteBuffer buffer) + protected DataOutputBuffer(ByteBuffer buffer, Recycler.Handle handle) { super(buffer); + this.handle = handle; + } + + public void recycle() + { + assert handle != null; + buffer.rewind(); + + RECYCLER.recycle(this, handle); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java index c2cc549..5193401 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java @@ -38,12 +38,12 @@ public class DataOutputBufferFixed extends DataOutputBuffer public DataOutputBufferFixed(int size) { - super(ByteBuffer.allocate(size)); + super(size, null); } public DataOutputBufferFixed(ByteBuffer buffer) { - super(buffer); + super(buffer, null); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java index a846384..755783b 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.config.Config; import org.apache.cassandra.utils.ByteBufferUtil; @@ -64,7 +65,7 @@ public abstract class DataOutputStreamPlus extends OutputStream implements DataO return bytes; } - private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>() + private static final FastThreadLocal<byte[]> tempBuffer = new FastThreadLocal<byte[]>() { @Override public byte[] initialValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java index 24eb93c..88912f9 100644 --- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java +++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java @@ -33,7 +33,7 @@ public class SafeMemoryWriter extends DataOutputBuffer private SafeMemoryWriter(SafeMemory memory) { - super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN)); + super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN), null); this.memory = memory; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 960b531..3600d5e 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; @@ -206,7 +208,7 @@ public class ConnectionHandler this.socket = socket; this.protocolVersion = protocolVersion; - new Thread(this, name() + "-" + socket.getRemoteSocketAddress()).start(); + new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start(); } public ListenableFuture<?> close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index c7d5f98..3aaa1a3 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -25,14 +25,13 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; -import java.util.zip.Checksum; import com.google.common.collect.Iterators; import com.google.common.primitives.Ints; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.WrappedRunnable; @@ -82,31 +81,61 @@ public class CompressedInputStream extends InputStream this.crcCheckChanceSupplier = crcCheckChanceSupplier; this.checksumType = checksumType; - new Thread(new Reader(source, info, dataBuffer)).start(); + new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start(); } - public int read() throws IOException + private void decompressNextChunk() throws IOException { - if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + try { - try - { - byte[] compressedWithCRC = dataBuffer.take(); - if (compressedWithCRC == 
POISON_PILL) - throw new EOFException("No chunk available"); - decompress(compressedWithCRC); - } - catch (InterruptedException e) - { + byte[] compressedWithCRC = dataBuffer.take(); + if (compressedWithCRC == POISON_PILL) throw new EOFException("No chunk available"); - } + decompress(compressedWithCRC); } + catch (InterruptedException e) + { + throw new EOFException("No chunk available"); + } + } + + @Override + public int read() throws IOException + { + if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + decompressNextChunk(); assert current >= bufferOffset && current < bufferOffset + validBufferBytes; return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff; } + @Override + public int read(byte[] b, int off, int len) throws IOException + { + long nextCurrent = current + len; + + if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + decompressNextChunk(); + + assert nextCurrent >= bufferOffset; + + int read = 0; + while (read < len) + { + int nextLen = Math.min((len - read), (int)((bufferOffset + validBufferBytes) - current)); + + System.arraycopy(buffer, (int)(current - bufferOffset), b, off + read, nextLen); + read += nextLen; + + current += nextLen; + if (read != len) + decompressNextChunk(); + } + + return len; + } + public void position(long position) { assert position >= current : "stream can only read forward."; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 5aa393e..8dafa9c 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -17,16 +17,11 @@ */ package org.apache.cassandra.streaming.compress; -import java.io.DataInputStream; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import com.google.common.base.Throwables; - -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,11 +29,12 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.util.TrackedInputStream; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.FileMessageHeader; -import org.apache.cassandra.io.util.TrackedInputStream; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -92,6 +88,7 @@ public class CompressedStreamReader extends StreamReader try { writer = createWriter(cfs, totalSize, repairedAt, format); + String filename = writer.getFilename(); int sectionIdx = 0; for (Pair<Long, Long> section : sections) { @@ -107,7 +104,7 @@ public class CompressedStreamReader extends StreamReader { writePartition(deserializer, writer); // when compressed, report total bytes of compressed chunks read since 
remoteFile.size is the sum of chunks transferred - session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); + session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); } } logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum, http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index d2e0513..de43c2f 100644 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@ -17,11 +17,7 @@ */ package org.apache.cassandra.tools; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.PrintStream; +import java.io.*; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.EnumSet; @@ -121,9 +117,9 @@ public class SSTableMetadataViewer out.printf("totalRows: %s%n", stats.totalRows); out.println("Estimated tombstone drop times:"); - for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet()) + for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet()) { - out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()); + out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]); } printHistograms(stats, out); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java index ce6c638..4ff07b7 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/BloomFilter.java @@ -19,13 +19,15 @@ package org.apache.cassandra.utils; import com.google.common.annotations.VisibleForTesting; +import io.netty.util.concurrent.FastThreadLocal; +import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable; import org.apache.cassandra.utils.obs.IBitSet; public class BloomFilter extends WrappedSharedCloseable implements IFilter { - private static final ThreadLocal<long[]> reusableIndexes = new ThreadLocal<long[]>() + private final static FastThreadLocal<long[]> reusableIndexes = new FastThreadLocal<long[]>() { protected long[] initialValue() { @@ -84,16 +86,19 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter // to avoid generating a lot of garbage since stack allocation currently does not support stores // (CASSANDRA-6609). it returns the array so that the caller does not need to perform // a second threadlocal lookup. + @Inline private long[] indexes(FilterKey key) { // we use the same array both for storing the hash result, and for storing the indexes we return, // so that we do not need to allocate two arrays. 
long[] indexes = reusableIndexes.get();
+
         key.filterHash(indexes);
         setIndexes(indexes[1], indexes[0], hashCount, bitset.capacity(), indexes);
         return indexes;
     }
 
+    @Inline
     private void setIndexes(long base, long inc, int count, long max, long[] results)
     {
         if (oldBfHashOrder)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 869f3fa..298e734 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -20,14 +20,14 @@ package org.apache.cassandra.utils;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.obs.IBitSet;
 import org.apache.cassandra.utils.obs.OffHeapBitSet;
 import org.apache.cassandra.utils.obs.OpenBitSet;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class FilterFactory
 {
     public static final IFilter AlwaysPresent = new AlwaysPresentFilter();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index b925395..7ea97e5 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -39,7 +39,10 @@ public class StreamingHistogram
     public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer();
 
     // TreeMap to hold bins of histogram.
-    private final TreeMap<Double, Long> bin;
+    // The key is a numeric type so we can avoid boxing/unboxing streams of different key types
+    // The value is an unboxed long array, always of length == 1
+    // Serialized histograms are always written with double keys for backwards compatibility
+    private final TreeMap<Number, long[]> bin;
 
     // maximum bin size for this histogram
     private final int maxBinSize;
@@ -51,22 +54,28 @@ public class StreamingHistogram
     public StreamingHistogram(int maxBinSize)
     {
         this.maxBinSize = maxBinSize;
-        bin = new TreeMap<>();
+        bin = new TreeMap<>((o1, o2) -> {
+            if (o1.getClass().equals(o2.getClass()))
+                return ((Comparable)o1).compareTo(o2);
+            else
+                return ((Double)o1.doubleValue()).compareTo(o2.doubleValue());
+        });
     }
 
     private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
     {
-        this.maxBinSize = maxBinSize;
-        this.bin = new TreeMap<>(bin);
+        this(maxBinSize);
+        for (Map.Entry<Double, Long> entry : bin.entrySet())
+            this.bin.put(entry.getKey(), new long[]{entry.getValue()});
     }
 
     /**
      * Adds new point p to this histogram.
* @param p */ - public void update(double p) + public void update(Number p) { - update(p, 1); + update(p, 1L); } /** @@ -74,30 +83,31 @@ public class StreamingHistogram * @param p * @param m */ - public void update(double p, long m) + public void update(Number p, long m) { - Long mi = bin.get(p); + long[] mi = bin.get(p); if (mi != null) { // we found the same p so increment that counter - bin.put(p, mi + m); + mi[0] += m; } else { - bin.put(p, m); + mi = new long[]{m}; + bin.put(p, mi); // if bin size exceeds maximum bin size then trim down to max size while (bin.size() > maxBinSize) { // find points p1, p2 which have smallest difference - Iterator<Double> keys = bin.keySet().iterator(); - double p1 = keys.next(); - double p2 = keys.next(); + Iterator<Number> keys = bin.keySet().iterator(); + double p1 = keys.next().doubleValue(); + double p2 = keys.next().doubleValue(); double smallestDiff = p2 - p1; double q1 = p1, q2 = p2; while (keys.hasNext()) { p1 = p2; - p2 = keys.next(); + p2 = keys.next().doubleValue(); double diff = p2 - p1; if (diff < smallestDiff) { @@ -107,9 +117,13 @@ public class StreamingHistogram } } // merge those two - long k1 = bin.remove(q1); - long k2 = bin.remove(q2); - bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2); + long[] a1 = bin.remove(q1); + long[] a2 = bin.remove(q2); + long k1 = a1[0]; + long k2 = a2[0]; + + a1[0] += k2; + bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1); } } } @@ -124,8 +138,8 @@ public class StreamingHistogram if (other == null) return; - for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet()) - update(entry.getKey(), entry.getValue()); + for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet()) + update(entry.getKey(), entry.getValue()[0]); } /** @@ -138,32 +152,32 @@ public class StreamingHistogram { double sum = 0; // find the points pi, pnext which satisfy pi <= b < pnext - Map.Entry<Double, Long> pnext = bin.higherEntry(b); + Map.Entry<Number, long[]> pnext = bin.higherEntry(b); if (pnext == null) { // if b is greater than any key in this histogram, // just count all appearance and return - for (Long value : bin.values()) - sum += value; + for (long[] value : bin.values()) + sum += value[0]; } else { - Map.Entry<Double, Long> pi = bin.floorEntry(b); + Map.Entry<Number, long[]> pi = bin.floorEntry(b); if (pi == null) return 0; // calculate estimated count mb for point b - double weight = (b - pi.getKey()) / (pnext.getKey() - pi.getKey()); - double mb = pi.getValue() + (pnext.getValue() - pi.getValue()) * weight; - sum += (pi.getValue() + mb) * weight / 2; + double weight = (b - pi.getKey().doubleValue()) / (pnext.getKey().doubleValue() - pi.getKey().doubleValue()); + double mb = pi.getValue()[0] + (pnext.getValue()[0] - pi.getValue()[0]) * weight; + sum += (pi.getValue()[0] + mb) * weight / 2; - sum += pi.getValue() / 2.0; - for (Long value : bin.headMap(pi.getKey(), false).values()) - sum += value; + sum += pi.getValue()[0] / 2.0; + for (long[] value : bin.headMap(pi.getKey(), false).values()) + sum += value[0]; } return sum; } - public Map<Double, Long> getAsMap() + public Map<Number, long[]> getAsMap() { return Collections.unmodifiableMap(bin); } @@ -173,12 +187,12 @@ public class StreamingHistogram public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException { out.writeInt(histogram.maxBinSize); - Map<Double, Long> entries = histogram.getAsMap(); + Map<Number, long[]> entries = histogram.getAsMap(); out.writeInt(entries.size()); - for (Map.Entry<Double, Long> entry : 
entries.entrySet()) + for (Map.Entry<Number, long[]> entry : entries.entrySet()) { - out.writeDouble(entry.getKey()); - out.writeLong(entry.getValue()); + out.writeDouble(entry.getKey().doubleValue()); + out.writeLong(entry.getValue()[0]); } } @@ -198,7 +212,7 @@ public class StreamingHistogram public long serializedSize(StreamingHistogram histogram) { long size = TypeSizes.sizeof(histogram.maxBinSize); - Map<Double, Long> entries = histogram.getAsMap(); + Map<Number, long[]> entries = histogram.getAsMap(); size += TypeSizes.sizeof(entries.size()); // size of entries = size * (8(double) + 8(long)) size += entries.size() * (8L + 8L); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/BTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index 1c3d2e2..4f21d26 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -24,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; +import io.netty.util.Recycler; import org.apache.cassandra.utils.ObjectSizes; import static com.google.common.collect.Iterables.concat; @@ -139,12 +140,9 @@ public class BTree return values; } - Queue<TreeBuilder> queue = modifier.get(); - TreeBuilder builder = queue.poll(); - if (builder == null) - builder = new TreeBuilder(); + TreeBuilder builder = TreeBuilder.newInstance(); Object[] btree = builder.build(source, updateF, size); - queue.add(builder); + return btree; } @@ -176,12 +174,9 @@ public class BTree if (isEmpty(btree)) return build(updateWith, updateWithLength, updateF); - Queue<TreeBuilder> queue = modifier.get(); - TreeBuilder builder = queue.poll(); - if (builder == null) - builder = new TreeBuilder(); + + TreeBuilder builder = TreeBuilder.newInstance(); btree = builder.update(btree, comparator, updateWith, updateF); - queue.add(builder); return btree; } @@ -203,12 +198,12 @@ public class BTree public static <V> Iterator<V> iterator(Object[] btree, Dir dir) { - return new BTreeSearchIterator<V, V>(btree, null, dir); + return new BTreeSearchIterator<>(btree, null, dir); } public static <V> Iterator<V> iterator(Object[] btree, int lb, int ub, Dir dir) { - return new BTreeSearchIterator<V, V>(btree, null, dir, lb, ub); + return new BTreeSearchIterator<>(btree, null, dir, lb, ub); } public static <V> Iterable<V> iterable(Object[] btree) @@ -771,28 +766,29 @@ public class BTree return 1 + lookupSizeMap(root, childIndex - 1); } - private static final ThreadLocal<Queue<TreeBuilder>> modifier = new ThreadLocal<Queue<TreeBuilder>>() + final static Recycler<Builder> builderRecycler = new Recycler<Builder>() { - @Override - protected Queue<TreeBuilder> initialValue() + protected Builder newObject(Handle handle) { - return new ArrayDeque<>(); + return new Builder(handle); } }; public static <V> Builder<V> builder(Comparator<? super V> comparator) { - return new Builder<>(comparator); + Builder<V> builder = builderRecycler.get(); + builder.reuse(comparator); + + return builder; } public static <V> Builder<V> builder(Comparator<? 
super V> comparator, int initialCapacity) { - return new Builder<>(comparator); + return builder(comparator); } public static class Builder<V> { - // a user-defined bulk resolution, to be applied manually via resolve() public static interface Resolver { @@ -817,16 +813,13 @@ public class BTree boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates QuickResolver<V> quickResolver; + final Recycler.Handle recycleHandle; - protected Builder(Comparator<? super V> comparator) - { - this(comparator, 16); - } - protected Builder(Comparator<? super V> comparator, int initialCapacity) + private Builder(Recycler.Handle handle) { - this.comparator = comparator; - this.values = new Object[initialCapacity]; + this.recycleHandle = handle; + this.values = new Object[16]; } public Builder<V> setQuickResolver(QuickResolver<V> quickResolver) @@ -835,16 +828,19 @@ public class BTree return this; } - public void reuse() + public void recycle() { - reuse(comparator); + if (recycleHandle != null) + builderRecycler.recycle(this, recycleHandle); } - public void reuse(Comparator<? super V> comparator) + private void reuse(Comparator<? super V> comparator) { this.comparator = comparator; + quickResolver = null; count = 0; detected = true; + auto = true; } public Builder<V> auto(boolean auto) @@ -1071,9 +1067,16 @@ public class BTree public Object[] build() { - if (auto) - autoEnforce(); - return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp()); + try + { + if (auto) + autoEnforce(); + return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp()); + } + finally + { + this.recycle(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/BTreeSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java index 9517009..a59e481 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java +++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java @@ -21,14 +21,11 @@ package org.apache.cassandra.utils.btree; import java.util.*; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import org.apache.cassandra.utils.btree.BTree.Dir; import static org.apache.cassandra.utils.btree.BTree.findIndex; -import static org.apache.cassandra.utils.btree.BTree.lower; -import static org.apache.cassandra.utils.btree.BTree.toArray; public class BTreeSet<V> implements NavigableSet<V>, List<V> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java index 024902e..f42de0f 100644 --- a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java +++ b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java @@ -20,6 +20,8 @@ package org.apache.cassandra.utils.btree; import java.util.Comparator; +import io.netty.util.Recycler; + import static org.apache.cassandra.utils.btree.BTree.EMPTY_LEAF; import static org.apache.cassandra.utils.btree.BTree.FAN_SHIFT; import 
static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY; @@ -28,12 +30,32 @@ import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY; * A class for constructing a new BTree, either from an existing one and some set of modifications * or a new tree from a sorted collection of items. * <p/> - * This is a fairly heavy-weight object, so a ThreadLocal instance is created for making modifications to a tree + * This is a fairly heavy-weight object, so a Recycled instance is created for making modifications to a tree */ final class TreeBuilder { + + private final static Recycler<TreeBuilder> builderRecycler = new Recycler<TreeBuilder>() + { + protected TreeBuilder newObject(Handle handle) + { + return new TreeBuilder(handle); + } + }; + + public static TreeBuilder newInstance() + { + return builderRecycler.get(); + } + + private final Recycler.Handle recycleHandle; private final NodeBuilder rootBuilder = new NodeBuilder(); + private TreeBuilder(Recycler.Handle handle) + { + this.recycleHandle = handle; + } + /** * At the highest level, we adhere to the classic b-tree insertion algorithm: * @@ -93,6 +115,9 @@ final class TreeBuilder Object[] r = current.toNode(); current.clear(); + + builderRecycler.recycle(this, recycleHandle); + return r; } @@ -114,6 +139,9 @@ final class TreeBuilder Object[] r = current.toNode(); current.clear(); + + builderRecycler.recycle(this, recycleHandle); + return r; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java index 0eefae3..31894b1 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java +++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java @@ -20,8 +20,6 @@ package org.apache.cassandra.utils.concurrent; import java.util.Arrays; -import org.apache.cassandra.utils.Throwables; - import static org.apache.cassandra.utils.Throwables.maybeFail; import static org.apache.cassandra.utils.Throwables.merge; @@ -35,7 +33,7 @@ public abstract class WrappedSharedCloseable extends SharedCloseableImpl public WrappedSharedCloseable(final AutoCloseable closeable) { - this(new AutoCloseable[] { closeable}); + this(new AutoCloseable[] {closeable}); } public WrappedSharedCloseable(final AutoCloseable[] closeable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/memory/BufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index ad2404f..68f8911 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -21,23 +21,23 @@ package org.apache.cassandra.utils.memory; import java.lang.ref.PhantomReference; import java.lang.ref.ReferenceQueue; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Queue; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.io.compress.BufferType; -import 
org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NoSpamLogger; - import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocal; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.BufferPoolMetrics; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.Ref; /** @@ -68,7 +68,7 @@ public class BufferPool private static final GlobalPool globalPool = new GlobalPool(); /** A thread local pool of chunks, where chunks come from the global pool */ - private static final ThreadLocal<LocalPool> localPool = new ThreadLocal<LocalPool>() { + private static final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>() { @Override protected LocalPool initialValue() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/vint/VIntCoding.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java index daf5006..3872424 100644 --- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java +++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java @@ -50,6 +50,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import io.netty.util.concurrent.FastThreadLocal; import net.nicoulaj.compilecommand.annotations.Inline; /** @@ -103,7 +104,7 @@ public class VIntCoding return Integer.numberOfLeadingZeros(~firstByte) - 24; } - protected static final ThreadLocal<byte[]> encodingBuffer = new ThreadLocal<byte[]>() + protected static final FastThreadLocal<byte[]> encodingBuffer = new FastThreadLocal<byte[]>() { @Override public byte[] initialValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/long/org/apache/cassandra/streaming/LongStreamingTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java new file mode 100644 index 0000000..7e53ba2 --- /dev/null +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.io.Files; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.OutputHandler; + +import static org.junit.Assert.assertEquals; + +public class LongStreamingTest +{ + @BeforeClass + public static void setup() throws Exception + { + SchemaLoader.cleanupAndLeaveDirs(); + Keyspace.setInitialized(); + StorageService.instance.initServer(); + + StorageService.instance.setCompactionThroughputMbPerSec(0); + StorageService.instance.setStreamThroughputMbPerSec(0); + StorageService.instance.setInterDCStreamThroughputMbPerSec(0); + } + + @AfterClass + public static void tearDown() + { + Config.setClientMode(false); + } + + @Test + public void testCompressedStream() throws InvalidRequestException, IOException, ExecutionException, InterruptedException + { + String KS = "cql_keyspace"; + String TABLE = "table1"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + String schema = "CREATE TABLE cql_keyspace.table1 (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ");";// with compression = {};"; + String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .sorted() + .inDirectory(dataDir) + .forTable(schema) + .using(insert).build(); + long start = System.nanoTime(); + + for (int i = 0; i < 10_000_000; i++) + writer.addRow(i, "test1", 24); + + writer.close(); + System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start))); + + File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db")); + long dataSize = 0l; + for (File file : dataFiles) + { + System.err.println("File : "+file.getAbsolutePath()); + dataSize += file.length(); + } + + SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() + { + private String ks; + public void init(String keyspace) + { + for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) + addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + + this.ks = keyspace; + } + + public CFMetaData getTableMetadata(String cfName) + { + return Schema.instance.getCFMetaData(ks, cfName); + } + }, new OutputHandler.SystemOutput(false, false)); + + start = System.nanoTime(); + loader.stream().get(); + + long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec", + millis/1000d, + (dataSize / (1 << 20) 
/ (millis / 1000d)) * 8)); + + + //Stream again + loader = new SSTableLoader(dataDir, new SSTableLoader.Client() + { + private String ks; + public void init(String keyspace) + { + for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) + addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + + this.ks = keyspace; + } + + public CFMetaData getTableMetadata(String cfName) + { + return Schema.instance.getCFMetaData(ks, cfName); + } + }, new OutputHandler.SystemOutput(false, false)); + + start = System.nanoTime(); + loader.stream().get(); + + millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec", + millis/1000d, + (dataSize / (1 << 20) / (millis / 1000d)) * 8)); + + + //Compact them both + start = System.nanoTime(); + Keyspace.open(KS).getColumnFamilyStore(TABLE).forceMajorCompaction(); + millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + + System.err.println(String.format("Finished Compacting in %.2f seconds: %.2f Mb/sec", + millis / 1000d, + (dataSize * 2 / (1 << 20) / (millis / 1000d)) * 8)); + + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1 limit 100;"); + assertEquals(100, rs.size()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index ebacf34..244018e 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -28,6 +28,8 @@ import java.util.Iterator; import java.util.List; import com.google.common.primitives.Ints; +import org.junit.Assert; +import org.junit.Test; import org.apache.cassandra.Util; import org.apache.cassandra.cache.IMeasurableMemory; @@ -37,12 +39,12 @@ import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.columniterator.AbstractSSTableIterator; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; import org.apache.cassandra.db.rows.BTreeRow; import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.ColumnData; import org.apache.cassandra.db.rows.EncodingStats; -import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.RangeTombstoneMarker; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; @@ -67,9 +69,6 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.btree.BTree; -import org.junit.Assert; -import org.junit.Test; - import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; @@ -191,7 +190,7 @@ public class RowIndexEntryTest extends CQLTester rieNew = RowIndexEntry.create(startPosition, 0L, deletionInfo, columnIndex.headerLength, columnIndex.columnIndexCount, columnIndex.indexInfoSerializedSize(), - columnIndex.indexSamples, columnIndex.offsets(), + columnIndex.indexSamples(), columnIndex.offsets(), rieSerializer.indexInfoSerializer()); rieSerializer.serialize(rieNew, rieOutput, columnIndex.buffer()); rieNewSerialized = 
rieOutput.buffer().duplicate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/utils/BTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java b/test/unit/org/apache/cassandra/utils/BTreeTest.java index ffd7315..a01ad2e 100644 --- a/test/unit/org/apache/cassandra/utils/BTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java @@ -214,14 +214,16 @@ public class BTreeTest builder.add(i); // for sorted input, check non-resolve path works before checking resolution path checkResolverOutput(count, builder.build(), BTree.Dir.ASC); - builder.reuse(); + builder = BTree.builder(Comparator.naturalOrder()); + builder.setQuickResolver(resolver); for (int i = 0 ; i < 10 ; i++) { // now do a few runs of randomized inputs for (Accumulator j : resolverInput(count, true)) builder.add(j); checkResolverOutput(count, builder.build(), BTree.Dir.ASC); - builder.reuse(); + builder = BTree.builder(Comparator.naturalOrder()); + builder.setQuickResolver(resolver); } for (List<Accumulator> add : splitResolverInput(count)) { @@ -231,7 +233,6 @@ public class BTreeTest builder.addAll(new TreeSet<>(add)); } checkResolverOutput(count, builder.build(), BTree.Dir.ASC); - builder.reuse(); } } @@ -278,7 +279,14 @@ public class BTreeTest builder.add(i); // for sorted input, check non-resolve path works before checking resolution path Assert.assertTrue(Iterables.elementsEqual(sorted, BTree.iterable(builder.build()))); + + builder = BTree.builder(Comparator.naturalOrder()); + builder.auto(false); + for (Accumulator i : sorted) + builder.add(i); + // check resolution path checkResolverOutput(count, builder.resolve(resolver).build(), BTree.Dir.ASC); + builder = BTree.builder(Comparator.naturalOrder()); builder.auto(false); for (int i = 0 ; i < 10 ; i++) @@ -287,11 +295,13 @@ public class BTreeTest for (Accumulator j : resolverInput(count, true)) builder.add(j); checkResolverOutput(count, builder.sort().resolve(resolver).build(), BTree.Dir.ASC); - builder.reuse(); + builder = BTree.builder(Comparator.naturalOrder()); + builder.auto(false); for (Accumulator j : resolverInput(count, true)) builder.add(j); checkResolverOutput(count, builder.sort().reverse().resolve(resolver).build(), BTree.Dir.DESC); - builder.reuse(); + builder = BTree.builder(Comparator.naturalOrder()); + builder.auto(false); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java index b6b1882..94aac9e 100644 --- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java +++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java @@ -22,6 +22,7 @@ import java.util.LinkedHashMap; import java.util.Map; import org.junit.Test; + import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -50,11 +51,11 @@ public class StreamingHistogramTest expected1.put(36.0, 1L); Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator(); - for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet()) + for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet()) { Map.Entry<Double, Long> entry = expectedItr.next(); - 
assertEquals(entry.getKey(), actual.getKey(), 0.01); - assertEquals(entry.getValue(), actual.getValue()); + assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01); + assertEquals(entry.getValue().longValue(), actual.getValue()[0]); } // merge test @@ -72,11 +73,11 @@ public class StreamingHistogramTest expected2.put(32.67, 3L); expected2.put(45.0, 1L); expectedItr = expected2.entrySet().iterator(); - for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet()) + for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet()) { Map.Entry<Double, Long> entry = expectedItr.next(); - assertEquals(entry.getKey(), actual.getKey(), 0.01); - assertEquals(entry.getValue(), actual.getValue()); + assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01); + assertEquals(entry.getValue().longValue(), actual.getValue()[0]); } // sum test @@ -112,11 +113,40 @@ public class StreamingHistogramTest expected1.put(36.0, 1L); Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator(); - for (Map.Entry<Double, Long> actual : deserialized.getAsMap().entrySet()) + for (Map.Entry<Number, long[]> actual : deserialized.getAsMap().entrySet()) { Map.Entry<Double, Long> entry = expectedItr.next(); - assertEquals(entry.getKey(), actual.getKey(), 0.01); - assertEquals(entry.getValue(), actual.getValue()); + assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01); + assertEquals(entry.getValue().longValue(), actual.getValue()[0]); } } + + + @Test + public void testNumericTypes() throws Exception + { + StreamingHistogram hist = new StreamingHistogram(5); + + hist.update(2); + hist.update(2.0); + hist.update(2L); + + Map<Number, long[]> asMap = hist.getAsMap(); + + assertEquals(1, asMap.size()); + assertEquals(3L, asMap.get(2)[0]); + + //Make sure it's working with Serde + DataOutputBuffer out = new DataOutputBuffer(); + StreamingHistogram.serializer.serialize(hist, out); + byte[] bytes = out.toByteArray(); + + StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes)); + + deserialized.update(2L); + + asMap = deserialized.getAsMap(); + assertEquals(1, asMap.size()); + assertEquals(4L, asMap.get(2)[0]); + } }
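----------------------------------------------------------------------

Notes on the techniques above.

The TreeBuilder hunk swaps the old per-thread builder for Netty's Recycler, a lightweight thread-local object pool: get() hands out a pooled (or freshly allocated) instance, and recycle() returns it once the tree has been built. A minimal sketch of the same pattern, using a hypothetical PooledWorker class in place of TreeBuilder:

import io.netty.util.Recycler;

// Hypothetical stand-in for a heavy-weight, reusable object such as TreeBuilder.
final class PooledWorker
{
    private static final Recycler<PooledWorker> recycler = new Recycler<PooledWorker>()
    {
        protected PooledWorker newObject(Handle handle)
        {
            return new PooledWorker(handle);
        }
    };

    public static PooledWorker newInstance()
    {
        return recycler.get(); // pooled instance if available, else newObject()
    }

    private final Recycler.Handle handle;

    private PooledWorker(Recycler.Handle handle)
    {
        this.handle = handle;
    }

    public Object finish()
    {
        Object result = new Object();   // produce the result, then...
        recycler.recycle(this, handle); // ...hand this instance back to the pool
        return result;
    }
}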
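BufferPool and VIntCoding both replace java.lang.ThreadLocal with Netty's FastThreadLocal, which resolves to a constant-time indexed lookup when the calling thread is a FastThreadLocalThread and falls back to a plain ThreadLocal otherwise. The swap is mechanical; a sketch assuming a hypothetical per-thread 9-byte scratch buffer like the one VIntCoding keeps:

import io.netty.util.concurrent.FastThreadLocal;

public class ScratchBuffers
{
    // Same contract as a JDK ThreadLocal; faster get() on Netty-style threads.
    private static final FastThreadLocal<byte[]> encodingBuffer = new FastThreadLocal<byte[]>()
    {
        @Override
        public byte[] initialValue()
        {
            return new byte[9]; // assumed worst-case encoded vint length
        }
    };

    public static byte[] get()
    {
        return encodingBuffer.get();
    }
}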
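LongStreamingTest builds its SSTableLoader.Client twice as an anonymous class. Its two duties are to register each locally owned token range against an endpoint and to resolve table metadata by name. The same logic as a named class (a sketch; the test hardcodes its keyspace when fetching ranges, while this version uses the init() argument):

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;

class LocalClient extends SSTableLoader.Client
{
    private String keyspace;

    public void init(String keyspace)
    {
        // Stream every locally owned range back to this node.
        for (Range<Token> range : StorageService.instance.getLocalRanges(keyspace))
            addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
        this.keyspace = keyspace;
    }

    public CFMetaData getTableMetadata(String cfName)
    {
        return Schema.instance.getCFMetaData(keyspace, cfName);
    }
}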
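The rate the test prints is megabits per second: bytes are scaled to MiB, divided by elapsed seconds, then multiplied by 8. Spelled out in a hypothetical helper (note that the test's first division happens in long arithmetic, so the size is truncated to whole MiB before the rate is computed):

public class Throughput
{
    // Mirrors the test's arithmetic: (dataSize / (1 << 20) / (millis / 1000d)) * 8
    static double megabitsPerSecond(long dataSize, long millis)
    {
        long mib = dataSize / (1 << 20); // bytes -> MiB, integer division
        double seconds = millis / 1000d; // ms -> s
        return (mib / seconds) * 8;      // MiB/s -> megabits/s
    }
}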
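The BTreeTest changes drop builder.reuse() in favor of requesting a fresh builder per tree, since pooling now happens inside the tree-building machinery itself (the Recycler change above). The replacement idiom, extracted into a hypothetical helper:

import java.util.Comparator;
import org.apache.cassandra.utils.btree.BTree;

public class BuilderPerUse
{
    // One builder per tree; no reuse() call between builds.
    public static Object[] buildTree(Iterable<Integer> input)
    {
        BTree.Builder<Integer> builder = BTree.builder(Comparator.naturalOrder());
        for (Integer i : input)
            builder.add(i);
        return builder.build();
    }
}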
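StreamingHistogram.getAsMap() now returns Map<Number, long[]> rather than Map<Double, Long>: bin keys keep their original numeric type, and each count lives in a single-element array that can be incremented in place. Reading the map changes shape accordingly; a sketch using only the calls the updated test exercises:

import java.util.Map;
import org.apache.cassandra.utils.StreamingHistogram;

public class HistogramRead
{
    public static void main(String[] args)
    {
        StreamingHistogram hist = new StreamingHistogram(5); // at most 5 bins
        hist.update(2);   // int
        hist.update(2.0); // double
        hist.update(2L);  // long: all three collapse into one bin

        for (Map.Entry<Number, long[]> bin : hist.getAsMap().entrySet())
            System.out.printf("bin %.2f -> count %d%n", bin.getKey().doubleValue(), bin.getValue()[0]);
    }
}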