http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java deleted file mode 100644 index 6db9c3d..0000000 --- a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java +++ /dev/null @@ -1,542 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.sstable.format.big; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.List; - -import com.google.common.collect.AbstractIterator; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.IndexHelper; -import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileMark; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call. - * This function assumes that the CF is sorted by name and exploits the name index. - */ -class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private final ColumnFamily emptyColumnFamily; - - private final SSTableReader sstable; - private final List<IndexHelper.IndexInfo> indexes; - private final FileDataInput originalInput; - private FileDataInput file; - private final boolean reversed; - private final ColumnSlice[] slices; - private final BlockFetcher fetcher; - private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>(); - private final CellNameType comparator; - - // Holds range tombstone in reverse queries. See addColumn() - private final Deque<OnDiskAtom> rangeTombstonesReversed; - - /** - * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in - * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of - * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. 
This reader also - * assumes that validation has been performed in terms of intervals (no overlapping intervals). - */ - IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed) - { - Tracing.trace("Seeking to partition indexed section in data file"); - this.sstable = sstable; - this.originalInput = input; - this.reversed = reversed; - this.slices = slices; - this.comparator = sstable.metadata.comparator; - this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() : null; - - try - { - this.indexes = indexEntry.columnsIndex(); - emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata); - if (indexes.isEmpty()) - { - setToRowStart(indexEntry, input); - emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file)); - fetcher = new SimpleBlockFetcher(); - } - else - { - emptyColumnFamily.delete(indexEntry.deletionTime()); - fetcher = new IndexedBlockFetcher(indexEntry.position); - } - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, file.getPath()); - } - } - - /** - * Sets the seek position to the start of the row for column scanning. - */ - private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException - { - if (in == null) - { - this.file = sstable.getFileDataInput(rowEntry.position); - } - else - { - this.file = in; - in.seek(rowEntry.position); - } - sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); - } - - public ColumnFamily getColumnFamily() - { - return emptyColumnFamily; - } - - public DecoratedKey getKey() - { - throw new UnsupportedOperationException(); - } - - protected OnDiskAtom computeNext() - { - while (true) - { - if (reversed) - { - // Return all tombstone for the block first (see addColumn() below) - OnDiskAtom column = rangeTombstonesReversed.poll(); - if (column != null) - return column; - } - - OnDiskAtom column = blockColumns.poll(); - if (column == null) - { - if (!fetcher.fetchMoreData()) - return endOfData(); - } - else - { - return column; - } - } - } - - public void close() throws IOException - { - if (originalInput == null && file != null) - file.close(); - } - - protected void addColumn(OnDiskAtom col) - { - if (reversed) - { - /* - * We put range tomstone markers at the beginning of the range they delete. But for reversed queries, - * the caller still need to know about a RangeTombstone before it sees any column that it covers. - * To make that simple, we keep said tombstones separate and return them all before any column for - * a given block. - */ - if (col instanceof RangeTombstone) - rangeTombstonesReversed.addFirst(col); - else - blockColumns.addFirst(col); - } - else - { - blockColumns.addLast(col); - } - } - - private abstract class BlockFetcher - { - protected int currentSliceIdx; - - protected BlockFetcher(int sliceIdx) - { - this.currentSliceIdx = sliceIdx; - } - - /* - * Return the smallest key selected by the current ColumnSlice. - */ - protected Composite currentStart() - { - return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start; - } - - /* - * Return the biggest key selected by the current ColumnSlice. - */ - protected Composite currentFinish() - { - return reversed ? 
slices[currentSliceIdx].start : slices[currentSliceIdx].finish; - } - - protected abstract boolean setNextSlice(); - - protected abstract boolean fetchMoreData(); - - protected boolean isColumnBeforeSliceStart(OnDiskAtom column) - { - return isBeforeSliceStart(column.name()); - } - - protected boolean isBeforeSliceStart(Composite name) - { - Composite start = currentStart(); - return !start.isEmpty() && comparator.compare(name, start) < 0; - } - - protected boolean isColumnBeforeSliceFinish(OnDiskAtom column) - { - Composite finish = currentFinish(); - return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0; - } - - protected boolean isAfterSliceFinish(Composite name) - { - Composite finish = currentFinish(); - return !finish.isEmpty() && comparator.compare(name, finish) > 0; - } - } - - private class IndexedBlockFetcher extends BlockFetcher - { - // where this row starts - private final long columnsStart; - - // the index entry for the next block to deserialize - private int nextIndexIdx = -1; - - // index of the last block we've read from disk; - private int lastDeserializedBlock = -1; - - // For reversed, keep columns at the beginning of the last deserialized block that - // may still match a slice - private final Deque<OnDiskAtom> prefetched; - - public IndexedBlockFetcher(long columnsStart) - { - super(-1); - this.columnsStart = columnsStart; - this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null; - setNextSlice(); - } - - protected boolean setNextSlice() - { - while (++currentSliceIdx < slices.length) - { - nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx); - if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size()) - // no index block for that slice - continue; - - // Check if we can exclude this slice entirely from the index - IndexInfo info = indexes.get(nextIndexIdx); - if (reversed) - { - if (!isBeforeSliceStart(info.lastName)) - return true; - } - else - { - if (!isAfterSliceFinish(info.firstName)) - return true; - } - } - nextIndexIdx = -1; - return false; - } - - protected boolean hasMoreSlice() - { - return currentSliceIdx < slices.length; - } - - protected boolean fetchMoreData() - { - if (!hasMoreSlice()) - return false; - - // If we read blocks in reversed disk order, we may have columns from the previous block to handle. - // Note that prefetched keeps columns in reversed disk order. - // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones - // that cover a slice *after* we've move to the previous slice. To keep it simple, we simply include - // every RT in prefetched: it's only slightly inefficient to do so and there is only so much RT that - // can be mistakenly added this way. - if (reversed && !prefetched.isEmpty()) - { - // Avoids some comparison when we know it's not useful - boolean inSlice = false; - - OnDiskAtom prefetchedCol; - while ((prefetchedCol = prefetched.peek()) != null) - { - // col is before slice, we update the slice - if (isColumnBeforeSliceStart(prefetchedCol)) - { - inSlice = false; - - // As explained above, we add RT unconditionally - if (prefetchedCol instanceof RangeTombstone) - { - blockColumns.addLast(prefetched.poll()); - continue; - } - - // Otherwise, we either move to the next slice. If we have no more slice, then - // simply unwind prefetched entirely and add all RT. 
- if (!setNextSlice()) - { - while ((prefetchedCol = prefetched.poll()) != null) - if (prefetchedCol instanceof RangeTombstone) - blockColumns.addLast(prefetchedCol); - break; - } - - } - // col is within slice, all columns - // (we go in reverse, so as soon as we are in a slice, no need to check - // we're after the slice until we change slice) - else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol)) - { - blockColumns.addLast(prefetched.poll()); - inSlice = true; - } - // if col is after slice, ignore - else - { - prefetched.poll(); - } - } - - if (!blockColumns.isEmpty()) - return true; - else if (!hasMoreSlice()) - return false; - } - try - { - return getNextBlock(); - } - catch (IOException e) - { - throw new CorruptSSTableException(e, file.getPath()); - } - } - - private boolean getNextBlock() throws IOException - { - if (lastDeserializedBlock == nextIndexIdx) - { - if (reversed) - nextIndexIdx--; - else - nextIndexIdx++; - } - lastDeserializedBlock = nextIndexIdx; - - // Are we done? - if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size()) - return false; - - IndexInfo currentIndex = indexes.get(lastDeserializedBlock); - - /* seek to the correct offset to the data, and calculate the data size */ - long positionToSeek = columnsStart + currentIndex.offset; - - // With new promoted indexes, our first seek in the data file will happen at that point. - if (file == null) - file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput; - - AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); - - file.seek(positionToSeek); - FileMark mark = file.mark(); - - // We remenber when we are whithin a slice to avoid some comparison - boolean inSlice = false; - - // scan from index start - while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed()) - { - // col is before slice - // (If in slice, don't bother checking that until we change slice) - Composite start = currentStart(); - if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) - { - // If it's a rangeTombstone, then we need to read it and include it unless it's end - // stops before our slice start. - if (deserializer.nextIsRangeTombstone()) - { - RangeTombstone rt = (RangeTombstone)deserializer.readNext(); - if (comparator.compare(rt.max, start) >= 0) - addColumn(rt); - continue; - } - - if (reversed) - { - // the next slice select columns that are before the current one, so it may - // match this column, so keep it around. - prefetched.addFirst(deserializer.readNext()); - } - else - { - deserializer.skipNext(); - } - } - // col is within slice - else - { - Composite finish = currentFinish(); - if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0) - { - inSlice = true; - addColumn(deserializer.readNext()); - } - // col is after slice. - else - { - // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice. - // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous - // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first - // columns of the block were not part of the current slice, i.e. if we have columns in prefetched. 
- if (reversed && prefetched.isEmpty()) - break; - - if (!setNextSlice()) - break; - - inSlice = false; - - // The next index block now corresponds to the first block that may have columns for the newly set slice. - // So if it's different from the current block, we're done with this block. And in that case, we know - // that our prefetched columns won't match. - if (nextIndexIdx != lastDeserializedBlock) - { - if (reversed) - prefetched.clear(); - break; - } - - // Even if the next slice may have column in this blocks, if we're reversed, those columns have been - // prefetched and we're done with that block - if (reversed) - break; - - // otherwise, we will deal with that column at the next iteration - } - } - } - return true; - } - } - - private class SimpleBlockFetcher extends BlockFetcher - { - public SimpleBlockFetcher() throws IOException - { - // Since we have to deserialize in order and will read all slices might as well reverse the slices and - // behave as if it was not reversed - super(reversed ? slices.length - 1 : 0); - - // We remenber when we are whithin a slice to avoid some comparison - boolean inSlice = false; - - AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); - while (deserializer.hasNext()) - { - // col is before slice - // (If in slice, don't bother checking that until we change slice) - Composite start = currentStart(); - if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) - { - // If it's a rangeTombstone, then we need to read it and include it unless it's end - // stops before our slice start. Otherwise, we can skip it. - if (deserializer.nextIsRangeTombstone()) - { - RangeTombstone rt = (RangeTombstone)deserializer.readNext(); - if (comparator.compare(rt.max, start) >= 0) - addColumn(rt); - } - else - { - deserializer.skipNext(); - } - continue; - } - - // col is within slice - Composite finish = currentFinish(); - if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0) - { - inSlice = true; - addColumn(deserializer.readNext()); - } - // col is after slice. more slices? - else - { - inSlice = false; - if (!setNextSlice()) - break; - } - } - } - - protected boolean setNextSlice() - { - if (reversed) - { - if (currentSliceIdx <= 0) - return false; - - currentSliceIdx--; - } - else - { - if (currentSliceIdx >= slices.length - 1) - return false; - - currentSliceIdx++; - } - return true; - } - - protected boolean fetchMoreData() - { - return false; - } - } -}
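The deleted IndexedSliceReader above keeps range tombstones in a separate deque during reversed reads (see addColumn() and computeNext()), so that each tombstone is handed back before any cell it covers in the same block. Below is a minimal, self-contained sketch of that queueing idea only; Atom, Cell and RangeTombstoneMarker are simplified stand-in types for illustration, not the Cassandra classes removed in this commit.

import java.util.ArrayDeque;
import java.util.Deque;

class ReversedBlockBuffer
{
    // Simplified stand-ins for OnDiskAtom and its cell/tombstone implementations.
    interface Atom { String name(); }
    static final class Cell implements Atom { final String name; Cell(String n) { name = n; } public String name() { return name; } }
    static final class RangeTombstoneMarker implements Atom { final String name; RangeTombstoneMarker(String n) { name = n; } public String name() { return name; } }

    private final boolean reversed;
    private final Deque<Atom> blockColumns = new ArrayDeque<>();
    private final Deque<Atom> rangeTombstonesReversed = new ArrayDeque<>();

    ReversedBlockBuffer(boolean reversed)
    {
        this.reversed = reversed;
    }

    // Mirrors addColumn(): for reversed reads, cells are prepended so they come out in
    // reverse disk order, while range tombstones go to their own queue.
    void add(Atom atom)
    {
        if (reversed)
        {
            if (atom instanceof RangeTombstoneMarker)
                rangeTombstonesReversed.addFirst(atom);
            else
                blockColumns.addFirst(atom);
        }
        else
        {
            blockColumns.addLast(atom);
        }
    }

    // Mirrors computeNext(): drain every queued tombstone for the block before any cell.
    Atom poll()
    {
        if (reversed)
        {
            Atom rt = rangeTombstonesReversed.poll();
            if (rt != null)
                return rt;
        }
        return blockColumns.poll();
    }

    public static void main(String[] args)
    {
        ReversedBlockBuffer buffer = new ReversedBlockBuffer(true);
        // Atoms arrive in disk order; the caller still sees the tombstone first.
        buffer.add(new RangeTombstoneMarker("a..c"));
        buffer.add(new Cell("b"));
        buffer.add(new Cell("c"));
        for (Atom a; (a = buffer.poll()) != null; )
            System.out.println(a.getClass().getSimpleName() + " " + a.name()); // tombstone, then c, then b
    }
}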
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java deleted file mode 100644 index b8910c7..0000000 --- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.sstable.format.big; - -import java.io.IOException; -import java.util.*; - -import com.google.common.collect.AbstractIterator; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.IndexHelper; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileMark; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.ByteBufferUtil; - -class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private ColumnFamily cf; - private final SSTableReader sstable; - private FileDataInput fileToClose; - private Iterator<OnDiskAtom> iter; - public final SortedSet<CellName> columns; - public final DecoratedKey key; - - public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns) - { - assert columns != null; - this.sstable = sstable; - this.columns = columns; - this.key = key; - - RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); - if (indexEntry == null) - return; - - try - { - read(sstable, null, indexEntry); - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - finally - { - if (fileToClose != null) - FileUtils.closeQuietly(fileToClose); - } - } - - public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry) - { - assert columns != null; - this.sstable = sstable; - this.columns = columns; - this.key = key; - - try - { - read(sstable, file, indexEntry); - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - } - - private FileDataInput createFileDataInput(long position) - { - 
fileToClose = sstable.getFileDataInput(position); - return fileToClose; - } - - @SuppressWarnings("resource") - private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry) - throws IOException - { - List<IndexHelper.IndexInfo> indexList; - - // If the entry is not indexed or the index is not promoted, read from the row start - if (!indexEntry.isIndexed()) - { - if (file == null) - file = createFileDataInput(indexEntry.position); - else - file.seek(indexEntry.position); - - DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); - assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath()); - } - - indexList = indexEntry.columnsIndex(); - - if (!indexEntry.isIndexed()) - { - ColumnFamilySerializer serializer = ColumnFamily.serializer; - try - { - cf = ArrayBackedSortedColumns.factory.create(sstable.metadata); - cf.delete(DeletionTime.serializer.deserialize(file)); - } - catch (Exception e) - { - throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e); - } - } - else - { - cf = ArrayBackedSortedColumns.factory.create(sstable.metadata); - cf.delete(indexEntry.deletionTime()); - } - - List<OnDiskAtom> result = new ArrayList<OnDiskAtom>(); - if (indexList.isEmpty()) - { - readSimpleColumns(file, columns, result); - } - else - { - readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result); - } - - // create an iterator view of the columns we read - iter = result.iterator(); - } - - private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result) - { - Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version); - int n = 0; - while (atomIterator.hasNext()) - { - OnDiskAtom column = atomIterator.next(); - if (column instanceof Cell) - { - if (columnNames.contains(column.name())) - { - result.add(column); - if (++n >= columns.size()) - break; - } - } - else - { - result.add(column); - } - } - } - - @SuppressWarnings("resource") - private void readIndexedColumns(CFMetaData metadata, - FileDataInput file, - SortedSet<CellName> columnNames, - List<IndexHelper.IndexInfo> indexList, - long basePosition, - List<OnDiskAtom> result) - throws IOException - { - /* get the various column ranges we have to read */ - CellNameType comparator = metadata.comparator; - List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>(); - int lastIndexIdx = -1; - for (CellName name : columnNames) - { - int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx); - if (index < 0 || index == indexList.size()) - continue; - IndexHelper.IndexInfo indexInfo = indexList.get(index); - // Check the index block does contain the column names and that we haven't inserted this block yet. - if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx) - continue; - - ranges.add(indexInfo); - lastIndexIdx = index; - } - - if (ranges.isEmpty()) - return; - - Iterator<CellName> toFetch = columnNames.iterator(); - CellName nextToFetch = toFetch.next(); - for (IndexHelper.IndexInfo indexInfo : ranges) - { - long positionToSeek = basePosition + indexInfo.offset; - - // With new promoted indexes, our first seek in the data file will happen at that point. 
- if (file == null) - file = createFileDataInput(positionToSeek); - - AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); - file.seek(positionToSeek); - FileMark mark = file.mark(); - while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null) - { - int cmp = deserializer.compareNextTo(nextToFetch); - if (cmp < 0) - { - // If it's a rangeTombstone, then we need to read it and include - // it if it includes our target. Otherwise, we can skip it. - if (deserializer.nextIsRangeTombstone()) - { - RangeTombstone rt = (RangeTombstone)deserializer.readNext(); - if (comparator.compare(rt.max, nextToFetch) >= 0) - result.add(rt); - } - else - { - deserializer.skipNext(); - } - } - else if (cmp == 0) - { - nextToFetch = toFetch.hasNext() ? toFetch.next() : null; - result.add(deserializer.readNext()); - } - else - nextToFetch = toFetch.hasNext() ? toFetch.next() : null; - } - } - } - - public DecoratedKey getKey() - { - return key; - } - - public ColumnFamily getColumnFamily() - { - return cf; - } - - protected OnDiskAtom computeNext() - { - if (iter == null || !iter.hasNext()) - return endOfData(); - return iter.next(); - } - - public void close() throws IOException { } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java deleted file mode 100644 index 07d867d..0000000 --- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.sstable.format.big; - -import java.io.IOException; - -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.OnDiskAtom; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileDataInput; - -/** - * A Cell Iterator over SSTable - */ -class SSTableSliceIterator implements OnDiskAtomIterator -{ - private final OnDiskAtomIterator reader; - private final DecoratedKey key; - - public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed) - { - this.key = key; - RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); - this.reader = indexEntry == null ? 
null : createReader(sstable, indexEntry, null, slices, reversed); - } - - /** - * An iterator for a slice within an SSTable - * @param sstable Keyspace for the CFS we are reading from - * @param file Optional parameter that input is read from. If null is passed, this class creates an appropriate one automatically. - * If this class creates, it will close the underlying file when #close() is called. - * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file) - * In all cases the caller should explicitly #close() this iterator. - * @param key The key the requested slice resides under - * @param slices the column slices - * @param reversed Results are returned in reverse order iff reversed is true. - * @param indexEntry position of the row - */ - public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry) - { - this.key = key; - reader = createReader(sstable, indexEntry, file, slices, reversed); - } - - private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed) - { - return slices.length == 1 && slices[0].start.isEmpty() && !reversed - ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish) - : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed); - } - - public DecoratedKey getKey() - { - return key; - } - - public ColumnFamily getColumnFamily() - { - return reader == null ? null : reader.getColumnFamily(); - } - - public boolean hasNext() - { - return reader != null && reader.hasNext(); - } - - public OnDiskAtom next() - { - return reader.next(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - - public void close() throws IOException - { - if (reader != null) - reader.close(); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java deleted file mode 100644 index 9fec303..0000000 --- a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.io.sstable.format.big; - -import java.io.IOException; -import java.util.Iterator; - -import com.google.common.collect.AbstractIterator; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; - -class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class); - - private final FileDataInput file; - private final boolean needsClosing; - private final Composite finishColumn; - private final CellNameType comparator; - private final ColumnFamily emptyColumnFamily; - private final Iterator<OnDiskAtom> atomIterator; - - SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn) - { - Tracing.trace("Seeking to partition beginning in data file"); - this.finishColumn = finishColumn; - this.comparator = sstable.metadata.comparator; - try - { - if (input == null) - { - this.file = sstable.getFileDataInput(indexEntry.position); - this.needsClosing = true; - } - else - { - this.file = input; - input.seek(indexEntry.position); - this.needsClosing = false; - } - - // Skip key and data size - ByteBufferUtil.skipShortLength(file); - - emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata); - emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file)); - atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version); - } - catch (IOException e) - { - sstable.markSuspect(); - throw new CorruptSSTableException(e, sstable.getFilename()); - } - } - - protected OnDiskAtom computeNext() - { - if (!atomIterator.hasNext()) - return endOfData(); - - OnDiskAtom column = atomIterator.next(); - if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0) - return endOfData(); - - return column; - } - - public ColumnFamily getColumnFamily() - { - return emptyColumnFamily; - } - - public void close() throws IOException - { - if (needsClosing) - file.close(); - } - - public DecoratedKey getKey() - { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java index 4bd060e..90a9f24 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java @@ -23,6 +23,7 @@ import java.util.*; import com.google.common.collect.Maps; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -64,12 +65,12 @@ public class 
LegacyMetadataSerializer extends MetadataSerializer out.writeInt(g); StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out); out.writeInt(stats.sstableLevel); - out.writeInt(stats.minColumnNames.size()); - for (ByteBuffer columnName : stats.minColumnNames) - ByteBufferUtil.writeWithShortLength(columnName, out); - out.writeInt(stats.maxColumnNames.size()); - for (ByteBuffer columnName : stats.maxColumnNames) - ByteBufferUtil.writeWithShortLength(columnName, out); + out.writeInt(stats.minClusteringValues.size()); + for (ByteBuffer value : stats.minClusteringValues) + ByteBufferUtil.writeWithShortLength(value, out); + out.writeInt(stats.maxClusteringValues.size()); + for (ByteBuffer value : stats.maxClusteringValues) + ByteBufferUtil.writeWithShortLength(value, out); } /** @@ -127,14 +128,19 @@ public class LegacyMetadataSerializer extends MetadataSerializer replayPosition, minTimestamp, maxTimestamp, + Integer.MAX_VALUE, maxLocalDeletionTime, + 0, + Integer.MAX_VALUE, compressionRatio, tombstoneHistogram, sstableLevel, minColumnNames, maxColumnNames, true, - ActiveRepairService.UNREPAIRED_SSTABLE)); + ActiveRepairService.UNREPAIRED_SSTABLE, + -1, + -1)); if (types.contains(MetadataType.COMPACTION)) components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, null)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 5962a46..2574c62 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -19,22 +19,20 @@ package org.apache.cassandra.io.sstable.metadata; import java.io.File; import java.nio.ByteBuffer; -import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.io.sstable.ColumnNameHelper; -import org.apache.cassandra.io.sstable.ColumnStats; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -47,13 +45,13 @@ public class MetadataCollector { public static final double NO_COMPRESSION_RATIO = -1.0; - static EstimatedHistogram defaultColumnCountHistogram() + static EstimatedHistogram defaultCellPerPartitionCountHistogram() { // EH of 114 can track a max value of 2395318855, i.e., > 2B columns return new EstimatedHistogram(114); } - static EstimatedHistogram defaultRowSizeHistogram() + static EstimatedHistogram defaultPartitionSizeHistogram() { // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB return new EstimatedHistogram(150); @@ -66,34 +64,42 @@ public class MetadataCollector public static StatsMetadata defaultStatsMetadata() { - return new 
StatsMetadata(defaultRowSizeHistogram(), - defaultColumnCountHistogram(), + return new StatsMetadata(defaultPartitionSizeHistogram(), + defaultCellPerPartitionCountHistogram(), ReplayPosition.NONE, Long.MIN_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, + Integer.MAX_VALUE, + 0, + Integer.MAX_VALUE, NO_COMPRESSION_RATIO, defaultTombstoneDropTimeHistogram(), 0, Collections.<ByteBuffer>emptyList(), Collections.<ByteBuffer>emptyList(), true, - ActiveRepairService.UNREPAIRED_SSTABLE); + ActiveRepairService.UNREPAIRED_SSTABLE, + -1, + -1); } - protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram(); - protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram(); + protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); + // TODO: cound the number of row per partition (either with the number of cells, or instead) + protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram(); protected ReplayPosition replayPosition = ReplayPosition.NONE; - protected long minTimestamp = Long.MAX_VALUE; - protected long maxTimestamp = Long.MIN_VALUE; - protected int maxLocalDeletionTime = Integer.MIN_VALUE; + protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker(); + protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_DELETION_TIME); + protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(LivenessInfo.NO_TTL, LivenessInfo.NO_TTL); protected double compressionRatio = NO_COMPRESSION_RATIO; protected Set<Integer> ancestors = new HashSet<>(); protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram(); protected int sstableLevel; - protected List<ByteBuffer> minColumnNames = Collections.emptyList(); - protected List<ByteBuffer> maxColumnNames = Collections.emptyList(); + protected ByteBuffer[] minClusteringValues; + protected ByteBuffer[] maxClusteringValues; protected boolean hasLegacyCounterShards = false; + protected long totalColumnsSet; + protected long totalRows; /** * Default cardinality estimation method is to use HyperLogLog++. @@ -102,16 +108,19 @@ public class MetadataCollector * See CASSANDRA-5906 for detail. 
*/ protected ICardinality cardinality = new HyperLogLogPlus(13, 25); - private final CellNameType columnNameComparator; + private final ClusteringComparator comparator; - public MetadataCollector(CellNameType columnNameComparator) + public MetadataCollector(ClusteringComparator comparator) { - this.columnNameComparator = columnNameComparator; + this.comparator = comparator; + + this.minClusteringValues = new ByteBuffer[comparator.size()]; + this.maxClusteringValues = new ByteBuffer[comparator.size()]; } - public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level, boolean skipAncestors) + public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level, boolean skipAncestors) { - this(columnNameComparator); + this(comparator); replayPosition(ReplayPosition.getReplayPosition(sstables)); sstableLevel(level); @@ -129,9 +138,9 @@ public class MetadataCollector } } - public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level) + public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level) { - this(sstables, columnNameComparator, level, false); + this(sstables, comparator, level, false); } public MetadataCollector addKey(ByteBuffer key) @@ -141,15 +150,15 @@ public class MetadataCollector return this; } - public MetadataCollector addRowSize(long rowSize) + public MetadataCollector addPartitionSizeInBytes(long partitionSize) { - estimatedRowSize.add(rowSize); + estimatedPartitionSize.add(partitionSize); return this; } - public MetadataCollector addColumnCount(long columnCount) + public MetadataCollector addCellPerPartitionCount(long cellCount) { - estimatedColumnCount.add(columnCount); + estimatedCellPerPartitionCount.add(cellCount); return this; } @@ -169,34 +178,50 @@ public class MetadataCollector return this; } - public MetadataCollector updateMinTimestamp(long potentialMin) + public MetadataCollector update(LivenessInfo newInfo) { - minTimestamp = Math.min(minTimestamp, potentialMin); + // If the info doesn't have a timestamp, this means the info is basically irrelevant (it's a row + // update whose only info we care are the cells info basically). 
+ if (newInfo.hasTimestamp()) + { + updateTimestamp(newInfo.timestamp()); + updateTTL(newInfo.ttl()); + updateLocalDeletionTime(newInfo.localDeletionTime()); + } return this; } - public MetadataCollector updateMaxTimestamp(long potentialMax) + public MetadataCollector update(DeletionTime dt) { - maxTimestamp = Math.max(maxTimestamp, potentialMax); + if (!dt.isLive()) + { + updateTimestamp(dt.markedForDeleteAt()); + updateLocalDeletionTime(dt.localDeletionTime()); + } return this; } - public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime) + public MetadataCollector updateColumnSetPerRow(long columnSetInRow) { - this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime); + totalColumnsSet += columnSetInRow; + ++totalRows; return this; } - public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize) + private void updateTimestamp(long newTimestamp) { - this.estimatedRowSize = estimatedRowSize; - return this; + timestampTracker.update(newTimestamp); } - public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount) + private void updateLocalDeletionTime(int newLocalDeletionTime) { - this.estimatedColumnCount = estimatedColumnCount; - return this; + localDeletionTimeTracker.update(newLocalDeletionTime); + estimatedTombstoneDropTime.update(newLocalDeletionTime); + } + + private void updateTTL(int newTTL) + { + ttlTracker.update(newTTL); } public MetadataCollector replayPosition(ReplayPosition replayPosition) @@ -217,58 +242,179 @@ public class MetadataCollector return this; } - public MetadataCollector updateMinColumnNames(List<ByteBuffer> minColumnNames) + public MetadataCollector updateClusteringValues(ClusteringPrefix clustering) { - if (minColumnNames.size() > 0) - this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator); + int size = clustering.size(); + for (int i = 0; i < size; i++) + { + AbstractType<?> type = comparator.subtype(i); + ByteBuffer newValue = clustering.get(i); + minClusteringValues[i] = min(minClusteringValues[i], newValue, type); + maxClusteringValues[i] = max(maxClusteringValues[i], newValue, type); + } return this; } - public MetadataCollector updateMaxColumnNames(List<ByteBuffer> maxColumnNames) + private static ByteBuffer min(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator) { - if (maxColumnNames.size() > 0) - this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator); - return this; + if (b1 == null) + return b2; + if (b2 == null) + return b1; + + if (comparator.compare(b1, b2) >= 0) + return b2; + return b1; } - public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards) + private static ByteBuffer max(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator) { - this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; - return this; + if (b1 == null) + return b2; + if (b2 == null) + return b1; + + if (comparator.compare(b1, b2) >= 0) + return b1; + return b2; } - public MetadataCollector update(long rowSize, ColumnStats stats) + public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards) { - updateMinTimestamp(stats.minTimestamp); - updateMaxTimestamp(stats.maxTimestamp); - updateMaxLocalDeletionTime(stats.maxLocalDeletionTime); - addRowSize(rowSize); - addColumnCount(stats.columnCount); - mergeTombstoneHistogram(stats.tombstoneHistogram); - updateMinColumnNames(stats.minColumnNames); - 
updateMaxColumnNames(stats.maxColumnNames); - updateHasLegacyCounterShards(stats.hasLegacyCounterShards); + this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; return this; } - public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt) + public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header) { Map<MetadataType, MetadataComponent> components = Maps.newHashMap(); components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); - components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize, - estimatedColumnCount, + components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize, + estimatedCellPerPartitionCount, replayPosition, - minTimestamp, - maxTimestamp, - maxLocalDeletionTime, + timestampTracker.min(), + timestampTracker.max(), + localDeletionTimeTracker.min(), + localDeletionTimeTracker.max(), + ttlTracker.min(), + ttlTracker.max(), compressionRatio, estimatedTombstoneDropTime, sstableLevel, - ImmutableList.copyOf(minColumnNames), - ImmutableList.copyOf(maxColumnNames), + makeList(minClusteringValues), + makeList(maxClusteringValues), hasLegacyCounterShards, - repairedAt)); + repairedAt, + totalColumnsSet, + totalRows)); components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality)); + components.put(MetadataType.HEADER, header.toComponent()); return components; } + + private static List<ByteBuffer> makeList(ByteBuffer[] values) + { + // In most case, l will be the same size than values, but it's possible for it to be smaller + List<ByteBuffer> l = new ArrayList<ByteBuffer>(values.length); + for (int i = 0; i < values.length; i++) + if (values[i] == null) + break; + else + l.add(values[i]); + return l; + } + + public static class MinMaxLongTracker + { + private final long defaultMin; + private final long defaultMax; + + private boolean isSet = false; + private long min; + private long max; + + public MinMaxLongTracker() + { + this(Long.MIN_VALUE, Long.MAX_VALUE); + } + + public MinMaxLongTracker(long defaultMin, long defaultMax) + { + this.defaultMin = defaultMin; + this.defaultMax = defaultMax; + } + + public void update(long value) + { + if (!isSet) + { + min = max = value; + isSet = true; + } + else + { + if (value < min) + min = value; + if (value > max) + max = value; + } + } + + public long min() + { + return isSet ? min : defaultMin; + } + + public long max() + { + return isSet ? max : defaultMax; + } + } + + public static class MinMaxIntTracker + { + private final int defaultMin; + private final int defaultMax; + + private boolean isSet = false; + private int min; + private int max; + + public MinMaxIntTracker() + { + this(Integer.MIN_VALUE, Integer.MAX_VALUE); + } + + public MinMaxIntTracker(int defaultMin, int defaultMax) + { + this.defaultMin = defaultMin; + this.defaultMax = defaultMax; + } + + public void update(int value) + { + if (!isSet) + { + min = max = value; + isSet = true; + } + else + { + if (value < min) + min = value; + if (value > max) + max = value; + } + } + + public int min() + { + return isSet ? min : defaultMin; + } + + public int max() + { + return isSet ? 
max : defaultMax; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 8a65d8d..fcdf57a 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -75,7 +75,7 @@ public class MetadataSerializer implements IMetadataSerializer } } - public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException + public Map<MetadataType, MetadataComponent> deserialize( Descriptor descriptor, EnumSet<MetadataType> types) throws IOException { Map<MetadataType, MetadataComponent> components; logger.debug("Load metadata for {}", descriptor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java index 9717da1..875cec4 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.io.sstable.metadata; +import org.apache.cassandra.db.SerializationHeader; + /** * Defines Metadata component type. */ @@ -27,7 +29,9 @@ public enum MetadataType /** Metadata only used at compaction */ COMPACTION(CompactionMetadata.serializer), /** Metadata always keep in memory */ - STATS(StatsMetadata.serializer); + STATS(StatsMetadata.serializer), + /** Serialization header */ + HEADER((IMetadataComponentSerializer)SerializationHeader.serializer); public final IMetadataComponentSerializer<MetadataComponent> serializer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index f2eb1af..809d6b3 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -46,42 +46,57 @@ public class StatsMetadata extends MetadataComponent public final ReplayPosition replayPosition; public final long minTimestamp; public final long maxTimestamp; + public final int minLocalDeletionTime; public final int maxLocalDeletionTime; + public final int minTTL; + public final int maxTTL; public final double compressionRatio; public final StreamingHistogram estimatedTombstoneDropTime; public final int sstableLevel; - public final List<ByteBuffer> maxColumnNames; - public final List<ByteBuffer> minColumnNames; + public final List<ByteBuffer> minClusteringValues; + public final List<ByteBuffer> maxClusteringValues; public final boolean hasLegacyCounterShards; public final long repairedAt; + public final long totalColumnsSet; + public final long totalRows; public StatsMetadata(EstimatedHistogram estimatedRowSize, EstimatedHistogram 
estimatedColumnCount,
                         ReplayPosition replayPosition,
                         long minTimestamp,
                         long maxTimestamp,
+                        int minLocalDeletionTime,
                         int maxLocalDeletionTime,
+                        int minTTL,
+                        int maxTTL,
                         double compressionRatio,
                         StreamingHistogram estimatedTombstoneDropTime,
                         int sstableLevel,
-                        List<ByteBuffer> minColumnNames,
-                        List<ByteBuffer> maxColumnNames,
+                        List<ByteBuffer> minClusteringValues,
+                        List<ByteBuffer> maxClusteringValues,
                         boolean hasLegacyCounterShards,
-                        long repairedAt)
+                        long repairedAt,
+                        long totalColumnsSet,
+                        long totalRows)
    {
        this.estimatedRowSize = estimatedRowSize;
        this.estimatedColumnCount = estimatedColumnCount;
        this.replayPosition = replayPosition;
        this.minTimestamp = minTimestamp;
        this.maxTimestamp = maxTimestamp;
+       this.minLocalDeletionTime = minLocalDeletionTime;
        this.maxLocalDeletionTime = maxLocalDeletionTime;
+       this.minTTL = minTTL;
+       this.maxTTL = maxTTL;
        this.compressionRatio = compressionRatio;
        this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
        this.sstableLevel = sstableLevel;
-       this.minColumnNames = minColumnNames;
-       this.maxColumnNames = maxColumnNames;
+       this.minClusteringValues = minClusteringValues;
+       this.maxClusteringValues = maxClusteringValues;
        this.hasLegacyCounterShards = hasLegacyCounterShards;
        this.repairedAt = repairedAt;
+       this.totalColumnsSet = totalColumnsSet;
+       this.totalRows = totalRows;
    }

    public MetadataType getType()
@@ -120,14 +135,19 @@ public class StatsMetadata extends MetadataComponent
                             replayPosition,
                             minTimestamp,
                             maxTimestamp,
+                            minLocalDeletionTime,
                             maxLocalDeletionTime,
+                            minTTL,
+                            maxTTL,
                             compressionRatio,
                             estimatedTombstoneDropTime,
                             newLevel,
-                            minColumnNames,
-                            maxColumnNames,
+                            minClusteringValues,
+                            maxClusteringValues,
                             hasLegacyCounterShards,
-                            repairedAt);
+                            repairedAt,
+                            totalColumnsSet,
+                            totalRows);
    }

    public StatsMetadata mutateRepairedAt(long newRepairedAt)
@@ -137,14 +157,19 @@ public class StatsMetadata extends MetadataComponent
                             replayPosition,
                             minTimestamp,
                             maxTimestamp,
+                            minLocalDeletionTime,
                             maxLocalDeletionTime,
+                            minTTL,
+                            maxTTL,
                             compressionRatio,
                             estimatedTombstoneDropTime,
                             sstableLevel,
-                            minColumnNames,
-                            maxColumnNames,
+                            minClusteringValues,
+                            maxClusteringValues,
                             hasLegacyCounterShards,
-                            newRepairedAt);
+                            newRepairedAt,
+                            totalColumnsSet,
+                            totalRows);
    }

    @Override
@@ -160,14 +185,19 @@ public class StatsMetadata extends MetadataComponent
                       .append(replayPosition, that.replayPosition)
                       .append(minTimestamp, that.minTimestamp)
                       .append(maxTimestamp, that.maxTimestamp)
+                      .append(minLocalDeletionTime, that.minLocalDeletionTime)
                       .append(maxLocalDeletionTime, that.maxLocalDeletionTime)
+                      .append(minTTL, that.minTTL)
+                      .append(maxTTL, that.maxTTL)
                       .append(compressionRatio, that.compressionRatio)
                       .append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
                       .append(sstableLevel, that.sstableLevel)
                       .append(repairedAt, that.repairedAt)
-                      .append(maxColumnNames, that.maxColumnNames)
-                      .append(minColumnNames, that.minColumnNames)
+                      .append(maxClusteringValues, that.maxClusteringValues)
+                      .append(minClusteringValues, that.minClusteringValues)
                       .append(hasLegacyCounterShards, that.hasLegacyCounterShards)
+                      .append(totalColumnsSet, that.totalColumnsSet)
+                      .append(totalRows, that.totalRows)
                       .build();
    }

@@ -180,14 +210,19 @@ public class StatsMetadata extends MetadataComponent
                       .append(replayPosition)
                       .append(minTimestamp)
                       .append(maxTimestamp)
+                      .append(minLocalDeletionTime)
                       .append(maxLocalDeletionTime)
+                      .append(minTTL)
+                      .append(maxTTL)
                       .append(compressionRatio)
                       .append(estimatedTombstoneDropTime)
                       .append(sstableLevel)
                       .append(repairedAt)
-                      .append(maxColumnNames)
-                      .append(minColumnNames)
+                      .append(maxClusteringValues)
+                      .append(minClusteringValues)
                       .append(hasLegacyCounterShards)
+                      .append(totalColumnsSet)
+                      .append(totalRows)
                       .build();
    }

@@ -199,18 +234,19 @@ public class StatsMetadata extends MetadataComponent
            size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
            size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
            size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
-           size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
+           size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
            size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
            size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
            // min column names
            size += 4;
-           for (ByteBuffer columnName : component.minColumnNames)
-               size += 2 + columnName.remaining(); // with short length
+           for (ByteBuffer value : component.minClusteringValues)
+               size += 2 + value.remaining(); // with short length
            // max column names
            size += 4;
-           for (ByteBuffer columnName : component.maxColumnNames)
-               size += 2 + columnName.remaining(); // with short length
+           for (ByteBuffer value : component.maxClusteringValues)
+               size += 2 + value.remaining(); // with short length
            size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
+           size += 8 + 8; // totalColumnsSet, totalRows
            return size;
        }

@@ -221,18 +257,24 @@ public class StatsMetadata extends MetadataComponent
            ReplayPosition.serializer.serialize(component.replayPosition, out);
            out.writeLong(component.minTimestamp);
            out.writeLong(component.maxTimestamp);
+           out.writeInt(component.minLocalDeletionTime);
            out.writeInt(component.maxLocalDeletionTime);
+           out.writeInt(component.minTTL);
+           out.writeInt(component.maxTTL);
            out.writeDouble(component.compressionRatio);
            StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
            out.writeInt(component.sstableLevel);
            out.writeLong(component.repairedAt);
-           out.writeInt(component.minColumnNames.size());
-           for (ByteBuffer columnName : component.minColumnNames)
-               ByteBufferUtil.writeWithShortLength(columnName, out);
-           out.writeInt(component.maxColumnNames.size());
-           for (ByteBuffer columnName : component.maxColumnNames)
-               ByteBufferUtil.writeWithShortLength(columnName, out);
+           out.writeInt(component.minClusteringValues.size());
+           for (ByteBuffer value : component.minClusteringValues)
+               ByteBufferUtil.writeWithShortLength(value, out);
+           out.writeInt(component.maxClusteringValues.size());
+           for (ByteBuffer value : component.maxClusteringValues)
+               ByteBufferUtil.writeWithShortLength(value, out);
            out.writeBoolean(component.hasLegacyCounterShards);
+
+           out.writeLong(component.totalColumnsSet);
+           out.writeLong(component.totalRows);
        }

        public StatsMetadata deserialize(Version version, DataInput in) throws IOException
@@ -242,7 +284,11 @@ public class StatsMetadata extends MetadataComponent
            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
            long minTimestamp = in.readLong();
            long maxTimestamp = in.readLong();
+           // We use MAX_VALUE as that's the default value for "no deletion time"
+           int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
            int maxLocalDeletionTime = in.readInt();
+           int minTTL = version.storeRows() ? in.readInt() : 0;
+           int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
            double compressionRatio = in.readDouble();
            StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
            int sstableLevel = in.readInt();
@@ -251,32 +297,40 @@ public class StatsMetadata extends MetadataComponent
                repairedAt = in.readLong();

            int colCount = in.readInt();
-           List<ByteBuffer> minColumnNames = new ArrayList<>(colCount);
+           List<ByteBuffer> minClusteringValues = new ArrayList<>(colCount);
            for (int i = 0; i < colCount; i++)
-               minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+               minClusteringValues.add(ByteBufferUtil.readWithShortLength(in));

            colCount = in.readInt();
-           List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
+           List<ByteBuffer> maxClusteringValues = new ArrayList<>(colCount);
            for (int i = 0; i < colCount; i++)
-               maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+               maxClusteringValues.add(ByteBufferUtil.readWithShortLength(in));

            boolean hasLegacyCounterShards = true;
            if (version.tracksLegacyCounterShards())
                hasLegacyCounterShards = in.readBoolean();

+           long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
+           long totalRows = version.storeRows() ? in.readLong() : -1L;
+
            return new StatsMetadata(rowSizes,
                                     columnCounts,
                                     replayPosition,
                                     minTimestamp,
                                     maxTimestamp,
+                                    minLocalDeletionTime,
                                     maxLocalDeletionTime,
+                                    minTTL,
+                                    maxTTL,
                                     compressionRatio,
                                     tombstoneHistogram,
                                     sstableLevel,
-                                    minColumnNames,
-                                    maxColumnNames,
+                                    minClusteringValues,
+                                    maxClusteringValues,
                                     hasLegacyCounterShards,
-                                    repairedAt);
+                                    repairedAt,
+                                    totalColumnsSet,
+                                    totalRows);
        }
    }
}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 4362cee..c3a7f98 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -112,10 +112,10 @@ public class DataIntegrityMetadata
            }
            catch (Exception e)
            {
+               close(); // Attempting to create a FileDigestValidator without a DIGEST file will fail
                throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
            }
-
        }

        // Validate the entire file
@@ -133,7 +133,14 @@ public class DataIntegrityMetadata

        public void close()
        {
-           this.digestReader.close();
+           try
+           {
+               this.digestReader.close();
+           }
+           finally
+           {
+               this.dataReader.close();
+           }
        }
    }
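[Editor's note] The DataIntegrityMetadata change above tightens resource handling: the validator now closes its readers when construction fails, and close() uses try/finally so the data reader is released even if closing the digest reader throws. The FileUtils hunk below adds closeQuietly helpers in the same spirit. The snippet that follows is only a minimal sketch of that idiom for a caller holding two resources; DigestPair and its fields are hypothetical stand-ins, not classes from the Cassandra tree.

// Illustrative only: mirrors the try/finally close ordering from this patch.
import java.io.Closeable;
import java.io.IOException;

class DigestPair implements Closeable
{
    private final Closeable digestReader;
    private final Closeable dataReader;

    DigestPair(Closeable digestReader, Closeable dataReader)
    {
        this.digestReader = digestReader;
        this.dataReader = dataReader;
    }

    public void close() throws IOException
    {
        try
        {
            digestReader.close();   // may throw ...
        }
        finally
        {
            dataReader.close();     // ... but the second resource is still released
        }
    }
}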
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 35e1419..c182e58 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -227,6 +227,19 @@ public class FileUtils
        }
    }

+   public static void closeQuietly(AutoCloseable c)
+   {
+       try
+       {
+           if (c != null)
+               c.close();
+       }
+       catch (Exception e)
+       {
+           logger.warn("Failed closing {}", c, e);
+       }
+   }
+
    public static void close(Closeable... cs) throws IOException
    {
        close(Arrays.asList(cs));
@@ -252,6 +265,22 @@ public class FileUtils
            throw e;
        }

+   public static void closeQuietly(Iterable<? extends AutoCloseable> cs)
+   {
+       for (AutoCloseable c : cs)
+       {
+           try
+           {
+               if (c != null)
+                   c.close();
+           }
+           catch (Exception ex)
+           {
+               logger.warn("Failed closing {}", c, ex);
+           }
+       }
+   }
+
    public static String getCanonicalPath(String filename)
    {
        try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 83bc337..3f2160f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -81,7 +81,8 @@ public final class MessagingService implements MessagingServiceMBean
    public static final int VERSION_20 = 7;
    public static final int VERSION_21 = 8;
    public static final int VERSION_22 = 9;
-   public static final int current_version = VERSION_22;
+   public static final int VERSION_30 = 10;
+   public static final int current_version = VERSION_30;

    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
    public static final byte[] ONE_BYTE = new byte[1];
@@ -104,7 +105,7 @@ public final class MessagingService implements MessagingServiceMBean
        @Deprecated STREAM_INITIATE_DONE,
        @Deprecated STREAM_REPLY,
        @Deprecated STREAM_REQUEST,
-       RANGE_SLICE,
+       @Deprecated RANGE_SLICE,
        @Deprecated BOOTSTRAP_TOKEN,
        @Deprecated TREE_REQUEST,
        @Deprecated TREE_RESPONSE,
@@ -132,7 +133,7 @@ public final class MessagingService implements MessagingServiceMBean
        PAXOS_PREPARE,
        PAXOS_PROPOSE,
        PAXOS_COMMIT,
-       PAGED_RANGE,
+       @Deprecated PAGED_RANGE,
        // remember to add new verbs at the end, since we serialize by ordinal
        UNUSED_1,
        UNUSED_2,
@@ -204,8 +205,8 @@ public final class MessagingService implements MessagingServiceMBean
        put(Verb.MUTATION, Mutation.serializer);
        put(Verb.READ_REPAIR, Mutation.serializer);
        put(Verb.READ, ReadCommand.serializer);
-       put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
-       put(Verb.PAGED_RANGE, PagedRangeCommand.serializer);
+       //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+       //put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
        put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
        put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
        put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
@@ -230,8 +231,8 @@ public final class MessagingService implements MessagingServiceMBean
        put(Verb.MUTATION, WriteResponse.serializer);
        put(Verb.READ_REPAIR, WriteResponse.serializer);
        put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
-       put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
-       put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
+       put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
+       put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
        put(Verb.READ, ReadResponse.serializer);
        put(Verb.TRUNCATE, TruncateResponse.serializer);
        put(Verb.SNAPSHOT, null);
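[Editor's note] The MessagingService hunks above bump the messaging protocol to VERSION_30 and mark RANGE_SLICE and PAGED_RANGE as @Deprecated rather than deleting them. The in-diff comment explains why: verbs are serialized by ordinal, so removing or reordering enum members would change the meaning of numbers already on the wire. The sketch below is a minimal illustration of that constraint, not Cassandra's actual message I/O code; the enum and class names here are invented.

// Why ordinal-serialized enums must keep deprecated members in place:
// the wire format stores the position, not the name.
enum Verb
{
    MUTATION,
    @Deprecated RANGE_SLICE,   // deleting this would shift every later ordinal
    READ,
    READ_REPAIR
}

public class OrdinalRoundTrip
{
    static int encode(Verb v)
    {
        return v.ordinal();          // what effectively goes on the wire
    }

    static Verb decode(int id)
    {
        return Verb.values()[id];    // the peer maps the number back by position
    }

    public static void main(String[] args)
    {
        int onWire = encode(Verb.READ);      // 2 while RANGE_SLICE is still present
        System.out.println(decode(onWire));  // READ; would decode wrongly if members were removed
    }
}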
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 754e26f..ac20428 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -180,7 +180,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
        String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
        logger.info("[repair #{}] {}", desc.sessionId, message);
        Tracing.traceRepair(message);
-       int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+       int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
        List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
        for (InetAddress endpoint : endpoints)
        {
@@ -197,7 +197,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     */
    private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints)
    {
-       int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+       int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
        List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());

        Queue<InetAddress> requests = new LinkedList<>(endpoints);
@@ -236,7 +236,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     */
    private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints)
    {
-       int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+       int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
        List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());

        Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();
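[Editor's note] The RepairJob hunks above switch the validation requests from System.currentTimeMillis() to FBUtilities.nowInSeconds(), keeping the gcBefore computation in the seconds-based time scale the 3.0 code path expects; feeding milliseconds into a seconds-based cutoff would wildly skew which tombstones are considered purgeable during validation. The following is a hedged sketch only; it assumes gcBefore is derived roughly as "now in seconds minus gc_grace_seconds", which this patch does not show, and the class and method names are invented.

public class GcBeforeSketch
{
    static int nowInSeconds()
    {
        // conceptually what a seconds-based "now" provides
        return (int) (System.currentTimeMillis() / 1000);
    }

    static int gcBefore(int nowInSec, int gcGraceSeconds)
    {
        // tombstones older than this cutoff are candidates for purging
        return nowInSec - gcGraceSeconds;
    }

    public static void main(String[] args)
    {
        int gcGrace = 864000; // 10 days, the usual default gc_grace_seconds
        System.out.println(gcBefore(nowInSeconds(), gcGrace));
    }
}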

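[Editor's note] The StatsMetadata serializer changes earlier in this patch follow a version-gating pattern worth calling out: the new statistics (min local deletion time, min/max TTL, totalColumnsSet, totalRows) are only read when version.storeRows() reports a format that wrote them, with explicit defaults (Integer.MAX_VALUE for "no deletion time", 0 and Integer.MAX_VALUE for the TTL bounds, -1 for the counts) so older sstables still deserialize cleanly. The snippet below distils that pattern; the Version interface and reader class here are stand-ins for illustration, not the types from org.apache.cassandra.io.sstable.

import java.io.DataInput;
import java.io.IOException;

interface Version
{
    boolean storeRows();   // true only for formats that include the newer stats fields
}

public class VersionedStatsReader
{
    // Older formats never wrote this field, so fall back to the "no deletion
    // time" sentinel instead of reading past the end of the stats block.
    static int readMinLocalDeletionTime(Version version, DataInput in) throws IOException
    {
        return version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
    }

    static long readTotalRows(Version version, DataInput in) throws IOException
    {
        return version.storeRows() ? in.readLong() : -1L;   // -1 marks "unknown" for legacy sstables
    }
}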