http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java deleted file mode 100644 index 62ac175..0000000 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.sstable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import com.google.common.collect.AbstractIterator; -import com.google.common.util.concurrent.RateLimiter; - -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; -import org.apache.cassandra.db.columniterator.LazyColumnIterator; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.compaction.ICompactionScanner; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class SSTableScanner implements ICompactionScanner -{ - protected final RandomAccessReader dfile; - protected final RandomAccessReader ifile; - public final SSTableReader sstable; - - private final Iterator<AbstractBounds<RowPosition>> rangeIterator; - private AbstractBounds<RowPosition> currentRange; - - private final DataRange dataRange; - - protected Iterator<OnDiskAtomIterator> iterator; - - /** - * @param sstable SSTable to scan; must not be null - * @param dataRange a single range to scan; must not be null - * @param limiter background i/o RateLimiter; may be null - */ - SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) - { - assert sstable != null; - - this.dfile = limiter == null ? 
sstable.openDataReader() : sstable.openDataReader(limiter); - this.ifile = sstable.openIndexReader(); - this.sstable = sstable; - this.dataRange = dataRange; - - List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2); - if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner)) - { - // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and - // 2) the part that comes before the wrap-around - boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner)); - boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner)); - } - else - { - boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner)); - } - this.rangeIterator = boundsList.iterator(); - } - - /** - * @param sstable SSTable to scan; must not be null - * @param tokenRanges A set of token ranges to scan - * @param limiter background i/o RateLimiter; may be null - */ - SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) - { - assert sstable != null; - - this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); - this.ifile = sstable.openIndexReader(); - this.sstable = sstable; - this.dataRange = null; - - List<Range<Token>> normalized = Range.normalize(tokenRanges); - List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size()); - for (Range<Token> range : normalized) - boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner), - range.right.maxKeyBound(sstable.partitioner), - sstable.partitioner)); - - this.rangeIterator = boundsList.iterator(); - } - - private void seekToCurrentRangeStart() - { - if (currentRange.left.isMinimum(sstable.partitioner)) - return; - - long indexPosition = sstable.getIndexScanPosition(currentRange.left); - // -1 means the key is before everything in the sstable. So just start from the beginning. - if (indexPosition == -1) - { - // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and - // the seeks are no-op anyway if we are. 
- ifile.seek(0); - dfile.seek(0); - return; - } - - ifile.seek(indexPosition); - try - { - - while (!ifile.isEOF()) - { - indexPosition = ifile.getFilePointer(); - DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - int comparison = indexDecoratedKey.compareTo(currentRange.left); - // because our range start may be inclusive or exclusive, we need to also contains() - // instead of just checking (comparison >= 0) - if (comparison > 0 || currentRange.contains(indexDecoratedKey)) - { - // Found, just read the dataPosition and seek into index and data files - long dataPosition = ifile.readLong(); - ifile.seek(indexPosition); - dfile.seek(dataPosition); - break; - } - else - { - RowIndexEntry.Serializer.skip(ifile); - } - } - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - } - - public void close() throws IOException - { - FileUtils.close(dfile, ifile); - } - - public long getLengthInBytes() - { - return dfile.length(); - } - - public long getCurrentPosition() - { - return dfile.getFilePointer(); - } - - public String getBackingFiles() - { - return sstable.toString(); - } - - public boolean hasNext() - { - if (iterator == null) - iterator = createIterator(); - return iterator.hasNext(); - } - - public OnDiskAtomIterator next() - { - if (iterator == null) - iterator = createIterator(); - return iterator.next(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - - private Iterator<OnDiskAtomIterator> createIterator() - { - return new KeyScanningIterator(); - } - - protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator> - { - private DecoratedKey nextKey; - private RowIndexEntry nextEntry; - private DecoratedKey currentKey; - private RowIndexEntry currentEntry; - - protected OnDiskAtomIterator computeNext() - { - try - { - if (nextEntry == null) - { - do - { - // we're starting the first range or we just passed the end of the previous range - if (!rangeIterator.hasNext()) - return endOfData(); - - currentRange = rangeIterator.next(); - seekToCurrentRangeStart(); - - if (ifile.isEOF()) - return endOfData(); - - currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - currentEntry = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(ifile, sstable.descriptor.version); - } while (!currentRange.contains(currentKey)); - } - else - { - // we're in the middle of a range - currentKey = nextKey; - currentEntry = nextEntry; - } - - long readEnd; - if (ifile.isEOF()) - { - nextEntry = null; - nextKey = null; - readEnd = dfile.length(); - } - else - { - // we need the position of the start of the next key, regardless of whether it falls in the current range - nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - nextEntry = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(ifile, sstable.descriptor.version); - readEnd = nextEntry.position; - - if (!currentRange.contains(nextKey)) - { - nextKey = null; - nextEntry = null; - } - } - - if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey())) - { - dfile.seek(currentEntry.position); - ByteBufferUtil.readWithShortLength(dfile); // key - long dataSize = readEnd - dfile.getFilePointer(); - return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize); - } - - return new LazyColumnIterator(currentKey, new IColumnIteratorFactory() - { - public 
OnDiskAtomIterator create() - { - return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); - } - }); - - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - } - } - - @Override - public String toString() - { - return getClass().getSimpleName() + "(" + - "dfile=" + dfile + - " ifile=" + ifile + - " sstable=" + sstable + - ")"; - } -}
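
[Aside, not part of the commit] The deleted SSTableScanner constructor above splits a wrap-around DataRange into two non-wrapping bounds before scanning: one piece starting at the beginning of the sstable up to the stop key, and one piece from the start key up to the wrap-around point. A minimal standalone sketch of that splitting rule, with plain longs standing in for tokens/keys and Long.MIN_VALUE playing the "minimum token" role (all names below are invented for illustration, not Cassandra APIs):

import java.util.ArrayList;
import java.util.List;

final class WrapAroundSplitExample
{
    static final class Bounds
    {
        final long left, right;
        Bounds(long left, long right) { this.left = left; this.right = right; }
        @Override public String toString() { return "[" + left + ", " + right + "]"; }
    }

    // A scan range "wraps" when its stop token precedes its start token on the ring.
    // Mirroring the deleted constructor, a wrapping range is scanned as two non-wrapping
    // pieces: the part at the beginning of the sstable, then the part before the wrap-around.
    static List<Bounds> toNonWrappingBounds(long start, long stop)
    {
        List<Bounds> boundsList = new ArrayList<>(2);
        boolean wraps = stop < start && stop != Long.MIN_VALUE; // MIN_VALUE stands in for the minimum token
        if (wraps)
        {
            boundsList.add(new Bounds(Long.MIN_VALUE, stop)); // from the beginning of the sstable
            boundsList.add(new Bounds(start, Long.MAX_VALUE)); // up to the wrap-around
        }
        else
        {
            boundsList.add(new Bounds(start, stop));
        }
        return boundsList;
    }

    public static void main(String[] args)
    {
        System.out.println(toNonWrappingBounds(10, 400)); // [[10, 400]]
        System.out.println(toNonWrappingBounds(400, 10)); // [[MIN, 10], [400, MAX]]
    }
}
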
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 3cfdc7b..bc9f2ca 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -27,6 +27,7 @@ import java.util.concurrent.SynchronousQueue; import com.google.common.base.Throwables; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.ColumnFamily; @@ -35,6 +36,8 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -96,8 +99,8 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter public SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB) { super(directory, metadata, partitioner); - this.bufferSize = bufferSizeInMB * 1024L * 1024L; - this.diskWriter.start(); + bufferSize = bufferSizeInMB * 1024L * 1024L; + diskWriter.start(); } protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java index 87c8e33..3417d68 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java @@ -24,6 +24,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.sstable.format.SSTableWriter; /** * A SSTable writer that assumes rows are in (partitioner) sorted order. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java deleted file mode 100644 index 4da967e..0000000 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ /dev/null @@ -1,641 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.sstable; - -import java.io.Closeable; -import java.io.DataInput; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ArrayBackedSortedColumns; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnIndex; -import org.apache.cassandra.db.ColumnSerializer; -import org.apache.cassandra.db.CounterCell; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.OnDiskAtom; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.compress.CompressedSequentialWriter; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.sstable.metadata.MetadataComponent; -import org.apache.cassandra.io.sstable.metadata.MetadataType; -import org.apache.cassandra.io.sstable.metadata.StatsMetadata; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; -import org.apache.cassandra.io.util.FileMark; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.SegmentedFile; -import org.apache.cassandra.io.util.SequentialWriter; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FilterFactory; -import org.apache.cassandra.utils.IFilter; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.StreamingHistogram; - -public class SSTableWriter extends SSTable -{ - private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class); - - // not very random, but the only value that can't be mistaken for a legal column-name length - public static final int END_OF_ROW = 0x0000; - - private IndexWriter iwriter; - private SegmentedFile.Builder dbuilder; - private final SequentialWriter dataFile; - private DecoratedKey lastWrittenKey; - private FileMark dataMark; - private final MetadataCollector sstableMetadataCollector; - private final long repairedAt; - - public SSTableWriter(String filename, long keyCount, long repairedAt, int sstableLevel) - { - this(filename, - 
keyCount, - repairedAt, - Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)), - StorageService.getPartitioner(), - new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator).sstableLevel(sstableLevel)); - } - - public SSTableWriter(String filename, long keyCount) - { - this(filename, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, 0); - } - - private static Set<Component> components(CFMetaData metadata) - { - Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, - Component.PRIMARY_INDEX, - Component.STATS, - Component.SUMMARY, - Component.TOC, - Component.DIGEST)); - - if (metadata.getBloomFilterFpChance() < 1.0) - components.add(Component.FILTER); - - if (metadata.compressionParameters().sstableCompressor != null) - { - components.add(Component.COMPRESSION_INFO); - } - else - { - // it would feel safer to actually add this component later in maybeWriteDigest(), - // but the components are unmodifiable after construction - components.add(Component.CRC); - } - return components; - } - - public SSTableWriter(String filename, - long keyCount, - long repairedAt, - CFMetaData metadata, - IPartitioner<?> partitioner, - MetadataCollector sstableMetadataCollector) - { - super(Descriptor.fromFilename(filename), - components(metadata), - metadata, - partitioner); - this.repairedAt = repairedAt; - iwriter = new IndexWriter(keyCount); - - if (compression) - { - dataFile = SequentialWriter.open(getFilename(), - descriptor.filenameFor(Component.COMPRESSION_INFO), - metadata.compressionParameters(), - sstableMetadataCollector); - dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile); - } - else - { - dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC))); - dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - } - - this.sstableMetadataCollector = sstableMetadataCollector; - } - - public void mark() - { - dataMark = dataFile.mark(); - iwriter.mark(); - } - - public void resetAndTruncate() - { - dataFile.resetAndTruncate(dataMark); - iwriter.resetAndTruncate(); - } - - /** - * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written - */ - private long beforeAppend(DecoratedKey decoratedKey) - { - assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values - if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) - throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); - return (lastWrittenKey == null) ? 
0 : dataFile.getFilePointer(); - } - - private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index) - { - sstableMetadataCollector.addKey(decoratedKey.getKey()); - lastWrittenKey = decoratedKey; - last = lastWrittenKey; - if (first == null) - first = lastWrittenKey; - - if (logger.isTraceEnabled()) - logger.trace("wrote {} at {}", decoratedKey, dataPosition); - iwriter.append(decoratedKey, index); - dbuilder.addPotentialBoundary(dataPosition); - } - - /** - * @param row - * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row - */ - public RowIndexEntry append(AbstractCompactedRow row) - { - long currentPosition = beforeAppend(row.key); - RowIndexEntry entry; - try - { - entry = row.write(currentPosition, dataFile.stream); - if (entry == null) - return null; - } - catch (IOException e) - { - throw new FSWriteError(e, dataFile.getPath()); - } - sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); - afterAppend(row.key, currentPosition, entry); - return entry; - } - - public void append(DecoratedKey decoratedKey, ColumnFamily cf) - { - long startPosition = beforeAppend(decoratedKey); - try - { - RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream); - afterAppend(decoratedKey, startPosition, entry); - } - catch (IOException e) - { - throw new FSWriteError(e, dataFile.getPath()); - } - sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats()); - } - - public static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException - { - assert cf.hasColumns() || cf.isMarkedForDelete(); - - ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out); - ColumnIndex index = builder.build(cf); - - out.writeShort(END_OF_ROW); - return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index); - } - - /** - * @throws IOException if a read from the DataInput fails - * @throws FSWriteError if a write to the dataFile fails - */ - public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Descriptor.Version version) throws IOException - { - long currentPosition = beforeAppend(key); - - ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); - ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); - ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); - List<ByteBuffer> minColumnNames = Collections.emptyList(); - List<ByteBuffer> maxColumnNames = Collections.emptyList(); - StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); - boolean hasLegacyCounterShards = false; - - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); - cf.delete(DeletionTime.serializer.deserialize(in)); - - ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream); - - if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE) - { - tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); - maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); - minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); - maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); - } - - 
Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator(); - while (rangeTombstoneIterator.hasNext()) - { - RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); - tombstones.update(rangeTombstone.getLocalDeletionTime()); - minTimestampTracker.update(rangeTombstone.timestamp()); - maxTimestampTracker.update(rangeTombstone.timestamp()); - maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime()); - minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator); - maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator); - } - - Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version); - try - { - while (iter.hasNext()) - { - OnDiskAtom atom = iter.next(); - if (atom == null) - break; - - if (atom instanceof CounterCell) - { - atom = ((CounterCell) atom).markLocalToBeCleared(); - hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards(); - } - - int deletionTime = atom.getLocalDeletionTime(); - if (deletionTime < Integer.MAX_VALUE) - tombstones.update(deletionTime); - minTimestampTracker.update(atom.timestamp()); - maxTimestampTracker.update(atom.timestamp()); - minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator); - maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator); - maxDeletionTimeTracker.update(atom.getLocalDeletionTime()); - - columnIndexer.add(atom); // This write the atom on disk too - } - - columnIndexer.maybeWriteEmptyRowHeader(); - dataFile.stream.writeShort(END_OF_ROW); - } - catch (IOException e) - { - throw new FSWriteError(e, dataFile.getPath()); - } - - sstableMetadataCollector.updateMinTimestamp(minTimestampTracker.get()) - .updateMaxTimestamp(maxTimestampTracker.get()) - .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get()) - .addRowSize(dataFile.getFilePointer() - currentPosition) - .addColumnCount(columnIndexer.writtenAtomCount()) - .mergeTombstoneHistogram(tombstones) - .updateMinColumnNames(minColumnNames) - .updateMaxColumnNames(maxColumnNames) - .updateHasLegacyCounterShards(hasLegacyCounterShards); - afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build())); - return currentPosition; - } - - /** - * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable - */ - public void abort() - { - abort(true); - } - public void abort(boolean closeBf) - { - assert descriptor.type.isTemporary; - if (iwriter == null && dataFile == null) - return; - if (iwriter != null) - { - FileUtils.closeQuietly(iwriter.indexFile); - if (closeBf) - { - iwriter.bf.close(); - } - } - if (dataFile!= null) - FileUtils.closeQuietly(dataFile); - - Set<Component> components = SSTable.componentsFor(descriptor); - try - { - if (!components.isEmpty()) - SSTable.delete(descriptor, components); - } - catch (FSWriteError e) - { - logger.error(String.format("Failed deleting temp components for %s", descriptor), e); - throw e; - } - } - - // we use this method to ensure any managed data we may have retained references to during the write are no - // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction - public void isolateReferences() - { - // currently we only maintain references to 
first/last/lastWrittenKey from the data provided; all other - // data retention is done through copying - first = getMinimalKey(first); - last = lastWrittenKey = getMinimalKey(last); - } - - public SSTableReader openEarly(long maxDataAge) - { - StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), - metadata.getBloomFilterFpChance(), - repairedAt).get(MetadataType.STATS); - - // find the max (exclusive) readable key - DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0); - if (exclusiveUpperBoundOfReadableIndex == null) - return null; - - // create temp links if they don't already exist - Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK); - if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists()) - { - FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX))); - FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA))); - } - - // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers - SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX)); - SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA)); - SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL), - components, metadata, - partitioner, ifile, - dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex), - iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY); - - // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed) - sstable.first = getMinimalKey(first); - sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex); - DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1); - if (inclusiveUpperBoundOfReadableData == null) - return null; - int offset = 2; - while (true) - { - RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT); - if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset()) - break; - inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++); - if (inclusiveUpperBoundOfReadableData == null) - return null; - } - sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData); - return sstable; - } - - public SSTableReader closeAndOpenReader() - { - return closeAndOpenReader(System.currentTimeMillis()); - } - - public SSTableReader closeAndOpenReader(long maxDataAge) - { - return closeAndOpenReader(maxDataAge, this.repairedAt); - } - - public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt) - { - Pair<Descriptor, StatsMetadata> p = close(repairedAt); - Descriptor newdesc = p.left; - StatsMetadata sstableMetadata = p.right; - - // finalize in-memory state for the reader - SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX)); - SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA)); - SSTableReader sstable = SSTableReader.internalOpen(newdesc, - components, - metadata, - partitioner, - ifile, - dfile, - iwriter.summary.build(partitioner), - iwriter.bf, - maxDataAge, - sstableMetadata, - SSTableReader.OpenReason.NORMAL); - sstable.first = getMinimalKey(first); - sstable.last = getMinimalKey(last); - // try to save the summaries to 
disk - sstable.saveSummary(iwriter.builder, dbuilder); - iwriter = null; - dbuilder = null; - return sstable; - } - - // Close the writer and return the descriptor to the new sstable and it's metadata - public Pair<Descriptor, StatsMetadata> close() - { - return close(this.repairedAt); - } - - private Pair<Descriptor, StatsMetadata> close(long repairedAt) - { - - // index and filter - iwriter.close(); - // main data, close will truncate if necessary - dataFile.close(); - dataFile.writeFullChecksum(descriptor); - // write sstable statistics - Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata( - partitioner.getClass().getCanonicalName(), - metadata.getBloomFilterFpChance(), - repairedAt); - writeMetadata(descriptor, metadataComponents); - - // save the table of components - SSTable.appendTOC(descriptor, components); - - // remove the 'tmp' marker from all components - return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS)); - - } - - - private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components) - { - SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS))); - try - { - desc.getMetadataSerializer().serialize(components, out.stream); - } - catch (IOException e) - { - throw new FSWriteError(e, out.getPath()); - } - finally - { - out.close(); - } - } - - static Descriptor rename(Descriptor tmpdesc, Set<Component> components) - { - Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL); - rename(tmpdesc, newdesc, components); - return newdesc; - } - - public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components) - { - for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY))) - { - FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component)); - } - - // do -Data last because -Data present should mean the sstable was completely renamed before crash - FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA)); - - // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader - FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY)); - } - - public long getFilePointer() - { - return dataFile.getFilePointer(); - } - - public long getOnDiskFilePointer() - { - return dataFile.getOnDiskFilePointer(); - } - - /** - * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. 
- */ - class IndexWriter implements Closeable - { - private final SequentialWriter indexFile; - public final SegmentedFile.Builder builder; - public final IndexSummaryBuilder summary; - public final IFilter bf; - private FileMark mark; - - IndexWriter(long keyCount) - { - indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); - builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); - summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL); - bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true); - } - - // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file - DecoratedKey getMaxReadableKey(int offset) - { - long maxIndexLength = indexFile.getLastFlushOffset(); - return summary.getMaxReadableKey(maxIndexLength, offset); - } - - public void append(DecoratedKey key, RowIndexEntry indexEntry) - { - bf.add(key.getKey()); - long indexPosition = indexFile.getFilePointer(); - try - { - ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream); - metadata.comparator.rowIndexEntrySerializer().serialize(indexEntry, indexFile.stream); - } - catch (IOException e) - { - throw new FSWriteError(e, indexFile.getPath()); - } - - if (logger.isTraceEnabled()) - logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition); - - summary.maybeAddEntry(key, indexPosition); - builder.addPotentialBoundary(indexPosition); - } - - /** - * Closes the index and bloomfilter, making the public state of this writer valid for consumption. - */ - public void close() - { - if (components.contains(Component.FILTER)) - { - String path = descriptor.filenameFor(Component.FILTER); - try - { - // bloom filter - FileOutputStream fos = new FileOutputStream(path); - DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos); - FilterFactory.serialize(bf, stream); - stream.flush(); - fos.getFD().sync(); - stream.close(); - } - catch (IOException e) - { - throw new FSWriteError(e, path); - } - } - - // index - long position = indexFile.getFilePointer(); - indexFile.close(); // calls force - FileUtils.truncate(indexFile.getPath(), position); - } - - public void mark() - { - mark = indexFile.mark(); - } - - public void resetAndTruncate() - { - // we can't un-set the bloom filter addition, but extra keys in there are harmless. - // we can't reset dbuilder either, but that is the last thing called in afterappend so - // we assume that if that worked then we won't be trying to reset. - indexFile.resetAndTruncate(mark); - } - - @Override - public String toString() - { - return "IndexWriter(" + descriptor + ")"; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java new file mode 100644 index 0000000..ca003b6 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format; + +import com.google.common.base.CharMatcher; +import com.google.common.collect.ImmutableList; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnSerializer; +import org.apache.cassandra.db.OnDiskAtom; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.io.util.FileDataInput; + +import java.util.Iterator; + +/** + * Provides the accessors to data on disk. + */ +public interface SSTableFormat +{ + static boolean enableSSTableDevelopmentTestMode = Boolean.valueOf(System.getProperty("cassandra.test.sstableformatdevelopment","false")); + + + Version getLatestVersion(); + Version getVersion(String version); + + SSTableWriter.Factory getWriterFactory(); + SSTableReader.Factory getReaderFactory(); + + Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version); + + AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators); + + RowIndexEntry.IndexSerializer<?> getIndexSerializer(CFMetaData cfm); + + public static enum Type + { + //Used internally to refer to files with no + //format flag in the filename + LEGACY("big", BigFormat.instance), + + //The original sstable format + BIG("big", BigFormat.instance); + + public final SSTableFormat info; + public final String name; + private Type(String name, SSTableFormat info) + { + //Since format comes right after generation + //we disallow formats with numeric names + assert !CharMatcher.DIGIT.matchesAllOf(name); + + this.name = name; + this.info = info; + } + + public static Type validate(String name) + { + for (Type valid : Type.values()) + { + //This is used internally for old sstables + if (valid == LEGACY) + continue; + + if (valid.name.equalsIgnoreCase(name)) + return valid; + } + + throw new IllegalArgumentException("No Type constant " + name); + } + } +}
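
[Aside, not part of the commit] The Type.validate lookup added in SSTableFormat.java skips the LEGACY alias (used internally for filenames that carry no format flag) and matches the remaining format names case-insensitively, throwing on anything unknown. A minimal standalone sketch of that behaviour; the enum name FormatTypeSketch is invented for illustration, the real Type enum is the one added above:

enum FormatTypeSketch
{
    LEGACY("big"),
    BIG("big");

    final String name;
    FormatTypeSketch(String name) { this.name = name; }

    static FormatTypeSketch validate(String name)
    {
        for (FormatTypeSketch valid : values())
        {
            if (valid == LEGACY)            // internal alias, never parsed from a filename
                continue;
            if (valid.name.equalsIgnoreCase(name))
                return valid;
        }
        throw new IllegalArgumentException("No Type constant " + name);
    }

    public static void main(String[] args)
    {
        System.out.println(validate("big")); // BIG
        System.out.println(validate("BIG")); // BIG (case-insensitive)
        try { validate("tiny"); }
        catch (IllegalArgumentException e) { System.out.println(e.getMessage()); }
    }
}
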

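[Aside, not part of the commit] The deleted SSTableWriter.rename() earlier in this diff moves every component except -Data to its final name first and -Data last, so a surviving final -Data file after a crash implies the whole sstable finished renaming. A minimal standalone sketch of that ordering, assuming invented component suffixes and plain java.nio moves rather than Cassandra's FileUtils:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

public final class RenameOrderExample
{
    // Hypothetical component suffixes, just for the example.
    static final List<String> COMPONENTS = Arrays.asList("Index.db", "Statistics.db", "TOC.txt", "Data.db");

    static void renameTmpToFinal(Path dir, String tmpPrefix, String finalPrefix) throws IOException
    {
        for (String component : COMPONENTS)
        {
            if (component.equals("Data.db"))
                continue; // deferred: Data goes last
            Files.move(dir.resolve(tmpPrefix + component), dir.resolve(finalPrefix + component),
                       StandardCopyOption.ATOMIC_MOVE);
        }
        // Data last: its presence under the final name marks the rename as complete.
        Files.move(dir.resolve(tmpPrefix + "Data.db"), dir.resolve(finalPrefix + "Data.db"),
                   StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException
    {
        Path dir = Files.createTempDirectory("rename-order");
        for (String component : COMPONENTS)
            Files.createFile(dir.resolve("tmp-" + component));
        renameTmpToFinal(dir, "tmp-", "ks-cf-1-");
        try (var files = Files.list(dir)) { files.forEach(System.out::println); }
    }
}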