Merge branch 'cassandra-2.1' into trunk

Conflicts:
        src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce76e1e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce76e1e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce76e1e6

Branch: refs/heads/trunk
Commit: ce76e1e63c80318671b9260b0faca7f4557f5d14
Parents: 62ee147 4eb9fa7
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Thu Feb 12 13:39:36 2015 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Feb 12 13:39:36 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java |   2 +
 .../io/sstable/IndexSummaryBuilder.java         | 162 ++++++++++++++-----
 .../io/sstable/format/big/BigTableWriter.java   |  77 +++++----
 .../cassandra/io/util/SequentialWriter.java     |  10 ++
 5 files changed, 168 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2ac518,cbb4334..3dd723a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,63 -1,5 +1,64 @@@
 +3.0
 + * Add role based access control (CASSANDRA-7653, 8650)
 + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
 + * Upgrade Metrics library and remove deprecated metrics (CASSANDRA-5657)
 + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any partition key column (CASSANDRA-7855)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.4
+  * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
   * Enforce SSTableReader.first/last (CASSANDRA-8744)
   * Cleanup SegmentedFile API (CASSANDRA-8749)
   * Avoid overlap with early compaction replacement (CASSANDRA-8683)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index df326d7,3b93b31..eda7ca7
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@@ -45,6 -47,41 +47,41 @@@ public class IndexSummaryBuilde
      private long keysWritten = 0;
      private long indexIntervalMatches = 0;
      private long offheapSize = 0;
+     private long nextSamplePosition;
+ 
+     // for each ReadableBoundary, we map its dataLength property to itself, permitting us to lookup the
+     // last readable boundary from the perspective of the data file
+     // [data file position limit] => [ReadableBoundary]
+     private TreeMap<Long, ReadableBoundary> lastReadableByData = new TreeMap<>();
+     // for each ReadableBoundary, we map its indexLength property to itself, permitting us to lookup the
+     // last readable boundary from the perspective of the index file
+     // [index file position limit] => [ReadableBoundary]
+     private TreeMap<Long, ReadableBoundary> lastReadableByIndex = new TreeMap<>();
+     // the last synced data file position
+     private long dataSyncPosition;
+     // the last synced index file position
+     private long indexSyncPosition;
+ 
+     // the last summary interval boundary that is fully readable in both data and index files
+     private ReadableBoundary lastReadableBoundary;
+ 
+     /**
+      * Represents a boundary that is guaranteed fully readable in the summary, index file and data file.
+      * The key contained is the last key readable if the index and data files have been flushed to the
+      * stored lengths.
+      */
+     public static class ReadableBoundary
+     {
 -        final DecoratedKey lastKey;
 -        final long indexLength;
 -        final long dataLength;
++        public final DecoratedKey lastKey;
++        public final long indexLength;
++        public final long dataLength;
+         public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+         {
+             this.lastKey = lastKey;
+             this.indexLength = indexLength;
+             this.dataLength = dataLength;
+         }
+     }
  
      public IndexSummaryBuilder(long expectedKeys, int minIndexInterval, int samplingLevel)
      {

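The implementations of markIndexSynced, markDataSynced and getLastReadableBoundary are not part of the hunk above. Purely as an illustration of the bookkeeping the comments describe, here is a minimal sketch (hypothetical class and field names, not the project's code) of how the two TreeMaps plus the synced positions can yield the last fully readable boundary through floorEntry lookups:

    import java.util.Map;
    import java.util.TreeMap;

    // Simplified stand-in for the boundary bookkeeping: boundaries are recorded
    // in write order, so index and data lengths grow together, and the last fully
    // readable boundary is the greatest one whose index *and* data lengths are
    // both covered by the positions already flushed to disk.
    class BoundaryTrackerSketch
    {
        static class Boundary
        {
            final long indexLength;
            final long dataLength;
            Boundary(long indexLength, long dataLength)
            {
                this.indexLength = indexLength;
                this.dataLength = dataLength;
            }
        }

        private final TreeMap<Long, Boundary> byIndex = new TreeMap<>(); // [index file position limit] => boundary
        private final TreeMap<Long, Boundary> byData = new TreeMap<>();  // [data file position limit] => boundary
        private long indexSynced, dataSynced;

        void add(Boundary boundary)
        {
            byIndex.put(boundary.indexLength, boundary);
            byData.put(boundary.dataLength, boundary);
        }

        void markIndexSynced(long upTo) { indexSynced = Math.max(indexSynced, upTo); }
        void markDataSynced(long upTo)  { dataSynced = Math.max(dataSynced, upTo); }

        Boundary lastReadable()
        {
            // floorEntry: the greatest recorded limit <= the synced position for each file
            Map.Entry<Long, Boundary> i = byIndex.floorEntry(indexSynced);
            Map.Entry<Long, Boundary> d = byData.floorEntry(dataSynced);
            if (i == null || d == null)
                return null;
            // both files must cover the boundary, so take the earlier of the two candidates
            return i.getValue().dataLength <= d.getValue().dataLength ? i.getValue() : d.getValue();
        }
    }
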
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index bba7550,0000000..89ecd99
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,588 -1,0 +1,583 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.DataInput;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
-         iwriter = new IndexWriter(keyCount);
- 
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +        }
++        iwriter = new IndexWriter(keyCount, dataFile);
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
-         iwriter.append(decoratedKey, index);
++        iwriter.append(decoratedKey, index, dataPosition);
 +        dbuilder.addPotentialBoundary(dataPosition);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
-         long currentPosition = beforeAppend(row.key);
++        long startPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
-             entry = row.write(currentPosition, dataFile);
++            entry = row.write(startPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
-         metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-         afterAppend(row.key, currentPosition, entry);
++        long endPosition = dataFile.getFilePointer();
++        metadataCollector.update(endPosition - startPosition, row.columnStats());
++        afterAppend(row.key, endPosition, entry);
 +        return entry;
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            afterAppend(decoratedKey, startPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This writes the atom on disk too
 +            }
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort()
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
 +
 +        if (iwriter != null)
 +            iwriter.abort();
 +
 +        if (dataFile != null)
 +            dataFile.abort();
 +
 +        Set<Component> components = SSTable.componentsFor(descriptor);
 +        try
 +        {
 +            if (!components.isEmpty())
 +                SSTable.delete(descriptor, components);
 +        }
 +        catch (FSWriteError e)
 +        {
 +            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
 +            throw e;
 +        }
 +    }
 +
 +    // we use this method to ensure any managed data we may have retained references to during the write are no
 +    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
 +    public void isolateReferences()
 +    {
 +        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
 +        // data retention is done through copying
 +        first = getMinimalKey(first);
 +        last = lastWrittenKey = getMinimalKey(last);
 +    }
 +
 +    private Descriptor makeTmpLinks()
 +    {
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
 +        }
 +        return link;
 +    }
 +
 +    public SSTableReader openEarly(long maxDataAge)
 +    {
 +        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                                  metadata.getBloomFilterFpChance(),
 +                                                  repairedAt).get(MetadataType.STATS);
 +
 +        // find the max (exclusive) readable key
-         DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
-         if (exclusiveUpperBoundOfReadableIndex == null)
++        IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
++        if (boundary == null)
 +            return null;
 +
++        assert boundary.indexLength > 0 && boundary.dataLength > 0;
 +        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
 +        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
 +        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
-                                                            dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
++                                                           dfile, iwriter.summary.build(partitioner, boundary.lastKey),
 +                                                           iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
-         sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
-         DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
-         if (inclusiveUpperBoundOfReadableData == null)
-         {
-             // Prevent leaving tmplink files on disk
-             sstable.selfRef().release();
-             return null;
-         }
-         int offset = 2;
-         while (true)
-         {
-             RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
-             if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
-                 break;
-             inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
-             if (inclusiveUpperBoundOfReadableData == null)
-             {
-                 sstable.selfRef().release();
-                 return null;
-             }
-         }
-         sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
++        sstable.last = getMinimalKey(boundary.lastKey);
 +        return sstable;
 +    }
 +
 +    public SSTableReader closeAndOpenReader()
 +    {
 +        return closeAndOpenReader(System.currentTimeMillis());
 +    }
 +
 +    public SSTableReader closeAndOpenReader(long maxDataAge)
 +    {
 +        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
 +    }
 +
 +    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
 +    {
 +        assert finishType != FinishType.CLOSE;
 +        Pair<Descriptor, StatsMetadata> p;
 +
 +        p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
 +        Descriptor desc = p.left;
 +        StatsMetadata metadata = p.right;
 +
 +        if (finishType == FinishType.EARLY)
 +            desc = makeTmpLinks();
 +
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
 +        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
 +        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
 +                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf.sharedCopy(),
 +                                                           maxDataAge,
 +                                                           metadata,
 +                                                           finishType.openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +
 +        if (finishType.isFinal)
 +        {
 +            iwriter.bf.close();
 +            // try to save the summaries to disk
 +            sstable.saveSummary(iwriter.builder, dbuilder);
 +            iwriter = null;
 +            dbuilder = null;
 +        }
 +        return sstable;
 +    }
 +
 +    // Close the writer and return the descriptor to the new sstable and its metadata
 +    public Pair<Descriptor, StatsMetadata> close()
 +    {
 +        return close(FinishType.CLOSE, this.repairedAt);
 +    }
 +
 +    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
 +    {
 +        switch (type)
 +        {
 +            case EARLY: case CLOSE: case NORMAL:
 +            iwriter.close();
 +            dataFile.close();
 +            if (type == FinishType.CLOSE)
 +                iwriter.bf.close();
 +        }
 +
 +        // write sstable statistics
 +        Map<MetadataType, MetadataComponent> metadataComponents;
 +        metadataComponents = metadataCollector
 +                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                               metadata.getBloomFilterFpChance(),repairedAt);
 +
 +        // remove the 'tmp' marker from all components
 +        Descriptor descriptor = this.descriptor;
 +        if (type.isFinal)
 +        {
 +            dataFile.writeFullChecksum(descriptor);
 +            writeMetadata(descriptor, metadataComponents);
 +            // save the table of components
 +            SSTable.appendTOC(descriptor, components);
 +            descriptor = rename(descriptor, components);
 +        }
 +
 +        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
 +    }
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
 +        try
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +        finally
 +        {
 +            out.close();
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
 +    class IndexWriter
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
-         IndexWriter(long keyCount)
++        IndexWriter(long keyCount, final SequentialWriter dataFile)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
++            // register listeners to be alerted when the data files are flushed
++            indexFile.setPostFlushListener(new Runnable()
++            {
++                public void run()
++                {
++                    summary.markIndexSynced(indexFile.getLastFlushOffset());
++                }
++            });
++            dataFile.setPostFlushListener(new Runnable()
++            {
++                public void run()
++                {
++                    summary.markDataSynced(dataFile.getLastFlushOffset());
++                }
++            });
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
-         DecoratedKey getMaxReadableKey(int offset)
++        IndexSummaryBuilder.ReadableBoundary getMaxReadable()
 +        {
-             long maxIndexLength = indexFile.getLastFlushOffset();
-             return summary.getMaxReadableKey(maxIndexLength, offset);
++            return summary.getLastReadableBoundary();
 +        }
 +
-         public void append(DecoratedKey key, RowIndexEntry indexEntry)
++        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
 +        {
 +            bf.add(key.getKey());
-             long indexPosition = indexFile.getFilePointer();
++            long indexStart = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
++            long indexEnd = indexFile.getFilePointer();
 +
 +            if (logger.isTraceEnabled())
-                 logger.trace("wrote index entry: {} at {}", indexEntry, 
indexPosition);
++                logger.trace("wrote index entry: {} at {}", indexEntry, 
indexStart);
 +
-             summary.maybeAddEntry(key, indexPosition);
-             builder.addPotentialBoundary(indexPosition);
++            summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
++            builder.addPotentialBoundary(indexStart);
 +        }
 +
 +        public void abort()
 +        {
 +            indexFile.abort();
 +            bf.close();
 +        }
 +
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
 +         */
 +        public void close()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try
 +                {
 +                    // bloom filter
 +                    FileOutputStream fos = new FileOutputStream(path);
 +                    DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    fos.getFD().sync();
 +                    stream.close();
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +
 +            // index
 +            long position = indexFile.getFilePointer();
 +            indexFile.close(); // calls force
 +            FileUtils.truncate(indexFile.getPath(), position);
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
 +            // we can't reset dbuilder either, but that is the last thing called in afterappend so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce76e1e6/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
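
The SequentialWriter and CompressedSequentialWriter changes are not reproduced in this combined diff (they match the cassandra-2.1 parent). For orientation only, here is a rough, hypothetical sketch of the post-flush hook that BigTableWriter.IndexWriter relies on above; the class and fields below are illustrative, not the project's code, though setPostFlushListener and getLastFlushOffset are the names used in the diff:

    // Hypothetical writer illustrating the hook: after each flush the listener
    // runs, so IndexSummaryBuilder can record the last durable file position
    // (markIndexSynced / markDataSynced) without the writer knowing about summaries.
    class FlushingWriterSketch
    {
        private long lastFlushOffset; // bytes known to have been flushed
        private long buffered;        // bytes written but not yet flushed
        private Runnable postFlushListener;

        void setPostFlushListener(Runnable listener)
        {
            this.postFlushListener = listener;
        }

        void write(byte[] bytes)
        {
            buffered += bytes.length; // a real writer would also buffer the bytes themselves
        }

        void flush()
        {
            // ... push the buffered bytes to disk here ...
            lastFlushOffset += buffered;
            buffered = 0;
            // notify only after the flush completes, so listeners observe a durable offset
            if (postFlushListener != null)
                postFlushListener.run();
        }

        long getLastFlushOffset()
        {
            return lastFlushOffset;
        }
    }

In the BigTableWriter hunk above, the listener bodies simply call summary.markIndexSynced(indexFile.getLastFlushOffset()) and summary.markDataSynced(dataFile.getLastFlushOffset()).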
