Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/693e52d6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/693e52d6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/693e52d6 Branch: refs/heads/trunk Commit: 693e52d66a3e4c12e2b9c8fcbc770d5d1a0cad93 Parents: 4210a25 05bacc7 Author: Carl Yeksigian <c...@apache.org> Authored: Wed Jun 15 10:57:30 2016 -0400 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Jun 15 10:57:30 2016 -0400 ---------------------------------------------------------------------- .../org/apache/cassandra/cache/CacheSize.java | 22 +++++++++++++++++++- .../org/apache/cassandra/cache/ChunkCache.java | 20 ++++++++++++++++++ .../cql3/selection/SelectionColumnMapping.java | 20 ++++++++++++++++++ .../cql3/selection/SelectionColumns.java | 20 ++++++++++++++++++ .../apache/cassandra/db/ClusteringBound.java | 22 +++++++++++++++++++- .../cassandra/db/ClusteringBoundOrBoundary.java | 20 ++++++++++++++++++ .../apache/cassandra/db/ClusteringBoundary.java | 22 +++++++++++++++++++- .../EncryptedFileSegmentInputStream.java | 20 ++++++++++++++++++ .../db/lifecycle/LogAwareFileLister.java | 20 ++++++++++++++++++ .../apache/cassandra/db/lifecycle/LogFile.java | 20 ++++++++++++++++++ .../cassandra/db/lifecycle/LogRecord.java | 20 ++++++++++++++++++ .../db/lifecycle/SSTableIntervalTree.java | 20 ++++++++++++++++++ .../cassandra/db/lifecycle/SSTableSet.java | 22 +++++++++++++++++++- .../UnfilteredRowIteratorWithLowerBound.java | 20 ++++++++++++++++++ .../cassandra/db/transform/BaseIterator.java | 20 ++++++++++++++++++ .../cassandra/db/transform/BasePartitions.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/BaseRows.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/Filter.java | 20 ++++++++++++++++++ .../db/transform/FilteredPartitions.java | 20 ++++++++++++++++++ .../cassandra/db/transform/FilteredRows.java | 20 ++++++++++++++++++ 
.../cassandra/db/transform/MoreContents.java | 20 ++++++++++++++++++ .../cassandra/db/transform/MorePartitions.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/MoreRows.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/Stack.java | 20 ++++++++++++++++++ .../db/transform/StoppingTransformation.java | 20 ++++++++++++++++++ .../cassandra/db/transform/Transformation.java | 20 ++++++++++++++++++ .../db/transform/UnfilteredPartitions.java | 20 ++++++++++++++++++ .../cassandra/db/transform/UnfilteredRows.java | 20 ++++++++++++++++++ src/java/org/apache/cassandra/index/Index.java | 20 ++++++++++++++++++ .../apache/cassandra/index/IndexRegistry.java | 20 ++++++++++++++++++ .../index/internal/CassandraIndex.java | 20 ++++++++++++++++++ .../index/internal/CassandraIndexSearcher.java | 20 ++++++++++++++++++ .../cassandra/index/internal/IndexEntry.java | 20 ++++++++++++++++++ .../index/internal/keys/KeysIndex.java | 20 ++++++++++++++++++ .../cassandra/index/sasi/SASIIndexBuilder.java | 20 ++++++++++++++++++ .../io/util/BufferManagingRebufferer.java | 20 ++++++++++++++++++ .../cassandra/io/util/LimitingRebufferer.java | 20 ++++++++++++++++++ .../cassandra/io/util/MmapRebufferer.java | 22 +++++++++++++++++++- .../cassandra/locator/PendingRangeMaps.java | 20 ++++++++++++++++++ .../cassandra/repair/RepairParallelism.java | 20 ++++++++++++++++++ .../cassandra/tools/BulkLoadException.java | 20 ++++++++++++++++++ .../apache/cassandra/tools/JsonTransformer.java | 22 +++++++++++++++++++- .../apache/cassandra/tools/LoaderOptions.java | 22 +++++++++++++++++++- .../apache/cassandra/utils/OverlapIterator.java | 22 +++++++++++++++++++- .../utils/RMIServerSocketFactoryImpl.java | 20 ++++++++++++++++++ .../org/apache/cassandra/utils/SyncUtil.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/concurrent/Ref.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/concurrent/Refs.java | 20 ++++++++++++++++++ .../cassandra/utils/memory/EnsureOnHeap.java | 20 
++++++++++++++++++ .../io/compress/CompressorPerformance.java | 20 ++++++++++++++++++ .../test/microbench/PendingRangesBench.java | 20 ++++++++++++++++++ .../cassandra/cql3/IndexQueryPagingTest.java | 20 ++++++++++++++++++ .../selection/SelectionColumnMappingTest.java | 20 ++++++++++++++++++ .../statements/PropertyDefinitionsTest.java | 20 ++++++++++++++++++ .../miscellaneous/SSTablesIteratedTest.java | 22 +++++++++++++++++++- .../validation/operations/SelectLimitTest.java | 20 ++++++++++++++++++ .../SelectOrderedPartitionerTest.java | 20 ++++++++++++++++++ .../db/SinglePartitionSliceCommandTest.java | 20 ++++++++++++++++++ .../commitlog/CommitLogSegmentManagerTest.java | 22 +++++++++++++++++++- .../db/marshal/AbstractCompositeTypeTest.java | 20 ++++++++++++++++++ .../rows/RowAndDeletionMergeIteratorTest.java | 20 ++++++++++++++++++ .../gms/ArrayBackedBoundedStatsTest.java | 20 ++++++++++++++++++ .../apache/cassandra/index/CustomIndexTest.java | 20 ++++++++++++++++++ .../index/internal/CustomCassandraIndex.java | 20 ++++++++++++++++++ .../io/util/BufferedDataOutputStreamTest.java | 20 ++++++++++++++++++ .../io/util/NIODataInputStreamTest.java | 20 ++++++++++++++++++ .../io/util/RandomAccessReaderTest.java | 20 ++++++++++++++++++ .../cassandra/locator/PendingRangeMapsTest.java | 20 ++++++++++++++++++ .../metrics/CassandraMetricsRegistryTest.java | 20 ++++++++++++++++++ .../cassandra/net/MessagingServiceTest.java | 20 ++++++++++++++++++ .../cassandra/schema/IndexMetadataTest.java | 20 ++++++++++++++++++ .../cassandra/security/CipherFactoryTest.java | 20 ++++++++++++++++++ .../service/RMIServerSocketFactoryImplTest.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/TopKSamplerTest.java | 20 ++++++++++++++++++ 74 files changed, 1490 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/cache/CacheSize.java 
---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/CacheSize.java index 561c73d,0000000..71365bb mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cache/CacheSize.java +++ b/src/java/org/apache/cassandra/cache/CacheSize.java @@@ -1,14 -1,0 +1,34 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.cache; + +public interface CacheSize +{ + + long capacity(); + + void setCapacity(long capacity); + + int size(); + + long weightedSize(); + - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/cache/ChunkCache.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/ChunkCache.java index faf41b4,0000000..9c32746 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cache/ChunkCache.java +++ b/src/java/org/apache/cassandra/cache/ChunkCache.java @@@ -1,301 -1,0 +1,321 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements.
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.cache; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; + +import com.github.benmanes.caffeine.cache.*; +import com.codahale.metrics.Timer; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.util.*; +import org.apache.cassandra.metrics.CacheMissMetrics; +import org.apache.cassandra.utils.memory.BufferPool; + +public class ChunkCache + implements CacheLoader<ChunkCache.Key, ChunkCache.Buffer>, RemovalListener<ChunkCache.Key, ChunkCache.Buffer>, CacheSize +{ + public static final int RESERVED_POOL_SPACE_IN_MB = 32; + public static final long cacheSize = 1024L * 1024L * Math.max(0, DatabaseDescriptor.getFileCacheSizeInMB() - RESERVED_POOL_SPACE_IN_MB); + + private static boolean enabled = cacheSize > 0; + public static final ChunkCache instance = enabled ?
new ChunkCache() : null; + + private final LoadingCache<Key, Buffer> cache; + public final CacheMissMetrics metrics; + + static class Key + { + final ChunkReader file; + final String path; + final long position; + + public Key(ChunkReader file, long position) + { + super(); + this.file = file; + this.position = position; + this.path = file.channel().filePath(); + } + + // Equality and hash use the file path, the ChunkReader's class and the position; the reader instance itself is not compared. + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + path.hashCode(); + result = prime * result + file.getClass().hashCode(); + result = prime * result + Long.hashCode(position); + return result; + } + + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + + // NOTE(review): there is no instanceof/getClass check before this cast, so a non-Key argument throws ClassCastException instead of returning false. + Key other = (Key) obj; + return (position == other.position) + && file.getClass() == other.file.getClass() + && path.equals(other.path); + } + } + + static class Buffer implements Rebufferer.BufferHolder + { + private final ByteBuffer buffer; + private final long offset; + private final AtomicInteger references; + + public Buffer(ByteBuffer buffer, long offset) + { + this.buffer = buffer; + this.offset = offset; + references = new AtomicInteger(1); // start referenced. + } + + Buffer reference() + { + int refCount; + do + { + refCount = references.get(); + if (refCount == 0) + // Buffer was released before we managed to reference it.
+ return null; + } while (!references.compareAndSet(refCount, refCount + 1)); + + return this; + } + + @Override + public ByteBuffer buffer() + { + assert references.get() > 0; + return buffer.duplicate(); + } + + @Override + public long offset() + { + return offset; + } + + @Override + public void release() + { + if (references.decrementAndGet() == 0) + BufferPool.put(buffer); + } + } + + public ChunkCache() + { + cache = Caffeine.newBuilder() + .maximumWeight(cacheSize) + .executor(MoreExecutors.directExecutor()) + .weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity()) + .removalListener(this) + .build(this); + metrics = new CacheMissMetrics("ChunkCache", this); + } + + @Override + public Buffer load(Key key) throws Exception + { + ChunkReader rebufferer = key.file; + metrics.misses.mark(); + try (Timer.Context ctx = metrics.missLatency.time()) + { + ByteBuffer buffer = BufferPool.get(key.file.chunkSize()); + assert buffer != null; + rebufferer.readChunk(key.position, buffer); + return new Buffer(buffer, key.position); + } + } + + @Override + public void onRemoval(Key key, Buffer buffer, RemovalCause cause) + { + buffer.release(); + } + + public void close() + { + cache.invalidateAll(); + } + + public RebuffererFactory wrap(ChunkReader file) + { + return new CachingRebufferer(file); + } + + public static RebuffererFactory maybeWrap(ChunkReader file) + { + if (!enabled) + return file; + + return instance.wrap(file); + } + + public void invalidatePosition(SegmentedFile dfile, long position) + { + if (!(dfile.rebuffererFactory() instanceof CachingRebufferer)) + return; + + ((CachingRebufferer) dfile.rebuffererFactory()).invalidate(position); + } + + public void invalidateFile(String fileName) + { + cache.invalidateAll(Iterables.filter(cache.asMap().keySet(), x -> x.path.equals(fileName))); + } + + @VisibleForTesting + public void enable(boolean enabled) + { + ChunkCache.enabled = enabled; + cache.invalidateAll(); + metrics.reset(); + } + + // TODO:
Invalidate caches for obsoleted/MOVED_START tables? + + /** + * Rebufferer providing cached chunks where data is obtained from the specified ChunkReader. + * Thread-safe. One instance per SegmentedFile, created by ChunkCache.maybeWrap if the cache is enabled. + */ + class CachingRebufferer implements Rebufferer, RebuffererFactory + { + private final ChunkReader source; + final long alignmentMask; + + public CachingRebufferer(ChunkReader file) + { + source = file; + int chunkSize = file.chunkSize(); + assert Integer.bitCount(chunkSize) == 1; // Must be power of two + alignmentMask = -chunkSize; // for a power of two this equals ~(chunkSize - 1), i.e. it clears the offset-within-chunk bits + } + + @Override + public Buffer rebuffer(long position) + { + try + { + metrics.requests.mark(); + long pageAlignedPos = position & alignmentMask; + Buffer buf; + // reference() returns null if the cached Buffer was released before it could be referenced; look the chunk up again. + do + buf = cache.get(new Key(source, pageAlignedPos)).reference(); + while (buf == null); + + return buf; + } + catch (Throwable t) + { + Throwables.propagateIfInstanceOf(t.getCause(), CorruptSSTableException.class); + throw Throwables.propagate(t); + } + } + + public void invalidate(long position) + { + long pageAlignedPos = position & alignmentMask; + cache.invalidate(new Key(source, pageAlignedPos)); + } + + @Override + public Rebufferer instantiateRebufferer() + { + return this; + } + + @Override + public void close() + { + source.close(); + } + + @Override + public void closeReader() + { + // Instance is shared among readers. Nothing to release.
+ } + + @Override + public ChannelProxy channel() + { + return source.channel(); + } + + @Override + public long fileLength() + { + return source.fileLength(); + } + + @Override + public double getCrcCheckChance() + { + return source.getCrcCheckChance(); + } + + @Override + public String toString() + { + return "CachingRebufferer:" + source.toString(); + } + } + + @Override + public long capacity() + { + return cacheSize; + } + + @Override + public void setCapacity(long capacity) + { + throw new UnsupportedOperationException("Chunk cache size cannot be changed."); + } + + @Override + public int size() + { + return cache.asMap().size(); + } + + @Override + public long weightedSize() + { + return cache.policy().eviction() + .map(policy -> policy.weightedSize().orElseGet(cache::estimatedSize)) + .orElseGet(cache::estimatedSize); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/ClusteringBound.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ClusteringBound.java index 6366a37,0000000..c45f7ba mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/ClusteringBound.java +++ b/src/java/org/apache/cassandra/db/ClusteringBound.java @@@ -1,151 -1,0 +1,171 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License.
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.cassandra.utils.memory.AbstractAllocator; + +/** + * The start or end of a range of clusterings, either inclusive or exclusive. + */ +public class ClusteringBound extends ClusteringBoundOrBoundary +{ + /** The smallest start bound, i.e. the one that starts before any row. */ + public static final ClusteringBound BOTTOM = new ClusteringBound(Kind.INCL_START_BOUND, EMPTY_VALUES_ARRAY); + /** The biggest end bound, i.e. the one that ends after any row. */ + public static final ClusteringBound TOP = new ClusteringBound(Kind.INCL_END_BOUND, EMPTY_VALUES_ARRAY); + + protected ClusteringBound(Kind kind, ByteBuffer[] values) + { + super(kind, values); + } + + public static ClusteringBound create(Kind kind, ByteBuffer[] values) + { + assert !kind.isBoundary(); + return new ClusteringBound(kind, values); + } + + public static Kind boundKind(boolean isStart, boolean isInclusive) + { + return isStart + ? (isInclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND) + : (isInclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND); + } + + public static ClusteringBound inclusiveStartOf(ByteBuffer... values) + { + return create(Kind.INCL_START_BOUND, values); + } + + public static ClusteringBound inclusiveEndOf(ByteBuffer... values) + { + return create(Kind.INCL_END_BOUND, values); + } + + public static ClusteringBound exclusiveStartOf(ByteBuffer...
values) + { + return create(Kind.EXCL_START_BOUND, values); + } + + public static ClusteringBound exclusiveEndOf(ByteBuffer... values) + { + return create(Kind.EXCL_END_BOUND, values); + } + + public static ClusteringBound inclusiveStartOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return inclusiveStartOf(values); + } + + public static ClusteringBound exclusiveStartOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return exclusiveStartOf(values); + } + + public static ClusteringBound inclusiveEndOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return inclusiveEndOf(values); + } + + public static ClusteringBound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object...
values) + { + CBuilder builder = CBuilder.create(comparator); + for (Object val : values) + { + if (val instanceof ByteBuffer) + builder.add((ByteBuffer) val); + else + builder.add(val); + } + return builder.buildBound(isStart, isInclusive); + } + + @Override + public ClusteringBound invert() + { + return create(kind().invert(), values); + } + + public ClusteringBound copy(AbstractAllocator allocator) + { + return (ClusteringBound) super.copy(allocator); + } + + public boolean isStart() + { + return kind().isStart(); + } + + public boolean isEnd() + { + return !isStart(); + } + + public boolean isInclusive() + { + return kind == Kind.INCL_START_BOUND || kind == Kind.INCL_END_BOUND; + } + + public boolean isExclusive() + { + return kind == Kind.EXCL_START_BOUND || kind == Kind.EXCL_END_BOUND; + } + + // For use by intersects, it's called with the sstable bound opposite to the slice bound + // (so if the slice bound is a start, it's called with the max sstable bound) + int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound) + { + for (int i = 0; i < sstableBound.size(); i++) + { + // Say the slice bound is a start. It means we're in the case where the max + // sstable bound is say (1:5) while the slice start is (1). So the start + // does start before the sstable end bound (and intersects it). It's the exact + // inverse with an end slice bound. + if (i >= size()) + return isStart() ? -1 : 1; + + int cmp = comparator.compareComponent(i, get(i), sstableBound.get(i)); + if (cmp != 0) + return cmp; + } + + // Say the slice bound is a start. It means we're in the case where the max + // sstable bound is say (1), while the slice start is (1:5). This again means + // that the slice starts before the end bound. + if (size() > sstableBound.size()) + return isStart() ? -1 : 1; + + // The slice bound is equal to the sstable bound. The result depends on whether the slice is inclusive or not + return isInclusive() ? 0 : (isStart() ?
1 : -1); + } - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java index 0c965e9,0000000..7a2cce1 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java +++ b/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java @@@ -1,163 -1,0 +1,183 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.memory.AbstractAllocator; + +/** + * This class defines a threshold between ranges of clusterings. It can either be a start or end bound of a range, or + * the boundary between two different defined ranges.
+ * <p> + * The latter is used for range tombstones for 2 main reasons: + * 1) When merging multiple iterators having range tombstones (that are represented by their start and end markers), + * we need to know when a range is closed on an iterator, whether it is reopened right away. Otherwise, we cannot + * easily produce the markers on the merged iterators without risking a violation of the sorting guarantees of an + * iterator. See this comment for more details: https://goo.gl/yyB5mR. + * 2) This saves some storage space. + */ +public abstract class ClusteringBoundOrBoundary extends AbstractBufferClusteringPrefix +{ + public static final ClusteringBoundOrBoundary.Serializer serializer = new Serializer(); + + protected ClusteringBoundOrBoundary(Kind kind, ByteBuffer[] values) + { + super(kind, values); + assert values.length > 0 || !kind.isBoundary(); + } + + public static ClusteringBoundOrBoundary create(Kind kind, ByteBuffer[] values) + { + return kind.isBoundary() + ? new ClusteringBoundary(kind, values) + : new ClusteringBound(kind, values); + } + + public boolean isBoundary() + { + return kind.isBoundary(); + } + + public boolean isOpen(boolean reversed) + { + return kind.isOpen(reversed); + } + + public boolean isClose(boolean reversed) + { + return kind.isClose(reversed); + } + + public static ClusteringBound inclusiveOpen(boolean reversed, ByteBuffer[] boundValues) + { + return new ClusteringBound(reversed ? Kind.INCL_END_BOUND : Kind.INCL_START_BOUND, boundValues); + } + + public static ClusteringBound exclusiveOpen(boolean reversed, ByteBuffer[] boundValues) + { + return new ClusteringBound(reversed ? Kind.EXCL_END_BOUND : Kind.EXCL_START_BOUND, boundValues); + } + + public static ClusteringBound inclusiveClose(boolean reversed, ByteBuffer[] boundValues) + { + return new ClusteringBound(reversed ?
Kind.INCL_START_BOUND : Kind.INCL_END_BOUND, boundValues); + } + + public static ClusteringBound exclusiveClose(boolean reversed, ByteBuffer[] boundValues) + { + return new ClusteringBound(reversed ? Kind.EXCL_START_BOUND : Kind.EXCL_END_BOUND, boundValues); + } + + public static ClusteringBoundary inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues) + { + return new ClusteringBoundary(reversed ? Kind.EXCL_END_INCL_START_BOUNDARY : Kind.INCL_END_EXCL_START_BOUNDARY, boundValues); + } + + public static ClusteringBoundary exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues) + { + return new ClusteringBoundary(reversed ? Kind.INCL_END_EXCL_START_BOUNDARY : Kind.EXCL_END_INCL_START_BOUNDARY, boundValues); + } + + public ClusteringBoundOrBoundary copy(AbstractAllocator allocator) + { + ByteBuffer[] newValues = new ByteBuffer[size()]; + for (int i = 0; i < size(); i++) + newValues[i] = allocator.clone(get(i)); + return create(kind(), newValues); + } + + public String toString(CFMetaData metadata) + { + return toString(metadata.comparator); + } + + public String toString(ClusteringComparator comparator) + { + StringBuilder sb = new StringBuilder(); + sb.append(kind()).append('('); + for (int i = 0; i < size(); i++) + { + if (i > 0) + sb.append(", "); + sb.append(comparator.subtype(i).getString(get(i))); + } + return sb.append(')').toString(); + } + + /** + * Returns the inverse of the current bound. + * <p> + * This inverts both start into end (and vice-versa) and inclusive into exclusive (and vice-versa). + * + * @return the inverse of this bound. For instance, if this bound is an exclusive start, this returns + * an inclusive end with the same values.
+ */ + public abstract ClusteringBoundOrBoundary invert(); + + public static class Serializer + { + public void serialize(ClusteringBoundOrBoundary bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + out.writeByte(bound.kind().ordinal()); + out.writeShort(bound.size()); + ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types); + } + + public long serializedSize(ClusteringBoundOrBoundary bound, int version, List<AbstractType<?>> types) + { + return 1 // kind ordinal + + TypeSizes.sizeof((short)bound.size()) + + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types); + } + + public ClusteringBoundOrBoundary deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException + { + Kind kind = Kind.values()[in.readByte()]; + return deserializeValues(in, kind, version, types); + } + + public void skipValues(DataInputPlus in, Kind kind, int version, List<AbstractType<?>> types) throws IOException + { + int size = in.readUnsignedShort(); + if (size == 0) + return; + + ClusteringPrefix.serializer.skipValuesWithoutSize(in, size, version, types); + } + + public ClusteringBoundOrBoundary deserializeValues(DataInputPlus in, Kind kind, int version, List<AbstractType<?>> types) throws IOException + { + int size = in.readUnsignedShort(); + // An empty bound is mapped to the shared BOTTOM/TOP constants, which are both inclusive bounds. + if (size == 0) + return kind.isStart() ?
ClusteringBound.BOTTOM : ClusteringBound.TOP; + + ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types); + return create(kind, values); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/ClusteringBoundary.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ClusteringBoundary.java index 503eaad,0000000..37b3210 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/ClusteringBoundary.java +++ b/src/java/org/apache/cassandra/db/ClusteringBoundary.java @@@ -1,45 -1,0 +1,65 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.utils.memory.AbstractAllocator; + +/** + * The threshold between two different ranges, i.e. a shortcut for the combination of two ClusteringBounds -- one + * specifying the end of one of the ranges, and its (implicit) complement specifying the beginning of the other.
+ */ +public class ClusteringBoundary extends ClusteringBoundOrBoundary +{ + protected ClusteringBoundary(Kind kind, ByteBuffer[] values) + { + super(kind, values); + } + + public static ClusteringBoundary create(Kind kind, ByteBuffer[] values) + { + assert kind.isBoundary(); + return new ClusteringBoundary(kind, values); + } + + @Override + public ClusteringBoundary invert() + { + return create(kind().invert(), values); + } + + @Override + public ClusteringBoundary copy(AbstractAllocator allocator) + { + return (ClusteringBoundary) super.copy(allocator); + } + + public ClusteringBound openBound(boolean reversed) + { + return ClusteringBound.create(kind.openBoundOfBoundary(reversed), values); + } + + public ClusteringBound closeBound(boolean reversed) + { + return ClusteringBound.create(kind.closeBoundOfBoundary(reversed), values); + } - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java index 56bb7d6,0000000..cd7f7cb mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java @@@ -1,73 -1,0 +1,93 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License.
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.commitlog; + +import java.io.DataInput; +import java.nio.ByteBuffer; + +import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileSegmentInputStream; + +/** + * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted + * to reconstruct the full segment. + */ +public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput +{ + private final long segmentOffset; + private final int expectedLength; + private final ChunkProvider chunkProvider; + + /** + * Offset, within this segment, of the current chunk: the total bytes consumed from previously processed chunks. + */ + private int totalChunkOffset; + + public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider) + { + super(chunkProvider.nextChunk(), filePath, position); + this.segmentOffset = segmentOffset; + this.expectedLength = expectedLength; + this.chunkProvider = chunkProvider; + } + + public interface ChunkProvider + { + /** + * Get the next chunk from the backing provider, if any chunks remain. + * @return Next chunk, else null if no more chunks remain.
+ */ + ByteBuffer nextChunk(); + } + + public long getFilePointer() + { + return segmentOffset + totalChunkOffset + buffer.position(); + } + + public boolean isEOF() + { + return totalChunkOffset + buffer.position() >= expectedLength; + } + + public long bytesRemaining() + { + return expectedLength - (totalChunkOffset + buffer.position()); + } + + public void seek(long position) + { + // implement this when we actually need it + throw new UnsupportedOperationException(); + } + + public long bytesPastMark(DataPosition mark) + { + throw new UnsupportedOperationException(); + } + + public void reBuffer() + { + totalChunkOffset += buffer.position(); + // NOTE(review): nextChunk() returns null once no chunks remain; getFilePointer()/isEOF()/bytesRemaining() would then NPE on buffer.position() -- confirm callers stop at isEOF(). + buffer = chunkProvider.nextChunk(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java index fd74fc6,0000000..14730ac mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java @@@ -1,220 -1,0 +1,240 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements.
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.rows; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.io.sstable.IndexInfo; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.utils.IteratorWithLowerBound; + +/** + * An unfiltered row iterator with a lower bound retrieved from either the global + * sstable statistics or the row index lower bounds (if available in the cache). + * Before initializing the sstable unfiltered row iterator, we return an empty row + * with the clustering set to the lower bound. The empty row will be filtered out and + * the result is that if we don't need to access this sstable, i.e. due to the LIMIT conditon, + * then we will not. See CASSANDRA-8180 for examples of why this is useful. 
+ */ +public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilteredRowIterator implements IteratorWithLowerBound<Unfiltered> +{ + private final SSTableReader sstable; + private final ClusteringIndexFilter filter; + private final ColumnFilter selectedColumns; + private final boolean isForThrift; + private final int nowInSec; + private final boolean applyThriftTransformation; + private ClusteringBound lowerBound; + private boolean firstItemRetrieved; + + public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey, + SSTableReader sstable, + ClusteringIndexFilter filter, + ColumnFilter selectedColumns, + boolean isForThrift, + int nowInSec, + boolean applyThriftTransformation) + { + super(partitionKey); + this.sstable = sstable; + this.filter = filter; + this.selectedColumns = selectedColumns; + this.isForThrift = isForThrift; + this.nowInSec = nowInSec; + this.applyThriftTransformation = applyThriftTransformation; + this.lowerBound = null; + this.firstItemRetrieved = false; + } + + public Unfiltered lowerBound() + { + if (lowerBound != null) + return makeBound(lowerBound); + + // The partition index lower bound is more accurate than the sstable metadata lower bound but it is only + // present if the iterator has already been initialized, which we only do when there are tombstones since in + // this case we cannot use the sstable metadata clustering values + ClusteringBound ret = getPartitionIndexLowerBound(); + return ret != null ? 
makeBound(ret) : makeBound(getMetadataLowerBound()); + } + + private Unfiltered makeBound(ClusteringBound bound) + { + if (bound == null) + return null; + + if (lowerBound != bound) + lowerBound = bound; + + return new RangeTombstoneBoundMarker(lowerBound, DeletionTime.LIVE); + } + + @Override + protected UnfilteredRowIterator initializeIterator() + { + sstable.incrementReadCount(); + + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift); + return isForThrift && applyThriftTransformation + ? ThriftResultsMerger.maybeWrap(iter, nowInSec) + : iter; + } + + @Override + protected Unfiltered computeNext() + { + Unfiltered ret = super.computeNext(); + if (firstItemRetrieved) + return ret; + + // Check that the lower bound is not bigger than the first item retrieved + firstItemRetrieved = true; + if (lowerBound != null && ret != null) + assert comparator().compare(lowerBound, ret.clustering()) <= 0 + : String.format("Lower bound [%s ]is bigger than first returned value [%s] for sstable %s", + lowerBound.toString(sstable.metadata), + ret.toString(sstable.metadata), + sstable.getFilename()); + + return ret; + } + + private Comparator<Clusterable> comparator() + { + return filter.isReversed() ? 
sstable.metadata.comparator.reversed() : sstable.metadata.comparator; + } + + @Override + public CFMetaData metadata() + { + return sstable.metadata; + } + + @Override + public boolean isReverseOrder() + { + return filter.isReversed(); + } + + @Override + public PartitionColumns columns() + { + return selectedColumns.fetchedColumns(); + } + + @Override + public EncodingStats stats() + { + return sstable.stats(); + } + + @Override + public DeletionTime partitionLevelDeletion() + { + if (!sstable.hasTombstones()) + return DeletionTime.LIVE; + + return super.partitionLevelDeletion(); + } + + @Override + public Row staticRow() + { + if (columns().statics.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + return super.staticRow(); + } + + /** + * @return the lower bound stored on the index entry for this partition, if available. + */ + private ClusteringBound getPartitionIndexLowerBound() + { + // NOTE: CASSANDRA-11206 removed the lookup against the key-cache as the IndexInfo objects are no longer + // in memory for not heap backed IndexInfo objects (so, these are on disk). + // CASSANDRA-11369 is there to fix this afterwards. + + // Creating the iterator ensures that rowIndexEntry is loaded if available (partitions bigger than + // DatabaseDescriptor.column_index_size_in_kb) + if (!canUseMetadataLowerBound()) + maybeInit(); + + RowIndexEntry rowIndexEntry = sstable.getCachedPosition(partitionKey(), false); + if (rowIndexEntry == null || !rowIndexEntry.indexOnHeap()) + return null; + + try (RowIndexEntry.IndexInfoRetriever onHeapRetriever = rowIndexEntry.openWithIndex(null)) + { + IndexInfo column = onHeapRetriever.columnsIndex(filter.isReversed() ? rowIndexEntry.columnsIndexCount() - 1 : 0); + ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? 
column.lastName : column.firstName; + assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() : + String.format("Unexpected number of clustering values %d, expected %d or fewer for %s", + lowerBoundPrefix.getRawValues().length, + sstable.metadata.comparator.size(), + sstable.getFilename()); + return ClusteringBound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues()); + } + catch (IOException e) + { + throw new RuntimeException("should never occur", e); + } + } + + /** + * @return true if we can use the clustering values in the stats of the sstable: + * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size) + * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones + */ + private boolean canUseMetadataLowerBound() + { + return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile(); + } + + /** + * @return a global lower bound made from the clustering values stored in the sstable metadata, note that + * this currently does not correctly compare tombstone bounds, especially ranges. + */ + private ClusteringBound getMetadataLowerBound() + { + if (!canUseMetadataLowerBound()) + return null; + + final StatsMetadata m = sstable.getSSTableMetadata(); + List<ByteBuffer> vals = filter.isReversed() ? 
m.maxClusteringValues : m.minClusteringValues; + assert vals.size() <= sstable.metadata.comparator.size() : + String.format("Unexpected number of clustering values %d, expected %d or fewer for %s", + vals.size(), + sstable.metadata.comparator.size(), + sstable.getFilename()); + return ClusteringBound.inclusiveOpen(filter.isReversed(), vals.toArray(new ByteBuffer[vals.size()])); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/transform/BaseRows.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/BaseRows.java index 825db44,b0e642b..fb3b9f9 --- a/src/java/org/apache/cassandra/db/transform/BaseRows.java +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@@ -1,3 -1,23 +1,23 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * -*/ ++ */ package org.apache.cassandra.db.transform; import org.apache.cassandra.config.CFMetaData; http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/transform/Stack.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/db/transform/Transformation.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/Transformation.java index 230be5f,6a31ece..3134725 --- a/src/java/org/apache/cassandra/db/transform/Transformation.java +++ b/src/java/org/apache/cassandra/db/transform/Transformation.java @@@ -1,6 -1,25 +1,26 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ package org.apache.cassandra.db.transform; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/Index.java index 5cbdab8,469ef07..f855965 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@@ -1,8 -1,26 +1,28 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ package org.apache.cassandra.index; +import java.util.Collection; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Callable; import java.util.function.BiFunction; http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/index/internal/IndexEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index 6eb8b0a,0000000..de8d69b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@@ -1,131 -1,0 +1,151 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.index.sasi; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.sstable.KeyIterator; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; + +class SASIIndexBuilder extends SecondaryIndexBuilder +{ + private final ColumnFamilyStore cfs; + private final UUID compactionId = UUIDGen.getTimeUUID(); + + private final SortedMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> sstables; + + private long bytesProcessed = 0; + private final long totalSizeInBytes; + + public SASIIndexBuilder(ColumnFamilyStore cfs, 
SortedMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> sstables) + { + long totalIndexBytes = 0; + for (SSTableReader sstable : sstables.keySet()) + totalIndexBytes += getPrimaryIndexLength(sstable); + + this.cfs = cfs; + this.sstables = sstables; + this.totalSizeInBytes = totalIndexBytes; + } + + public void build() + { + AbstractType<?> keyValidator = cfs.metadata.getKeyValidator(); + for (Map.Entry<SSTableReader, Map<ColumnDefinition, ColumnIndex>> e : sstables.entrySet()) + { + SSTableReader sstable = e.getKey(); + Map<ColumnDefinition, ColumnIndex> indexes = e.getValue(); + + try (RandomAccessReader dataFile = sstable.openDataReader()) + { + PerSSTableIndexWriter indexWriter = SASIIndex.newWriter(keyValidator, sstable.descriptor, indexes, OperationType.COMPACTION); + + long previousKeyPosition = 0; + try (KeyIterator keys = new KeyIterator(sstable.descriptor, cfs.metadata)) + { + while (keys.hasNext()) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + + final DecoratedKey key = keys.next(); + final long keyPosition = keys.getKeyPosition(); + + indexWriter.startPartition(key, keyPosition); + + try + { + RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); + dataFile.seek(indexEntry.position); + ByteBufferUtil.readWithShortLength(dataFile); // key + + try (SSTableIdentityIterator partition = new SSTableIdentityIterator(sstable, dataFile, key)) + { + // if the row has statics attached, it has to be indexed separately + indexWriter.nextUnfilteredCluster(partition.staticRow()); + + while (partition.hasNext()) + indexWriter.nextUnfilteredCluster(partition.next()); + } + } + catch (IOException ex) + { + throw new FSReadError(ex, sstable.getFilename()); + } + + bytesProcessed += keyPosition - previousKeyPosition; + previousKeyPosition = keyPosition; + } + + completeSSTable(indexWriter, sstable, indexes.values()); + } + } + } + } + + public CompactionInfo getCompactionInfo() + { + return new 
CompactionInfo(cfs.metadata, + OperationType.INDEX_BUILD, + bytesProcessed, + totalSizeInBytes, + compactionId); + } + + private long getPrimaryIndexLength(SSTable sstable) + { + File primaryIndex = new File(sstable.getIndexFilename()); + return primaryIndex.exists() ? primaryIndex.length() : 0; + } + + private void completeSSTable(PerSSTableIndexWriter indexWriter, SSTableReader sstable, Collection<ColumnIndex> indexes) + { + indexWriter.complete(); + + for (ColumnIndex index : indexes) + { + File tmpIndex = new File(sstable.descriptor.filenameFor(index.getComponent())); + if (!tmpIndex.exists()) // no data was inserted into the index for given sstable + continue; + + index.update(Collections.<SSTableReader>emptyList(), Collections.singletonList(sstable)); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java index 0e67364,0000000..95af31f mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java +++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java @@@ -1,127 -1,0 +1,147 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.util; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.utils.memory.BufferPool; + +/** + * Buffer manager used for reading from a ChunkReader when cache is not in use. Instances of this class are + * reader-specific and thus do not need to be thread-safe since the reader itself isn't. + * + * The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call. + */ +public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer.BufferHolder +{ + protected final ChunkReader source; + protected final ByteBuffer buffer; + protected long offset = 0; + + public static BufferManagingRebufferer on(ChunkReader wrapped) + { + return wrapped.alignmentRequired() + ? new Aligned(wrapped) + : new Unaligned(wrapped); + } + + abstract long alignedPosition(long position); + + public BufferManagingRebufferer(ChunkReader wrapped) + { + this.source = wrapped; + buffer = RandomAccessReader.allocateBuffer(wrapped.chunkSize(), wrapped.preferredBufferType()); + buffer.limit(0); + } + + @Override + public void closeReader() + { + BufferPool.put(buffer); + offset = -1; + } + + @Override + public void close() + { + assert offset == -1; // reader must be closed at this point. 
+ source.close(); + } + + @Override + public ChannelProxy channel() + { + return source.channel(); + } + + @Override + public long fileLength() + { + return source.fileLength(); + } + + @Override + public BufferHolder rebuffer(long position) + { + offset = alignedPosition(position); + source.readChunk(offset, buffer); + return this; + } + + @Override + public double getCrcCheckChance() + { + return source.getCrcCheckChance(); + } + + @Override + public String toString() + { + return "BufferManagingRebufferer." + getClass().getSimpleName() + ":" + source.toString(); + } + + // BufferHolder methods + + public ByteBuffer buffer() + { + return buffer; + } + + public long offset() + { + return offset; + } + + @Override + public void release() + { + // nothing to do, we don't delete buffers before we're closed. + } + + public static class Unaligned extends BufferManagingRebufferer + { + public Unaligned(ChunkReader wrapped) + { + super(wrapped); + } + + @Override + long alignedPosition(long position) + { + return position; + } + } + + public static class Aligned extends BufferManagingRebufferer + { + public Aligned(ChunkReader wrapped) + { + super(wrapped); + assert Integer.bitCount(wrapped.chunkSize()) == 1; + } + + @Override + long alignedPosition(long position) + { + return position & -buffer.capacity(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/LimitingRebufferer.java index e69da70,0000000..a1e9715 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java +++ b/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java @@@ -1,106 -1,0 +1,126 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.util; + +import java.nio.ByteBuffer; + +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.RateLimiter; + +/** + * Rebufferer wrapper that applies rate limiting. + * + * Instantiated once per RandomAccessReader, thread-unsafe. + * The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call. 
+ */ +public class LimitingRebufferer implements Rebufferer, Rebufferer.BufferHolder +{ + final private Rebufferer wrapped; + final private RateLimiter limiter; + final private int limitQuant; + + private BufferHolder bufferHolder; + private ByteBuffer buffer; + private long offset; + + public LimitingRebufferer(Rebufferer wrapped, RateLimiter limiter, int limitQuant) + { + this.wrapped = wrapped; + this.limiter = limiter; + this.limitQuant = limitQuant; + } + + @Override + public BufferHolder rebuffer(long position) + { + bufferHolder = wrapped.rebuffer(position); + buffer = bufferHolder.buffer(); + offset = bufferHolder.offset(); + int posInBuffer = Ints.checkedCast(position - offset); + int remaining = buffer.limit() - posInBuffer; + if (remaining == 0) + return this; + + if (remaining > limitQuant) + { + buffer.limit(posInBuffer + limitQuant); // certainly below current limit + remaining = limitQuant; + } + limiter.acquire(remaining); + return this; + } + + @Override + public ChannelProxy channel() + { + return wrapped.channel(); + } + + @Override + public long fileLength() + { + return wrapped.fileLength(); + } + + @Override + public double getCrcCheckChance() + { + return wrapped.getCrcCheckChance(); + } + + @Override + public void close() + { + wrapped.close(); + } + + @Override + public void closeReader() + { + wrapped.closeReader(); + } + + @Override + public String toString() + { + return "LimitingRebufferer[" + limiter.toString() + "]:" + wrapped.toString(); + } + + // BufferHolder methods + + @Override + public ByteBuffer buffer() + { + return buffer; + } + + @Override + public long offset() + { + return offset; + } + + @Override + public void release() + { + bufferHolder.release(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/io/util/MmapRebufferer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/MmapRebufferer.java index 
6c39cb1,0000000..9d79919 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java +++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java @@@ -1,49 -1,0 +1,69 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.util; + +/** + * Rebufferer for memory-mapped files. Thread-safe and shared among reader instances. + * This is simply a thin wrapper around MmappedRegions as the buffers there can be used directly after duplication. + */ +class MmapRebufferer extends AbstractReaderFileProxy implements Rebufferer, RebuffererFactory +{ + protected final MmappedRegions regions; + + public MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions) + { + super(channel, fileLength); + this.regions = regions; + } + + @Override + public BufferHolder rebuffer(long position) + { + return regions.floor(position); + } + + @Override + public Rebufferer instantiateRebufferer() + { + return this; + } + + @Override + public void close() + { + regions.closeQuietly(); + } + + @Override + public void closeReader() + { + // Instance is shared among readers. Nothing to release. 
+ } + + @Override + public String toString() + { + return String.format("%s(%s - data length %d)", + getClass().getSimpleName(), + channel.filePath(), + fileLength()); + } - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/tools/BulkLoadException.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/BulkLoadException.java index fb5d459,0000000..3c5c03d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/BulkLoadException.java +++ b/src/java/org/apache/cassandra/tools/BulkLoadException.java @@@ -1,13 -1,0 +1,33 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.tools; + +public class BulkLoadException extends Exception +{ + + private static final long serialVersionUID = 1L; + + public BulkLoadException(Throwable cause) + { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/693e52d6/src/java/org/apache/cassandra/tools/JsonTransformer.java ----------------------------------------------------------------------