Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/882df8a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/882df8a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/882df8a2

Branch: refs/heads/cassandra-3.1
Commit: 882df8a21711559d18bf6b38cd6026d78b7e4956
Parents: 29ed6fe 0b26ca6
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Dec 2 15:06:43 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Dec 2 15:06:43 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                      |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java     |  16 ++--
 .../org/apache/cassandra/db/RangeTombstone.java  |   5 ++
 .../cassandra/db/compaction/Scrubber.java        |  25 +++++-
 .../io/sstable/SSTableIdentityIterator.java      |  76 +++++++++++++++----
 .../io/sstable/format/big/BigTableWriter.java    |   1 +
 .../Keyspace1-Standard3-jb-1-Summary.db          | Bin 71 -> 63 bytes
 .../Keyspace1-StandardInteger1-ka-2-CRC.db       | Bin 0 -> 8 bytes
 .../Keyspace1-StandardInteger1-ka-2-Data.db      | Bin 0 -> 12357 bytes
 .../Keyspace1-StandardInteger1-ka-2-Digest.sha1  |   1 +
 .../Keyspace1-StandardInteger1-ka-2-Filter.db    | Bin 0 -> 176 bytes
 .../Keyspace1-StandardInteger1-ka-2-Index.db     | Bin 0 -> 108 bytes
 ...eyspace1-StandardInteger1-ka-2-Statistics.db  | Bin 0 -> 4470 bytes
 .../Keyspace1-StandardInteger1-ka-2-Summary.db   | Bin 0 -> 112 bytes
 .../Keyspace1-StandardInteger1-ka-2-TOC.txt      |   8 ++
 .../apache/cassandra/db/RowIndexEntryTest.java   |   1 +
 .../unit/org/apache/cassandra/db/ScrubTest.java  |  60 +++++++++++++++
 .../streaming/StreamingTransferTest.java         |  46 ++++++++++-
 18 files changed, 216 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eaad3a2,e00abfe..bca5fb0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate
+   SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+   and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
+Merged from 2.1:
  * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
  * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 670c682,e02f901..9fd8560
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -162,6 -155,22 +162,22 @@@ public class Scrubber implements Closea
              if (scrubInfo.isStopRequested())
                  throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
  
+             updateIndexKey();
+ 
+             if (prevKey != null && indexFile != null)
+             {
+                 long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+                 if (dataFile.getFilePointer() < nextRowStart)
+                 {
+                     // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+                     saveOutOfOrderRow(prevKey,
-                                       SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
++                                      SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), checkData),
+                                       String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+                     if (dataFile.isEOF())
+                         break;
+                 }
+             }
+ 
              long rowStart = dataFile.getFilePointer();
              outputHandler.debug("Reading row at " + rowStart);
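The Scrubber hunk above detects CASSANDRA-10791 damage purely by position arithmetic: after a row has been read, any gap between the data file's current position and the next row's start (taken from the index, or EOF for the last row) can only be data that was written after the END_OF_ROW marker. Below is a minimal, self-contained sketch of that check; RandomAccessFile stands in for the Scrubber's dataFile and the index lookup is reduced to a parameter, so the names are illustrative rather than the actual Scrubber API.

import java.io.IOException;
import java.io.RandomAccessFile;

public class RowGapCheck
{
    /**
     * @param nextRowStartFromIndex the next row's offset according to the index,
     *                              or -1 if the row just read was the last one
     * @return number of stray bytes between the row just read and the next row
     */
    static long strayBytesAfterRow(RandomAccessFile data, long nextRowStartFromIndex) throws IOException
    {
        // -1 means "no more index entries": the row should run to end of file
        long nextRowStart = nextRowStartFromIndex == -1 ? data.length() : nextRowStartFromIndex;
        return Math.max(0, nextRowStart - data.getFilePointer());
    }
}

A non-zero result is exactly the condition under which the patch hands the stray bytes to saveOutOfOrderRow via a fragment iterator instead of failing the scrub.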
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 17f9a8d,45994d0..8c02ee7
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@@ -20,13 -20,13 +20,16 @@@ package org.apache.cassandra.io.sstable
  import java.io.*;
  import java.util.Iterator;
  
+ import com.google.common.collect.AbstractIterator;
+ 
  import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+ import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.Descriptor.Version;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.serializers.MarshalException;
@@@ -60,13 -62,43 +63,42 @@@
       * @param sstable SSTable we are reading from.
       * @param file Reading using this file.
       * @param key Key of this row.
-      * @param dataSize length of row data
       * @param checkData if true, does its best to deserialize and check the coherence of row data
       */
-     public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+     public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, boolean checkData)
      {
-         this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
+         this(sstable.metadata, file, file.getPath(), key, checkData, sstable, ColumnSerializer.Flag.LOCAL);
      }
  
+     /**
+      * Used only by scrubber to solve problems with data written after the END_OF_ROW marker.
+      * Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+      */
+     public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+     {
+         final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+         final CellNameType type = sstable.metadata.comparator;
+         final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+         final Version version = sstable.descriptor.version;
+         final long dataEnd = file.getFilePointer() + dataSize;
-         return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
++        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, checkData, sstable, flag, DeletionTime.LIVE,
+             new AbstractIterator<OnDiskAtom>()
+             {
+                 protected OnDiskAtom computeNext()
+                 {
+                     if (file.getFilePointer() >= dataEnd)
+                         return endOfData();
+                     try
+                     {
+                         return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+                     }
+                     catch (IOException e)
+                     {
+                         throw new IOError(e);
+                     }
+                 }
+             });
+     }
+ 
      // sstable may be null *if* checkData is false
      // If it is null, we assume the data is in the current file format
      private SSTableIdentityIterator(CFMetaData metadata,
@@@ -77,22 -110,15 +109,16 @@@
                                      SSTableReader sstable,
                                      ColumnSerializer.Flag flag)
      {
-         assert !checkData || (sstable != null);
-         this.in = in;
-         this.filename = filename;
-         this.key = key;
-         this.flag = flag;
-         this.validateColumns = checkData;
-         this.sstable = sstable;
- 
-         Version dataVersion = sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version;
-         int expireBefore = (int) (System.currentTimeMillis() / 1000);
-         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
-         this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
-              metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
++        this(metadata, in, filename, key, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
++             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000),
++                                        sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version));
+     }
+ 
+     private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+     {
          try
          {
-             columnFamily.delete(DeletionTime.serializer.deserialize(in));
-             atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+             return DeletionTime.serializer.deserialize(in);
          }
          catch (IOException e)
          {
@@@ -102,6 -128,32 +128,30 @@@
          }
      }
  
+     // sstable may be null *if* checkData is false
+     // If it is null, we assume the data is in the current file format
+     private SSTableIdentityIterator(CFMetaData metadata,
+                                     DataInput in,
+                                     String filename,
+                                     DecoratedKey key,
-                                     long dataSize,
+                                     boolean checkData,
+                                     SSTableReader sstable,
+                                     ColumnSerializer.Flag flag,
+                                     DeletionTime deletion,
+                                     Iterator<OnDiskAtom> atomIterator)
+     {
+         assert !checkData || (sstable != null);
+         this.in = in;
+         this.filename = filename;
+         this.key = key;
-         this.dataSize = dataSize;
+         this.flag = flag;
+         this.validateColumns = checkData;
+         this.sstable = sstable;
+         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+         columnFamily.delete(deletion);
+         this.atomIterator = atomIterator;
+     }
+ 
      public DecoratedKey getKey()
      {
          return key;
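The new createFragmentIterator above bounds deserialization by byte position instead of trusting an end-of-row marker: computeNext() keeps handing out atoms until the file pointer reaches a precomputed dataEnd. The same Guava AbstractIterator idiom, reduced to a self-contained toy, is sketched below; the four-byte integer "record format" is invented purely for illustration and is not Cassandra's on-disk layout.

import com.google.common.collect.AbstractIterator;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.util.Iterator;

public class BoundedRecordIterator
{
    /** Reads 4-byte big-endian ints from {@code in} until {@code limit} bytes are consumed. */
    static Iterator<Integer> ints(final DataInputStream in, final long limit)
    {
        return new AbstractIterator<Integer>()
        {
            private long consumed = 0;

            protected Integer computeNext()
            {
                if (consumed >= limit)      // the position check replaces the end marker
                    return endOfData();
                try
                {
                    int v = in.readInt();
                    consumed += 4;
                    return v;
                }
                catch (IOException e)
                {
                    throw new IOError(e);   // same unchecked translation the patch uses
                }
            }
        };
    }

    public static void main(String[] args)
    {
        byte[] data = { 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3 };
        // Read only the first two records (8 bytes), leaving the remainder untouched.
        Iterator<Integer> it = ints(new DataInputStream(new ByteArrayInputStream(data)), 8);
        while (it.hasNext())
            System.out.println(it.next());  // prints 1 then 2
    }
}

Stopping on position rather than on a sentinel is what lets the scrubber iterate row fragments that, by definition, were never terminated properly.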
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index d064e69,0000000..505bac0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,588 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+import org.apache.cassandra.utils.SyncUtil;
+
+public class BigTableWriter extends SSTableWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+
+    // not very random, but the only value that can't be mistaken for a legal column-name length
+    public static final int END_OF_ROW = 0x0000;
+
+    private final IndexWriter iwriter;
+    private SegmentedFile.Builder dbuilder;
+    private final SequentialWriter dataFile;
+    private DecoratedKey lastWrittenKey;
+    private FileMark dataMark;
+
+    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+    {
+        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+
+        if (compression)
+        {
+            dataFile = SequentialWriter.open(getFilename(),
+                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
+                                             metadata.compressionParameters(),
+                                             metadataCollector);
+            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
+        }
+        else
+        {
+            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
+            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
+        }
+        iwriter = new IndexWriter(keyCount, dataFile);
+    }
+
+    public void mark()
+    {
+        dataMark = dataFile.mark();
+        iwriter.mark();
+    }
+
+    public void resetAndTruncate()
+    {
+        dataFile.resetAndTruncate(dataMark);
+        iwriter.resetAndTruncate();
+    }
+
+    /**
+     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
+     */
+    private long beforeAppend(DecoratedKey decoratedKey)
+    {
+        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
+            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
+        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
+    }
+
+    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException
+    {
+        metadataCollector.addKey(decoratedKey.getKey());
+        lastWrittenKey = decoratedKey;
+        last = lastWrittenKey;
+        if (first == null)
+            first = lastWrittenKey;
+
+        if (logger.isTraceEnabled())
+            logger.trace("wrote {} at {}", decoratedKey, dataEnd);
+        iwriter.append(decoratedKey, index, dataEnd);
+        dbuilder.addPotentialBoundary(dataEnd);
+    }
+
+    /**
+     * @param row
+     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+     */
+    public RowIndexEntry append(AbstractCompactedRow row)
+    {
+        long startPosition = beforeAppend(row.key);
+        RowIndexEntry entry;
+        try
+        {
+            entry = row.write(startPosition, dataFile);
+            if (entry == null)
+                return null;
+            long endPosition = dataFile.getFilePointer();
+            long rowSize = endPosition - startPosition;
+            maybeLogLargePartitionWarning(row.key, rowSize);
+            metadataCollector.update(rowSize, row.columnStats());
+            afterAppend(row.key, endPosition, entry);
+            return entry;
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
+    }
+
+    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+    {
+        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+        {
+            logger.error("Key size {} exceeds maximum of {}, skipping row",
+                         decoratedKey.getKey().remaining(),
+                         FBUtilities.MAX_UNSIGNED_SHORT);
+            return;
+        }
+
+        long startPosition = beforeAppend(decoratedKey);
+        long endPosition;
+        try
+        {
+            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+            endPosition = dataFile.getFilePointer();
+            afterAppend(decoratedKey, endPosition, entry);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
+        long rowSize = endPosition - startPosition;
+        maybeLogLargePartitionWarning(decoratedKey, rowSize);
+        metadataCollector.update(endPosition - startPosition, cf.getColumnStats());
+    }
+
+    private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
+    {
+        if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
+        {
+            String keyString = metadata.getKeyValidator().getString(key.getKey());
+            logger.warn("Writing large partition {}/{}:{} ({} bytes)", metadata.ksName, metadata.cfName, keyString, rowSize);
+        }
+    }
+
+    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
+    {
+        assert cf.hasColumns() || cf.isMarkedForDelete();
+
+        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
+        ColumnIndex index = builder.build(cf);
+
+        out.writeShort(END_OF_ROW);
+        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+    }
+
+    /**
+     * @throws IOException if a read from the DataInput fails
+     * @throws FSWriteError if a write to the dataFile fails
+     */
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
+    {
+        long currentPosition = beforeAppend(key);
+
+        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
+        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
+        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
+        List<ByteBuffer> minColumnNames = Collections.emptyList();
+        List<ByteBuffer> maxColumnNames = Collections.emptyList();
+        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
+        boolean hasLegacyCounterShards = false;
+
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+        cf.delete(DeletionTime.serializer.deserialize(in));
+
+        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+
+        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
+        {
+            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+        }
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
+        while (rangeTombstoneIterator.hasNext())
+        {
+            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
+            tombstones.update(rangeTombstone.getLocalDeletionTime());
+            minTimestampTracker.update(rangeTombstone.timestamp());
+            maxTimestampTracker.update(rangeTombstone.timestamp());
+            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
+            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
+            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
+        }
+
+        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
+        try
+        {
+            while (iter.hasNext())
+            {
+                OnDiskAtom atom = iter.next();
+                if (atom == null)
+                    break;
+
+                if (atom instanceof CounterCell)
+                {
+                    atom = ((CounterCell) atom).markLocalToBeCleared();
+                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+                }
+
+                int deletionTime = atom.getLocalDeletionTime();
+                if (deletionTime < Integer.MAX_VALUE)
+                    tombstones.update(deletionTime);
+                minTimestampTracker.update(atom.timestamp());
+                maxTimestampTracker.update(atom.timestamp());
+                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
+                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
+
+                columnIndexer.add(atom); // This writes the atom on disk too
+            }
++            columnIndexer.finishAddingAtoms();
+
+            columnIndexer.maybeWriteEmptyRowHeader();
+            dataFile.stream.writeShort(END_OF_ROW);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
+
+        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
+                         .updateMaxTimestamp(maxTimestampTracker.get())
+                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
+                         .addRowSize(dataFile.getFilePointer() - currentPosition)
+                         .addColumnCount(columnIndexer.writtenAtomCount())
+                         .mergeTombstoneHistogram(tombstones)
+                         .updateMinColumnNames(minColumnNames)
+                         .updateMaxColumnNames(maxColumnNames)
+                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
+
+        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
+        return currentPosition;
+    }
+
+    private Descriptor makeTmpLinks()
+    {
+        // create temp links if they don't already exist
+        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+        {
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+        }
+        return link;
+    }
+
+    @SuppressWarnings("resource")
+    public SSTableReader openEarly()
+    {
+        // find the max (exclusive) readable key
+        IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
+        if (boundary == null)
+            return null;
+
+        StatsMetadata stats = statsMetadata();
+        assert boundary.indexLength > 0 && boundary.dataLength > 0;
+        Descriptor link = makeTmpLinks();
+        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable to other consumers
+        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
+        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
+        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+                                                           components, metadata,
+                                                           partitioner, ifile,
+                                                           dfile, iwriter.summary.build(partitioner, boundary),
+                                                           iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
+
+        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+        sstable.first = getMinimalKey(first);
+        sstable.last = getMinimalKey(boundary.lastKey);
+        return sstable;
+    }
+
+    public SSTableReader openFinalEarly()
+    {
+        // we must ensure the data is completely flushed to disk
+        dataFile.sync();
+        iwriter.indexFile.sync();
+        return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
+    }
+
+    @SuppressWarnings("resource")
+    private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
+    {
+        if (maxDataAge < 0)
+            maxDataAge = System.currentTimeMillis();
+
+        StatsMetadata stats = statsMetadata();
+        // finalize in-memory state for the reader
+        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA));
+        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
+                                                           components,
+                                                           this.metadata,
+                                                           partitioner,
+                                                           ifile,
+                                                           dfile,
+                                                           iwriter.summary.build(partitioner),
+                                                           iwriter.bf.sharedCopy(),
+                                                           maxDataAge,
+                                                           stats,
+                                                           openReason);
+        sstable.first = getMinimalKey(first);
+        sstable.last = getMinimalKey(last);
+        return sstable;
+    }
+
+    protected SSTableWriter.TransactionalProxy txnProxy()
+    {
+        return new TransactionalProxy();
+    }
+
+    class TransactionalProxy extends SSTableWriter.TransactionalProxy
+    {
+        // finalise our state on disk, including renaming
+        protected void doPrepare()
+        {
+            iwriter.prepareToCommit();
+
+            // write sstable statistics
+            dataFile.setDescriptor(descriptor).prepareToCommit();
+            writeMetadata(descriptor, finalizeMetadata());
+
+            // save the table of components
+            SSTable.appendTOC(descriptor, components);
+
+            // rename to final
+            rename(descriptor, components);
+
+            if (openResult)
+                finalReader = openFinal(descriptor.asType(Descriptor.Type.FINAL), SSTableReader.OpenReason.NORMAL);
+        }
+
+        protected Throwable doCommit(Throwable accumulate)
+        {
+            accumulate = dataFile.commit(accumulate);
+            accumulate = iwriter.commit(accumulate);
+            return accumulate;
+        }
+
+        @Override
+        protected Throwable doPreCleanup(Throwable accumulate)
+        {
+            accumulate = dbuilder.close(accumulate);
+            return accumulate;
+        }
+
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            accumulate = iwriter.abort(accumulate);
+            accumulate = dataFile.abort(accumulate);
+
+            accumulate = delete(descriptor, accumulate);
+            if (!openResult)
+                accumulate = delete(descriptor.asType(Descriptor.Type.FINAL), accumulate);
+            return accumulate;
+        }
+
+        private Throwable delete(Descriptor desc, Throwable accumulate)
+        {
+            try
+            {
+                Set<Component> components = SSTable.discoverComponentsFor(desc);
+                if (!components.isEmpty())
+                    SSTable.delete(desc, components);
+            }
+            catch (Throwable t)
+            {
+                logger.error(String.format("Failed deleting temp components for %s", descriptor), t);
+                accumulate = merge(accumulate, t);
+            }
+            return accumulate;
+        }
+    }
+
+    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
+    {
+        File file = new File(desc.filenameFor(Component.STATS));
+        try (SequentialWriter out = SequentialWriter.open(file))
+        {
+            desc.getMetadataSerializer().serialize(components, out.stream);
+            out.setDescriptor(desc).finish();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, file.getPath());
+        }
+    }
+
+    public long getFilePointer()
+    {
+        return dataFile.getFilePointer();
+    }
+
+    public long getOnDiskFilePointer()
+    {
+        return dataFile.getOnDiskFilePointer();
+    }
+
+    /**
+     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+     */
+    class IndexWriter extends AbstractTransactional implements Transactional
+    {
+        private final SequentialWriter indexFile;
+        public final SegmentedFile.Builder builder;
+        public final IndexSummaryBuilder summary;
+        public final IFilter bf;
+        private FileMark mark;
+
+        IndexWriter(long keyCount, final SequentialWriter dataFile)
+        {
+            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
+            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+            // register listeners to be alerted when the data files are flushed
+            indexFile.setPostFlushListener(new Runnable()
+            {
+                public void run()
+                {
+                    summary.markIndexSynced(indexFile.getLastFlushOffset());
+                }
+            });
+            dataFile.setPostFlushListener(new Runnable()
+            {
+                public void run()
+                {
+                    summary.markDataSynced(dataFile.getLastFlushOffset());
+                }
+            });
+        }
+
+        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
+        IndexSummaryBuilder.ReadableBoundary getMaxReadable()
+        {
+            return summary.getLastReadableBoundary();
+        }
+
+        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException
+        {
+            bf.add(key);
+            long indexStart = indexFile.getFilePointer();
+            try
+            {
+                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
+                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, indexFile.getPath());
+            }
+            long indexEnd = indexFile.getFilePointer();
+
+            if (logger.isTraceEnabled())
+                logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
+
+            summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
+            builder.addPotentialBoundary(indexStart);
+        }
+
+        /**
+         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
+         */
+        void flushBf()
+        {
+            if (components.contains(Component.FILTER))
+            {
+                String path = descriptor.filenameFor(Component.FILTER);
+                try (FileOutputStream fos = new FileOutputStream(path);
+                     DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
+                {
+                    // bloom filter
+                    FilterFactory.serialize(bf, stream);
+                    stream.flush();
+                    SyncUtil.sync(fos);
+                }
+                catch (IOException e)
+                {
+                    throw new FSWriteError(e, path);
+                }
+            }
+        }
+
+        public void mark()
+        {
+            mark = indexFile.mark();
+        }
+
+        public void resetAndTruncate()
+        {
+            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+            // we can't reset dbuilder either, but that is the last thing called in afterAppend so
+            // we assume that if that worked then we won't be trying to reset.
+            indexFile.resetAndTruncate(mark);
+        }
+
+        protected void doPrepare()
+        {
+            flushBf();
+
+            // truncate index file
+            long position = iwriter.indexFile.getFilePointer();
+            iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
+            FileUtils.truncate(iwriter.indexFile.getPath(), position);
+
+            // save summary
+            summary.prepareToCommit();
+            try (IndexSummary summary = iwriter.summary.build(partitioner))
+            {
+                SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary);
+            }
+        }
+
+        protected Throwable doCommit(Throwable accumulate)
+        {
+            return indexFile.commit(accumulate);
+        }
+
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            return indexFile.abort(accumulate);
+        }
+
+        @Override
+        protected Throwable doPreCleanup(Throwable accumulate)
+        {
+            accumulate = summary.close(accumulate);
+            accumulate = bf.close(accumulate);
+            accumulate = builder.close(accumulate);
+            return accumulate;
+        }
+    }
+}
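Two details of the writer above interact to produce CASSANDRA-10791. rawAppend() terminates every row with the two-byte END_OF_ROW marker (0x0000), which is safe as a terminator because a legal cell name can never have length zero; and appendFromStream() routes atoms through a ColumnIndex.Builder that may buffer range tombstones, which is why the patch adds the columnIndexer.finishAddingAtoms() call before the marker is written. The sketch below reduces both ideas to a toy record format, invented purely for illustration (it is not the real ka layout): a builder that defers "tombstones", a terminator write, and a reader that stops at the marker, showing why any bytes flushed after it are simply never seen.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

public class EndOfRowSketch
{
    // every record starts with a non-zero unsigned-short length,
    // so zero is unambiguous as "end of row"
    static final int END_OF_ROW = 0x0000;

    static class RowBuilder
    {
        private final DataOutputStream out;
        private final Deque<byte[]> pendingTombstones = new ArrayDeque<byte[]>();

        RowBuilder(DataOutputStream out) { this.out = out; }

        void addCell(byte[] name) throws IOException
        {
            out.writeShort(name.length);
            out.write(name);
        }

        // deferred, like the index builder's range-tombstone tracking
        void addRangeTombstone(byte[] name) { pendingTombstones.add(name); }

        // the fix: drain buffered tombstones BEFORE the marker
        void finishAddingAtoms() throws IOException
        {
            while (!pendingTombstones.isEmpty())
                addCell(pendingTombstones.poll());
        }

        void endRow() throws IOException
        {
            finishAddingAtoms();        // omitting this call reproduces the bug:
            out.writeShort(END_OF_ROW); // the tombstones would land after the marker
        }
    }

    // a reader never looks past the marker, so trailing bytes are invisible
    static int countAtoms(DataInputStream in) throws IOException
    {
        int atoms = 0;
        int length;
        while ((length = in.readUnsignedShort()) != END_OF_ROW)
        {
            in.skipBytes(length);
            atoms++;
        }
        return atoms;
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        RowBuilder builder = new RowBuilder(new DataOutputStream(bytes));
        builder.addCell(new byte[]{ 'a' });
        builder.addRangeTombstone(new byte[]{ 'b' });
        builder.endRow();
        // prints 2: the deferred tombstone was flushed before the marker
        System.out.println(countAtoms(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))));
    }
}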
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --cc test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
index 0000000,22cfa6a..190922a
mode 000000,100644..100644
Binary files differ


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 0d90354,167671b..b69a1f8
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -20,98 -20,68 +20,103 @@@ package org.apache.cassandra.db
   *
   */
 -import java.io.*;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
  import java.nio.ByteBuffer;
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Iterator;
 -import java.util.List;
 -import java.util.Set;
 +import java.util.*;
  import java.util.concurrent.ExecutionException;
 +import java.io.File;
 +import java.io.IOError;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.db.marshal.UUIDType;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.db.marshal.CompositeType;
 -import org.apache.cassandra.db.marshal.LongType;
 -import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.exceptions.RequestExecutionException;
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.UUIDGen;
+ import org.apache.commons.lang3.StringUtils;
+ 
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
 -import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.cql3.UntypedResultSet;
 -import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.Scrubber;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.SSTableRewriter;
 +import org.apache.cassandra.OrderedJUnit4ClassRunner;
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +import static org.junit.Assert.*;
 +import static org.junit.Assume.assumeTrue;
 +
  import static org.apache.cassandra.Util.cellname;
  import static org.apache.cassandra.Util.column;
 -import static junit.framework.Assert.assertNotNull;
  import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
 -import static org.junit.Assume.assumeTrue;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
 -public class ScrubTest extends SchemaLoader
 +public class ScrubTest
  {
 -    public String KEYSPACE = "Keyspace1";
 -    public String CF = "Standard1";
 -    public String CF3 = "Standard2";
 -    public String COUNTER_CF = "Counter1";
 -    private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
 +    public static final String KEYSPACE = "Keyspace1";
 +    public static final String CF = "Standard1";
 +    public static final String CF2 = "Standard2";
 +    public static final String CF3 = "Standard3";
++    public static final String CFI1 = "StandardInteger1";
 +    public static final String COUNTER_CF = "Counter1";
 +    public static final String CF_UUID = "UUIDKeys";
 +    public static final String CF_INDEX1 = "Indexed1";
 +    public static final String CF_INDEX2 = "Indexed2";
 +
 +    public static final String COL_KEYS_INDEX = "birthdate";
 +    public static final String COL_COMPOSITES_INDEX = "col1";
 +    public static final String COL_NON_INDEX = "notanindexcol";
 +
 +    public static final Integer COMPRESSION_CHUNK_LENGTH = 4096;
  
      @BeforeClass
 -    public static void loadSchema() throws ConfigurationException
 +    public static void defineSchema() throws ConfigurationException
      {
 -        loadSchema(COMPRESSION_CHUNK_LENGTH);
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
++                                    SchemaLoader.standardCFMD(KEYSPACE, CFI1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, COUNTER_CF)
 +                                                .defaultValidator(CounterColumnType.instance)
 +                                                .compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance),
 +                                    SchemaLoader.indexCFMD(KEYSPACE, CF_INDEX1, true),
 +                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true));
      }
  
      @Test
@@@ -422,6 -364,59 +427,61 @@@
          assert rows.size() == 6 : "Got " + rows.size();
      }
  
+     @Test
+     public void testScrub10791() throws Exception
+     {
+         // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
-         String columnFamily = "StandardInteger1";
++        String columnFamily = CFI1;
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         String root = System.getProperty("corrupt-sstable-root");
+         assert root != null;
+         File rootDir = new File(root);
+         assert rootDir.isDirectory();
-         Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
-         CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
++        Descriptor desc = new Descriptor("ka", rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY);
+ 
+         // open without validation for scrubbing
+         Set<Component> components = new HashSet<>();
+         components.add(Component.DATA);
+         components.add(Component.PRIMARY_INDEX);
+         components.add(Component.FILTER);
+         components.add(Component.STATS);
+         components.add(Component.SUMMARY);
+         components.add(Component.TOC);
-         SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
++        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs);
+ 
-         Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
-         scrubber.scrub();
++        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true, true))
++        {
++            scrubber.scrub();
++        }
+ 
+         cfs.loadNewSSTables();
+         assertEquals(7, countCells(cfs));
+     }
+ 
+     private int countCells(ColumnFamilyStore cfs)
+     {
+         int cellCount = 0;
+         for (SSTableReader sstable : cfs.getSSTables())
+         {
+             Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+             while (it.hasNext())
+             {
+                 Iterator<OnDiskAtom> itr = it.next();
+                 while (itr.hasNext())
+                 {
+                     ++cellCount;
+                     itr.next();
+                 }
+             }
+         }
+         return cellCount;
+     }
+ 
      private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
      {
          boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
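One subtlety in testScrub10791 above is the resource nesting on the 2.2 branch: the sstable is wrapped in an offline LifecycleTransaction, and try-with-resources closes in reverse declaration order, so the Scrubber (declared second) is closed before the transaction that owns the sstable. The toy below demonstrates just that ordering guarantee; both classes are stand-ins, not the real Cassandra types.

public class ResourceOrder
{
    static class Txn implements AutoCloseable
    {
        public void close() { System.out.println("transaction closed (second)"); }
    }

    static class Scrub implements AutoCloseable
    {
        public void close() { System.out.println("scrubber closed (first)"); }
    }

    public static void main(String[] args)
    {
        try (Txn txn = new Txn(); Scrub scrub = new Scrub())
        {
            System.out.println("scrubbing");
        }
        // Output: scrubbing / scrubber closed (first) / transaction closed (second)
    }
}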
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 3c799e2,31dc492..e751968
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -26,7 -26,7 +26,8 @@@ import java.util.concurrent.TimeUnit
  import com.google.common.collect.Iterables;
  import com.google.common.util.concurrent.FutureCallback;
  import com.google.common.util.concurrent.Futures;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ 
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
@@@ -306,9 -280,21 +312,21 @@@ public class StreamingTransferTes
          // add RangeTombstones
          cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
          cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+         cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
-         rm.apply();
++        rm.applyUnsafe();
+ 
+         key = "key1";
+         rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+         // add columns of size slightly less than column_index_size to force insert column index
+         rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+         cf = rm.addOrGet(cfname);
+         // add RangeTombstones
+         cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
-         rm.apply();
+         rm.applyUnsafe();
+ 
          cfs.forceBlockingFlush();
  
+         int cellCount = countCells(cfs);
          SSTableReader sstable = cfs.getSSTables().iterator().next();
          cfs.clearUnsafe();
          transferSSTables(sstable);
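The strengthened test above writes rows whose cells are sized just under column_index_size (forcing a column index to be built) plus range tombstones, the combination that triggered the writer bug, and then counts cells before streaming so it can compare after. The shape of that regression check, reduced to a hypothetical interface standing in for the ColumnFamilyStore/countCells pair the test actually uses, looks roughly like this:

import static org.junit.Assert.assertEquals;

public class TransferInvariant
{
    interface Store
    {
        int countCells();
        void transferAndReload();
    }

    // A writer that drops range tombstones past END_OF_ROW loses atoms on
    // the receiving side, which surfaces here as a count mismatch.
    static void assertCellCountPreserved(Store store)
    {
        int before = store.countCells();
        store.transferAndReload();
        assertEquals("cells lost or duplicated in transfer", before, store.countCells());
    }
}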