Merge branch 'cassandra-2.1' into trunk Conflicts: src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce76e1e6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce76e1e6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce76e1e6 Branch: refs/heads/trunk Commit: ce76e1e63c80318671b9260b0faca7f4557f5d14 Parents: 62ee147 4eb9fa7 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Feb 12 13:39:36 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Feb 12 13:39:36 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/compress/CompressedSequentialWriter.java | 2 + .../io/sstable/IndexSummaryBuilder.java | 162 ++++++++++++++----- .../io/sstable/format/big/BigTableWriter.java | 77 +++++---- .../cassandra/io/util/SequentialWriter.java | 10 ++ 5 files changed, 168 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f2ac518,cbb4334..3dd723a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,63 -1,5 +1,64 @@@ +3.0 + * Add role based access control (CASSANDRA-7653, 8650) + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268) + * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657) + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438) + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707) + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560) + * Support direct buffer decompression for reads (CASSANDRA-8464) + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039) + * Group sstables for anticompaction correctly (CASSANDRA-8578) + * Add ReadFailureException to native protocol, respond + immediately when replicas encounter errors while handling + a read request (CASSANDRA-7886) + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308) + * Allow mixing token and partition key restrictions (CASSANDRA-7016) + * Support index key/value entries on map collections (CASSANDRA-8473) + * Modernize schema tables (CASSANDRA-8261) + * Support for user-defined aggregation functions (CASSANDRA-8053) + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419) + * Refactor SelectStatement, return IN results in natural order instead + of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981) + * Support UDTs, tuples, and collections in user-defined + functions (CASSANDRA-7563) + * Fix aggregate fn results on empty selection, result column name, + and cqlsh parsing (CASSANDRA-8229) + * Mark sstables as repaired after full repair (CASSANDRA-7586) + * Extend Descriptor to include a format value and refactor reader/writer + APIs (CASSANDRA-7443) + * Integrate JMH for microbenchmarks (CASSANDRA-8151) + * Keep sstable levels when bootstrapping (CASSANDRA-7460) + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) + * Support for aggregation functions (CASSANDRA-4914) + * Remove cassandra-cli (CASSANDRA-7920) + * Accept dollar quoted strings in CQL (CASSANDRA-7769) + * Make assassinate a first class command (CASSANDRA-7935) + * Support IN clause on any partition key column (CASSANDRA-7855) + * Support IN clause on any clustering column (CASSANDRA-4762) + * Improve compaction logging (CASSANDRA-7818) + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917) + * Do anticompaction in groups (CASSANDRA-6851) + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, + 7924, 7812, 8063, 7813, 7708) + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) + * Remove CQL2 (CASSANDRA-5918) + * Add Thrift get_multi_slice call (CASSANDRA-6757) + * Optimize fetching multiple cells by name (CASSANDRA-6933) + * Allow compilation in java 8 (CASSANDRA-7028) + * Make incremental repair default (CASSANDRA-7250) + * Enable code coverage thru JaCoCo (CASSANDRA-7226) + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) + * Shorten SSTable path (CASSANDRA-6962) + * Use unsafe mutations for most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + * Fail on very large batch sizes (CASSANDRA-8011) + * Improve concurrency of repair (CASSANDRA-6455, 8208) + + 2.1.4 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747) * Enforce SSTableReader.first/last (CASSANDRA-8744) * Cleanup SegmentedFile API (CASSANDRA-8749) * Avoid overlap with early compaction replacement (CASSANDRA-8683) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java index df326d7,3b93b31..eda7ca7 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java @@@ -45,6 -47,41 +47,41 @@@ public class IndexSummaryBuilde private long keysWritten = 0; private long indexIntervalMatches = 0; private long offheapSize = 0; + private long nextSamplePosition; + + // for each ReadableBoundary, we map its dataLength property to itself, permitting us to lookup the + // last readable boundary from the perspective of the data file + // [data file position limit] => [ReadableBoundary] + private TreeMap<Long, ReadableBoundary> lastReadableByData = new TreeMap<>(); + // for each ReadableBoundary, we map its indexLength property to itself, permitting us to lookup the + // last readable boundary from the perspective of the index file + // [index file position limit] => [ReadableBoundary] + private TreeMap<Long, ReadableBoundary> lastReadableByIndex = new TreeMap<>(); + // the last synced data file position + private long dataSyncPosition; + // the last synced index file position + private long indexSyncPosition; + + // the last summary interval boundary that is fully readable in both data and index files + private ReadableBoundary lastReadableBoundary; + + /** + * Represents a boundary that is guaranteed fully readable in the summary, index file and data file. + * The key contained is the last key readable if the index and data files have been flushed to the + * stored lengths. + */ + public static class ReadableBoundary + { - final DecoratedKey lastKey; - final long indexLength; - final long dataLength; ++ public final DecoratedKey lastKey; ++ public final long indexLength; ++ public final long dataLength; + public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength) + { + this.lastKey = lastKey; + this.indexLength = indexLength; + this.dataLength = dataLength; + } + } public IndexSummaryBuilder(long expectedKeys, int minIndexInterval, int samplingLevel) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index bba7550,0000000..89ecd99 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@@ -1,588 -1,0 +1,583 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.DataInput; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.FilterFactory; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.StreamingHistogram; + +public class BigTableWriter extends SSTableWriter +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class); + + // not very random, but the only value that can't be mistaken for a legal column-name length + public static final int END_OF_ROW = 0x0000; + + private IndexWriter iwriter; + private SegmentedFile.Builder dbuilder; + private final SequentialWriter dataFile; + private DecoratedKey lastWrittenKey; + private FileMark dataMark; + + BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + { + super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + - iwriter = new IndexWriter(keyCount); - + if (compression) + { + dataFile = SequentialWriter.open(getFilename(), + descriptor.filenameFor(Component.COMPRESSION_INFO), + metadata.compressionParameters(), + metadataCollector); + dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile); + } + else + { + dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC))); + dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + } ++ iwriter = new IndexWriter(keyCount, dataFile); + } + + public void mark() + { + dataMark = dataFile.mark(); + iwriter.mark(); + } + + public void resetAndTruncate() + { + dataFile.resetAndTruncate(dataMark); + iwriter.resetAndTruncate(); + } + + /** + * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written + */ + private long beforeAppend(DecoratedKey decoratedKey) + { + assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values + if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) + throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); + return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); + } + + private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index) + { + metadataCollector.addKey(decoratedKey.getKey()); + lastWrittenKey = decoratedKey; + last = lastWrittenKey; + if (first == null) + first = lastWrittenKey; + + if (logger.isTraceEnabled()) + logger.trace("wrote {} at {}", decoratedKey, dataPosition); - iwriter.append(decoratedKey, index); ++ iwriter.append(decoratedKey, index, dataPosition); + dbuilder.addPotentialBoundary(dataPosition); + } + + /** + * @param row + * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row + */ + public RowIndexEntry append(AbstractCompactedRow row) + { - long currentPosition = beforeAppend(row.key); ++ long startPosition = beforeAppend(row.key); + RowIndexEntry entry; + try + { - entry = row.write(currentPosition, dataFile); ++ entry = row.write(startPosition, dataFile); + if (entry == null) + return null; + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } - metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); - afterAppend(row.key, currentPosition, entry); ++ long endPosition = dataFile.getFilePointer(); ++ metadataCollector.update(endPosition - startPosition, row.columnStats()); ++ afterAppend(row.key, endPosition, entry); + return entry; + } + + public void append(DecoratedKey decoratedKey, ColumnFamily cf) + { + if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT) + { + logger.error("Key size {} exceeds maximum of {}, skipping row", + decoratedKey.getKey().remaining(), + FBUtilities.MAX_UNSIGNED_SHORT); + return; + } + + long startPosition = beforeAppend(decoratedKey); + try + { + RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream); + afterAppend(decoratedKey, startPosition, entry); + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats()); + } + + private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException + { + assert cf.hasColumns() || cf.isMarkedForDelete(); + + ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out); + ColumnIndex index = builder.build(cf); + + out.writeShort(END_OF_ROW); + return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index); + } + + /** + * @throws IOException if a read from the DataInput fails + * @throws FSWriteError if a write to the dataFile fails + */ + public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException + { + long currentPosition = beforeAppend(key); + + ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); + ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); + ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); + List<ByteBuffer> minColumnNames = Collections.emptyList(); + List<ByteBuffer> maxColumnNames = Collections.emptyList(); + StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); + boolean hasLegacyCounterShards = false; + + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); + cf.delete(DeletionTime.serializer.deserialize(in)); + + ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream); + + if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE) + { + tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); + maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); + minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); + maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); + } + + Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator(); + while (rangeTombstoneIterator.hasNext()) + { + RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); + tombstones.update(rangeTombstone.getLocalDeletionTime()); + minTimestampTracker.update(rangeTombstone.timestamp()); + maxTimestampTracker.update(rangeTombstone.timestamp()); + maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime()); + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator); + } + + Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator); + try + { + while (iter.hasNext()) + { + OnDiskAtom atom = iter.next(); + if (atom == null) + break; + + if (atom instanceof CounterCell) + { + atom = ((CounterCell) atom).markLocalToBeCleared(); + hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards(); + } + + int deletionTime = atom.getLocalDeletionTime(); + if (deletionTime < Integer.MAX_VALUE) + tombstones.update(deletionTime); + minTimestampTracker.update(atom.timestamp()); + maxTimestampTracker.update(atom.timestamp()); + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator); + maxDeletionTimeTracker.update(atom.getLocalDeletionTime()); + + columnIndexer.add(atom); // This write the atom on disk too + } + + columnIndexer.maybeWriteEmptyRowHeader(); + dataFile.stream.writeShort(END_OF_ROW); + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + + metadataCollector.updateMinTimestamp(minTimestampTracker.get()) + .updateMaxTimestamp(maxTimestampTracker.get()) + .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get()) + .addRowSize(dataFile.getFilePointer() - currentPosition) + .addColumnCount(columnIndexer.writtenAtomCount()) + .mergeTombstoneHistogram(tombstones) + .updateMinColumnNames(minColumnNames) + .updateMaxColumnNames(maxColumnNames) + .updateHasLegacyCounterShards(hasLegacyCounterShards); + + afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build())); + return currentPosition; + } + + /** + * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable + */ + public void abort() + { + assert descriptor.type.isTemporary; + if (iwriter == null && dataFile == null) + return; + + if (iwriter != null) + iwriter.abort(); + + if (dataFile!= null) + dataFile.abort(); + + Set<Component> components = SSTable.componentsFor(descriptor); + try + { + if (!components.isEmpty()) + SSTable.delete(descriptor, components); + } + catch (FSWriteError e) + { + logger.error(String.format("Failed deleting temp components for %s", descriptor), e); + throw e; + } + } + + // we use this method to ensure any managed data we may have retained references to during the write are no + // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction + public void isolateReferences() + { + // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other + // data retention is done through copying + first = getMinimalKey(first); + last = lastWrittenKey = getMinimalKey(last); + } + + private Descriptor makeTmpLinks() + { + // create temp links if they don't already exist + Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK); + if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists()) + { + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX))); + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA))); + } + return link; + } + + public SSTableReader openEarly(long maxDataAge) + { + StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(), + repairedAt).get(MetadataType.STATS); + + // find the max (exclusive) readable key - DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0); - if (exclusiveUpperBoundOfReadableIndex == null) ++ IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable(); ++ if (boundary == null) + return null; + ++ assert boundary.indexLength > 0 && boundary.dataLength > 0; + Descriptor link = makeTmpLinks(); + // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers + SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY); + SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY); + SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL), + components, metadata, + partitioner, ifile, - dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex), ++ dfile, iwriter.summary.build(partitioner, boundary.lastKey), + iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY); + + // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed) + sstable.first = getMinimalKey(first); - sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex); - DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1); - if (inclusiveUpperBoundOfReadableData == null) - { - // Prevent leaving tmplink files on disk - sstable.selfRef().release(); - return null; - } - int offset = 2; - while (true) - { - RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT); - if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset()) - break; - inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++); - if (inclusiveUpperBoundOfReadableData == null) - { - sstable.selfRef().release(); - return null; - } - } - sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData); ++ sstable.last = getMinimalKey(boundary.lastKey); + return sstable; + } + + public SSTableReader closeAndOpenReader() + { + return closeAndOpenReader(System.currentTimeMillis()); + } + + public SSTableReader closeAndOpenReader(long maxDataAge) + { + return finish(FinishType.NORMAL, maxDataAge, this.repairedAt); + } + + public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt) + { + assert finishType != FinishType.CLOSE; + Pair<Descriptor, StatsMetadata> p; + + p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt); + Descriptor desc = p.left; + StatsMetadata metadata = p.right; + + if (finishType == FinishType.EARLY) + desc = makeTmpLinks(); + + // finalize in-memory state for the reader + SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType); + SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType); + SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL), + components, + this.metadata, + partitioner, + ifile, + dfile, + iwriter.summary.build(partitioner), + iwriter.bf.sharedCopy(), + maxDataAge, + metadata, + finishType.openReason); + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(last); + + if (finishType.isFinal) + { + iwriter.bf.close(); + // try to save the summaries to disk + sstable.saveSummary(iwriter.builder, dbuilder); + iwriter = null; + dbuilder = null; + } + return sstable; + } + + // Close the writer and return the descriptor to the new sstable and it's metadata + public Pair<Descriptor, StatsMetadata> close() + { + return close(FinishType.CLOSE, this.repairedAt); + } + + private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt) + { + switch (type) + { + case EARLY: case CLOSE: case NORMAL: + iwriter.close(); + dataFile.close(); + if (type == FinishType.CLOSE) + iwriter.bf.close(); + } + + // write sstable statistics + Map<MetadataType, MetadataComponent> metadataComponents; + metadataComponents = metadataCollector + .finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(),repairedAt); + + // remove the 'tmp' marker from all components + Descriptor descriptor = this.descriptor; + if (type.isFinal) + { + dataFile.writeFullChecksum(descriptor); + writeMetadata(descriptor, metadataComponents); + // save the table of components + SSTable.appendTOC(descriptor, components); + descriptor = rename(descriptor, components); + } + + return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS)); + } + + private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components) + { + SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS))); + try + { + desc.getMetadataSerializer().serialize(components, out.stream); + } + catch (IOException e) + { + throw new FSWriteError(e, out.getPath()); + } + finally + { + out.close(); + } + } + + public long getFilePointer() + { + return dataFile.getFilePointer(); + } + + public long getOnDiskFilePointer() + { + return dataFile.getOnDiskFilePointer(); + } + + /** + * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. + */ + class IndexWriter + { + private final SequentialWriter indexFile; + public final SegmentedFile.Builder builder; + public final IndexSummaryBuilder summary; + public final IFilter bf; + private FileMark mark; + - IndexWriter(long keyCount) ++ IndexWriter(long keyCount, final SequentialWriter dataFile) + { + indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL); + bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true); ++ // register listeners to be alerted when the data files are flushed ++ indexFile.setPostFlushListener(new Runnable() ++ { ++ public void run() ++ { ++ summary.markIndexSynced(indexFile.getLastFlushOffset()); ++ } ++ }); ++ dataFile.setPostFlushListener(new Runnable() ++ { ++ public void run() ++ { ++ summary.markDataSynced(dataFile.getLastFlushOffset()); ++ } ++ }); + } + + // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file - DecoratedKey getMaxReadableKey(int offset) ++ IndexSummaryBuilder.ReadableBoundary getMaxReadable() + { - long maxIndexLength = indexFile.getLastFlushOffset(); - return summary.getMaxReadableKey(maxIndexLength, offset); ++ return summary.getLastReadableBoundary(); + } + - public void append(DecoratedKey key, RowIndexEntry indexEntry) ++ public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) + { + bf.add(key.getKey()); - long indexPosition = indexFile.getFilePointer(); ++ long indexStart = indexFile.getFilePointer(); + try + { + ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream); + rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream); + } + catch (IOException e) + { + throw new FSWriteError(e, indexFile.getPath()); + } ++ long indexEnd = indexFile.getFilePointer(); + + if (logger.isTraceEnabled()) - logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition); ++ logger.trace("wrote index entry: {} at {}", indexEntry, indexStart); + - summary.maybeAddEntry(key, indexPosition); - builder.addPotentialBoundary(indexPosition); ++ summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd); ++ builder.addPotentialBoundary(indexStart); + } + + public void abort() + { + indexFile.abort(); + bf.close(); + } + + /** + * Closes the index and bloomfilter, making the public state of this writer valid for consumption. + */ + public void close() + { + if (components.contains(Component.FILTER)) + { + String path = descriptor.filenameFor(Component.FILTER); + try + { + // bloom filter + FileOutputStream fos = new FileOutputStream(path); + DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos); + FilterFactory.serialize(bf, stream); + stream.flush(); + fos.getFD().sync(); + stream.close(); + } + catch (IOException e) + { + throw new FSWriteError(e, path); + } + } + + // index + long position = indexFile.getFilePointer(); + indexFile.close(); // calls force + FileUtils.truncate(indexFile.getPath(), position); + } + + public void mark() + { + mark = indexFile.mark(); + } + + public void resetAndTruncate() + { + // we can't un-set the bloom filter addition, but extra keys in there are harmless. + // we can't reset dbuilder either, but that is the last thing called in afterappend so + // we assume that if that worked then we won't be trying to reset. + indexFile.resetAndTruncate(mark); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/util/SequentialWriter.java ----------------------------------------------------------------------