http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java deleted file mode 100644 index 93505ae..0000000 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.compaction; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.MessageDigest; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import com.google.common.base.Predicates; -import com.google.common.collect.Iterators; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.io.sstable.format.big.BigTableWriter; -import org.apache.cassandra.io.sstable.ColumnNameHelper; -import org.apache.cassandra.io.sstable.ColumnStats; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.SequentialWriter; -import org.apache.cassandra.utils.MergeIterator; -import org.apache.cassandra.utils.StreamingHistogram; -import org.apache.cassandra.utils.Throwables; - -/** - * LazilyCompactedRow only computes the row bloom filter and column index in memory - * (at construction time); it does this by reading one column at a time from each - * of the rows being compacted, and merging them as it does so. So the most we have - * in memory at a time is the bloom filter, the index, and one column from each - * pre-compaction row. - */ -public class LazilyCompactedRow extends AbstractCompactedRow -{ - protected final List<? 
extends OnDiskAtomIterator> rows; - protected final CompactionController controller; - protected boolean hasCalculatedMaxPurgeableTimestamp = false; - protected long maxPurgeableTimestamp; - protected final ColumnFamily emptyColumnFamily; - protected ColumnStats columnStats; - protected boolean closed; - protected ColumnIndex.Builder indexBuilder; - protected final SecondaryIndexManager.Updater indexer; - protected final Reducer reducer; - protected final Iterator<OnDiskAtom> merger; - protected DeletionTime maxRowTombstone; - - public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows) - { - super(rows.get(0).getKey()); - this.rows = rows; - this.controller = controller; - indexer = controller.cfs.indexManager.gcUpdaterFor(key); - - // Combine top-level tombstones, keeping the one with the highest markedForDeleteAt timestamp. This may be - // purged (depending on gcBefore), but we need to remember it to properly delete columns during the merge - maxRowTombstone = DeletionTime.LIVE; - for (OnDiskAtomIterator row : rows) - { - DeletionTime rowTombstone = row.getColumnFamily().deletionInfo().getTopLevelDeletion(); - if (maxRowTombstone.compareTo(rowTombstone) < 0) - maxRowTombstone = rowTombstone; - } - - emptyColumnFamily = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata); - emptyColumnFamily.delete(maxRowTombstone); - if (!maxRowTombstone.isLive() && maxRowTombstone.markedForDeleteAt < getMaxPurgeableTimestamp()) - emptyColumnFamily.purgeTombstones(controller.gcBefore); - - reducer = new Reducer(); - merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator(), reducer), Predicates.notNull()); - } - - /** - * tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable - * containing `key` outside of the set of sstables involved in this compaction. 
- */ - private long getMaxPurgeableTimestamp() - { - if (!hasCalculatedMaxPurgeableTimestamp) - { - hasCalculatedMaxPurgeableTimestamp = true; - maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key); - } - return maxPurgeableTimestamp; - } - - private static void removeDeleted(ColumnFamily cf, boolean shouldPurge, DecoratedKey key, CompactionController controller) - { - // We should only purge cell tombstones if shouldPurge is true, but regardless, it's still ok to remove cells that - // are shadowed by a row or range tombstone; removeDeletedColumnsOnly(cf, Integer.MIN_VALUE) will accomplish this - // without purging tombstones. - int overriddenGCBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE; - ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, controller.cfs.indexManager.gcUpdaterFor(key)); - } - - public RowIndexEntry write(long currentPosition, SequentialWriter dataFile) throws IOException - { - assert !closed; - - DataOutputPlus out = dataFile.stream; - - ColumnIndex columnsIndex; - try - { - indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out); - columnsIndex = indexBuilder.buildForCompaction(merger); - - // if there aren't any columns or tombstones, return null - if (columnsIndex.columnsIndex.isEmpty() && !emptyColumnFamily.isMarkedForDelete()) - return null; - } - catch (IOException e) - { - throw new RuntimeException(e); - } - // reach into the reducer (created during iteration) to get column count, size, max column timestamp - columnStats = new ColumnStats(reducer.columns, - reducer.minTimestampTracker.get(), - Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampTracker.get()), - reducer.maxDeletionTimeTracker.get(), - reducer.tombstones, - reducer.minColumnNameSeen, - reducer.maxColumnNameSeen, - reducer.hasLegacyCounterShards); - - // in case no columns were ever written, we may still need to write an empty header with a top-level tombstone - 
indexBuilder.maybeWriteEmptyRowHeader(); - - out.writeShort(BigTableWriter.END_OF_ROW); - - close(); - - return RowIndexEntry.create(currentPosition, emptyColumnFamily.deletionInfo().getTopLevelDeletion(), columnsIndex); - } - - public void update(MessageDigest digest) - { - assert !closed; - - // no special-case for rows.size == 1, we're actually skipping some bytes here so just - // blindly updating everything wouldn't be correct - try (DataOutputBuffer out = new DataOutputBuffer()) - { - // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator - indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out); - - DeletionTime.serializer.serialize(emptyColumnFamily.deletionInfo().getTopLevelDeletion(), out); - - // do not update digest in case of missing or purged row level tombstones, see CASSANDRA-8979 - // - digest for non-empty rows needs to be updated with deletion in any case to match digest with versions before patch - // - empty rows must not update digest in case of LIVE delete status to avoid mismatches with non-existing rows - // this will however introduce in return a digest mismatch for versions before patch (which would update digest in any case) - if (merger.hasNext() || emptyColumnFamily.deletionInfo().getTopLevelDeletion() != DeletionTime.LIVE) - { - digest.update(out.getData(), 0, out.getLength()); - } - } - catch (IOException e) - { - throw new AssertionError(e); - } - - while (merger.hasNext()) - merger.next().updateDigest(digest); - close(); - } - - public ColumnStats columnStats() - { - return columnStats; - } - - public void close() - { - Throwable accumulate = null; - for (OnDiskAtomIterator row : rows) - { - try - { - row.close(); - } - catch (IOException e) - { - accumulate = Throwables.merge(accumulate, e); - } - } - closed = true; - Throwables.maybeFail(accumulate); - } - - protected class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom> - { - // all columns reduced 
together will have the same name, so there will only be one column - // in the container; we just want to leverage the conflict resolution code from CF. - // (Note that we add the row tombstone in getReduced.) - ColumnFamily container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata()); - - // tombstone reference; will be reconciled w/ column during getReduced. Note that the top-level (row) tombstone - // is held by LCR.deletionInfo. - public RangeTombstone tombstone; - - public int columns = 0; - // if the row tombstone is 'live' we need to set timestamp to MAX_VALUE to be able to overwrite it later - // markedForDeleteAt is MIN_VALUE for 'live' row tombstones (which we use to default maxTimestampSeen) - - ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); - ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); - // we need to set MIN_VALUE if we are 'live' since we want to overwrite it later - // we are bound to have either a RangeTombstone or standard cells will set this properly: - ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); - - public StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); - public List<ByteBuffer> minColumnNameSeen = Collections.emptyList(); - public List<ByteBuffer> maxColumnNameSeen = Collections.emptyList(); - public boolean hasLegacyCounterShards = false; - - public Reducer() - { - minTimestampTracker.update(maxRowTombstone.isLive() ? Long.MAX_VALUE : maxRowTombstone.markedForDeleteAt); - maxTimestampTracker.update(maxRowTombstone.markedForDeleteAt); - maxDeletionTimeTracker.update(maxRowTombstone.isLive() ? Integer.MIN_VALUE : maxRowTombstone.localDeletionTime); - } - - /** - * Called once per version of a cell that we need to merge, after which getReduced() is called. 
In other words, - * this will be called one or more times with cells that share the same column name. - */ - public void reduce(OnDiskAtom current) - { - if (current instanceof RangeTombstone) - { - if (tombstone == null || current.timestamp() >= tombstone.timestamp()) - tombstone = (RangeTombstone)current; - } - else - { - Cell cell = (Cell) current; - container.addColumn(cell); - - // skip the index-update checks if there is no indexing needed since they are a bit expensive - if (indexer == SecondaryIndexManager.nullUpdater) - return; - - if (cell.isLive() && !container.getColumn(cell.name()).equals(cell)) - indexer.remove(cell); - } - } - - /** - * Called after reduce() has been called for each cell sharing the same name. - */ - protected OnDiskAtom getReduced() - { - if (tombstone != null) - { - RangeTombstone t = tombstone; - tombstone = null; - - if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore)) - { - indexBuilder.tombstoneTracker().update(t, true); - return null; - } - else - { - tombstones.update(t.getLocalDeletionTime()); - minTimestampTracker.update(t.timestamp()); - maxTimestampTracker.update(t.timestamp()); - maxDeletionTimeTracker.update(t.getLocalDeletionTime()); - minColumnNameSeen = ColumnNameHelper.minComponents(minColumnNameSeen, t.min, controller.cfs.metadata.comparator); - maxColumnNameSeen = ColumnNameHelper.maxComponents(maxColumnNameSeen, t.max, controller.cfs.metadata.comparator); - return t; - } - } - else - { - // when we clear() the container, it removes the deletion info, so this needs to be reset each time - container.delete(maxRowTombstone); - Iterator<Cell> iter = container.iterator(); - Cell c = iter.next(); - boolean shouldPurge = c.getLocalDeletionTime() < Integer.MAX_VALUE && c.timestamp() < getMaxPurgeableTimestamp(); - removeDeleted(container, shouldPurge, key, controller); - iter = container.iterator(); - if (!iter.hasNext()) - { - // don't call clear() because that resets the deletion 
time. See CASSANDRA-7808. - container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata()); - return null; - } - - int localDeletionTime = container.deletionInfo().getTopLevelDeletion().localDeletionTime; - if (localDeletionTime < Integer.MAX_VALUE) - tombstones.update(localDeletionTime); - - Cell reduced = iter.next(); - container = ArrayBackedSortedColumns.factory.create(emptyColumnFamily.metadata()); - - // removeDeleted have only checked the top-level CF deletion times, - // not the range tombstone. For that we use the columnIndexer tombstone tracker. - if (indexBuilder.tombstoneTracker().isDeleted(reduced)) - { - // We skip that column so it won't be passed to the tracker by the index builded. So pass it now to - // make sure we still discard potentially un-needed RT as soon as possible. - indexBuilder.tombstoneTracker().update(reduced, false); - indexer.remove(reduced); - return null; - } - - columns++; - minTimestampTracker.update(reduced.timestamp()); - maxTimestampTracker.update(reduced.timestamp()); - maxDeletionTimeTracker.update(reduced.getLocalDeletionTime()); - minColumnNameSeen = ColumnNameHelper.minComponents(minColumnNameSeen, reduced.name(), controller.cfs.metadata.comparator); - maxColumnNameSeen = ColumnNameHelper.maxComponents(maxColumnNameSeen, reduced.name(), controller.cfs.metadata.comparator); - - int deletionTime = reduced.getLocalDeletionTime(); - if (deletionTime < Integer.MAX_VALUE) - tombstones.update(deletionTime); - - if (reduced instanceof CounterCell) - hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) reduced).hasLegacyShards(); - - return reduced; - } - } - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 9eb58ff..dcdfd7f 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.compaction; -import java.io.IOException; import java.util.*; @@ -26,17 +25,18 @@ import com.google.common.base.Joiner; import com.google.common.collect.*; import com.google.common.primitives.Doubles; -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; public class LeveledCompactionStrategy extends AbstractCompactionStrategy { @@ -244,7 +244,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy // Create a LeveledScanner that only opens one sstable at a time, in sorted order List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range); if (!intersecting.isEmpty()) - scanners.add(new LeveledScanner(intersecting, range)); + { + 
@SuppressWarnings("resource") // The ScannerList will be in charge of closing (and we close properly on errors) + ISSTableScanner scanner = new LeveledScanner(intersecting, range); + scanners.add(scanner); + } } } } @@ -284,7 +288,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the // same level (e.g. non overlapping) - see #4142 - private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ISSTableScanner + private static class LeveledScanner extends AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner { private final Range<Token> range; private final List<SSTableReader> sstables; @@ -332,36 +336,34 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy return filtered; } - protected OnDiskAtomIterator computeNext() + public boolean isForThrift() + { + return false; + } + + protected UnfilteredRowIterator computeNext() { if (currentScanner == null) return endOfData(); - try + while (true) { - while (true) - { - if (currentScanner.hasNext()) - return currentScanner.next(); + if (currentScanner.hasNext()) + return currentScanner.next(); - positionOffset += currentScanner.getLengthInBytes(); - currentScanner.close(); - if (!sstableIterator.hasNext()) - { - // reset to null so getCurrentPosition does not return wrong value - currentScanner = null; - return endOfData(); - } - currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter()); + positionOffset += currentScanner.getLengthInBytes(); + currentScanner.close(); + if (!sstableIterator.hasNext()) + { + // reset to null so getCurrentPosition does not return wrong value + currentScanner = null; + return endOfData(); } - } - catch (IOException e) - { - throw new RuntimeException(e); + currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter()); } } - public 
void close() throws IOException + public void close() { if (currentScanner != null) currentScanner.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 0763316..0cee370 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -27,13 +27,14 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; + +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -63,7 +64,7 @@ public class LeveledManifest private final ColumnFamilyStore cfs; @VisibleForTesting protected final List<SSTableReader>[] generations; - private final RowPosition[] lastCompactedKeys; + private final PartitionPosition[] lastCompactedKeys; private final long maxSSTableSizeInBytes; private final SizeTieredCompactionStrategyOptions options; private final int [] compactionCounter; @@ -75,7 +76,7 @@ public class LeveledManifest this.options = options; generations = new List[MAX_LEVEL_COUNT]; - lastCompactedKeys = new RowPosition[MAX_LEVEL_COUNT]; + lastCompactedKeys = new PartitionPosition[MAX_LEVEL_COUNT]; for (int i = 0; i < generations.length; i++) { generations[i] = new ArrayList<>(); @@ -402,8 +403,8 @@ public 
class LeveledManifest // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2 // this means that we will not create overlap in L2 if we add an sstable // contained within 0 -> 33 to the compaction - RowPosition max = null; - RowPosition min = null; + PartitionPosition max = null; + PartitionPosition min = null; for (SSTableReader candidate : candidates) { if (min == null || candidate.first.compareTo(min) < 0) @@ -414,10 +415,10 @@ public class LeveledManifest if (min == null || max == null || min.equals(max)) // single partition sstables - we cannot include a high level sstable. return candidates; Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); - Range<RowPosition> boundaries = new Range<>(min, max); + Range<PartitionPosition> boundaries = new Range<>(min, max); for (SSTableReader sstable : getLevel(i)) { - Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last); + Range<PartitionPosition> r = new Range<PartitionPosition>(sstable.first, sstable.last); if (boundaries.contains(r) && !compacting.contains(sstable)) { logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable); @@ -546,8 +547,8 @@ public class LeveledManifest { Set<SSTableReader> compactingL0 = getCompacting(0); - RowPosition lastCompactingKey = null; - RowPosition firstCompactingKey = null; + PartitionPosition lastCompactingKey = null; + PartitionPosition firstCompactingKey = null; for (SSTableReader candidate : compactingL0) { if (firstCompactingKey == null || candidate.first.compareTo(firstCompactingKey) < 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 10952e7..562d681 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -26,6 +26,8 @@ import com.google.common.base.Throwables; import org.apache.cassandra.db.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -33,6 +35,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; @@ -45,7 +48,6 @@ public class Scrubber implements Closeable private final File destination; private final boolean skipCorrupted; - private final CompactionController controller; private final boolean isCommutative; private final boolean isIndex; private final boolean checkData; @@ -72,14 +74,14 @@ public class Scrubber implements Closeable private final OutputHandler outputHandler; - private static final Comparator<Row> rowComparator = new Comparator<Row>() + private static final Comparator<Partition> partitionComparator = new Comparator<Partition>() { - public int compare(Row r1, Row r2) + public int compare(Partition r1, Partition r2) { - return r1.key.compareTo(r2.key); + return r1.partitionKey().compareTo(r2.partitionKey()); } }; - private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator); + private final SortedSet<Partition> outOfOrder = new TreeSet<>(partitionComparator); public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) 
throws IOException { @@ -95,7 +97,9 @@ public class Scrubber implements Closeable this.outputHandler = outputHandler; this.skipCorrupted = skipCorrupted; this.isOffline = isOffline; - this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, + sstable.descriptor.version, + sstable.header); List<SSTableReader> toScrub = Collections.singletonList(sstable); @@ -104,10 +108,6 @@ public class Scrubber implements Closeable if (destination == null) throw new IOException("disk full"); - // If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes. - this.controller = isOffline - ? new ScrubController(cfs) - : new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs)); this.isCommutative = cfs.metadata.isCounter(); this.isIndex = cfs.isIndex(); this.checkData = checkData && !this.isIndex; //LocalByPartitionerType does not support validation @@ -127,15 +127,21 @@ public class Scrubber implements Closeable this.nextRowPositionFromIndex = 0; } + private UnfilteredRowIterator withValidation(UnfilteredRowIterator iter, String filename) + { + return checkData ? 
UnfilteredRowIterators.withValidation(iter, filename) : iter; + } + public void scrub() { outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); + int nowInSec = FBUtilities.nowInSeconds(); try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline);) { nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); { // throw away variable so we don't have a side effect in the assert - long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; + long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile).position; assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex; } @@ -188,7 +194,8 @@ public class Scrubber implements Closeable if (currentIndexKey != null && !key.getKey().equals(currentIndexKey)) { throw new IOError(new IOException(String.format("Key from data file (%s) does not match key from index file (%s)", - ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey)))); + //ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey)))); + "_too big_", ByteBufferUtil.bytesToHex(currentIndexKey)))); } if (dataSizeFromIndex > dataFile.length()) @@ -197,20 +204,19 @@ public class Scrubber implements Closeable if (dataStart != dataStartFromIndex) outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex)); - SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData); - - if (prevKey != null && prevKey.compareTo(key) > 0) + try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath())) { - saveOutOfOrderRow(prevKey, key, atoms); - continue; - } + if (prevKey != null && prevKey.compareTo(key) > 0) + { + saveOutOfOrderRow(prevKey, key, iterator); + continue; + } - 
@SuppressWarnings("resource") - AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms)); - if (writer.tryAppend(compactedRow) == null) - emptyRows++; - else - goodRows++; + if (writer.tryAppend(iterator) == null) + emptyRows++; + else + goodRows++; + } prevKey = key; } @@ -229,20 +235,20 @@ public class Scrubber implements Closeable { dataFile.seek(dataStartFromIndex); - SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData); - if (prevKey != null && prevKey.compareTo(key) > 0) + try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath())) { - saveOutOfOrderRow(prevKey, key, atoms); - continue; + if (prevKey != null && prevKey.compareTo(key) > 0) + { + saveOutOfOrderRow(prevKey, key, iterator); + continue; + } + + if (writer.tryAppend(iterator) == null) + emptyRows++; + else + goodRows++; } - @SuppressWarnings("resource") - AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms)); - if (writer.tryAppend(compactedRow) == null) - emptyRows++; - else - goodRows++; - prevKey = key; } catch (Throwable th2) @@ -267,18 +273,18 @@ public class Scrubber implements Closeable } } - if (!outOfOrderRows.isEmpty()) + if (!outOfOrder.isEmpty()) { // out of order rows, but no bad rows found - we can keep our repairedAt time long repairedAt = badRows > 0 ? 
ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt; try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);) { - for (Row row : outOfOrderRows) - inOrderWriter.append(row.key, row.cf); + for (Partition partition : outOfOrder) + inOrderWriter.append(partition.unfilteredIterator()); newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true); } transaction.update(newInOrderSstable, false); - outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable)); + outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrder.size(), sstable, newInOrderSstable)); } // finish obsoletes the old sstable @@ -290,10 +296,6 @@ public class Scrubber implements Closeable { throw Throwables.propagate(e); } - finally - { - controller.close(); - } if (newSstable == null) { @@ -318,8 +320,8 @@ public class Scrubber implements Closeable { nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); nextRowPositionFromIndex = indexFile.isEOF() - ? dataFile.length() - : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; + ? dataFile.length() + : rowIndexEntrySerializer.deserialize(indexFile).position; } catch (Throwable th) { @@ -350,19 +352,11 @@ public class Scrubber implements Closeable } } - private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms) + private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, UnfilteredRowIterator iterator) { // TODO bitch if the row is too large? if it is there's not much we can do ... 
outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey)); - // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often - // and there's no sense in failing on mis-sorted cells when a TreeMap could safe us - ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false); - while (atoms.hasNext()) - { - OnDiskAtom atom = atoms.next(); - cf.addAtom(atom); - } - outOfOrderRows.add(new Row(key, cf)); + outOfOrder.add(ArrayBackedPartition.create(iterator)); } public SSTableReader getNewSSTable() @@ -442,20 +436,6 @@ public class Scrubber implements Closeable } } - private static class ScrubController extends CompactionController - { - public ScrubController(ColumnFamilyStore cfs) - { - super(cfs, Integer.MAX_VALUE); - } - - @Override - public long maxPurgeableTimestamp(DecoratedKey key) - { - return Long.MIN_VALUE; - } - } - @VisibleForTesting public ScrubResult scrubWithResult() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index a0cce24..e3764c8 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -21,16 +21,18 @@ import java.io.File; import java.util.*; import com.google.common.base.Throwables; +import com.google.common.collect.Sets; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; 
import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; @@ -78,24 +80,27 @@ public class Upgrader sstableMetadataCollector.addAncestor(i); } sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); - return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector); + return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), + estimatedRows, + repairedAt, + cfs.metadata, + cfs.partitioner, + sstableMetadataCollector, + SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable))); } public void upgrade() { outputHandler.output("Upgrading " + sstable); + int nowInSec = FBUtilities.nowInSeconds(); try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true); AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals()); - CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator()) + CompactionIterator iter = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); while (iter.hasNext()) - { - @SuppressWarnings("resource") - AbstractCompactedRow row = iter.next(); - writer.append(row); - } + writer.append(iter.next()); writer.finish(); outputHandler.output("Upgrade of " + sstable + " complete."); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 0177819..90a97a0 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -18,8 +18,9 @@ package org.apache.cassandra.db.compaction; import com.google.common.base.Throwables; -import com.google.common.collect.Sets; + import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; @@ -31,7 +32,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; @@ -71,7 +72,7 @@ public class Verifier implements Closeable this.cfs = cfs; this.sstable = sstable; this.outputHandler = outputHandler; - this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, sstable.descriptor.version, sstable.header); this.controller = new VerifyController(cfs); @@ -102,7 +103,7 @@ public class Verifier implements Closeable } else { - outputHandler.output("Data digest missing, assuming extended verification of disk atoms"); + outputHandler.output("Data digest missing, assuming extended verification of disk values"); extended = true; } 
} @@ -119,14 +120,14 @@ public class Verifier implements Closeable if ( !extended ) return; - outputHandler.output("Extended Verify requested, proceeding to inspect atoms"); + outputHandler.output("Extended Verify requested, proceeding to inspect values"); try { ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); { - long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; + long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile).position; if (firstRowPositionFromIndex != 0) markAndThrow(); } @@ -160,7 +161,7 @@ public class Verifier implements Closeable nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() - : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; + : rowIndexEntrySerializer.deserialize(indexFile).position; } catch (Throwable th) { @@ -185,7 +186,10 @@ public class Verifier implements Closeable markAndThrow(); //mimic the scrub read path - new SSTableIdentityIterator(sstable, dataFile, key, true); + try (UnfilteredRowIterator iterator = new SSTableIdentityIterator(sstable, dataFile, key)) + { + } + if ( (prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex ) markAndThrow(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 20c96d6..610592f 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ 
b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -23,7 +23,7 @@ import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.SSTableRewriter; @@ -55,11 +55,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa } /** - * Writes a row in an implementation specific way - * @param row the row to append - * @return true if the row was written, false otherwise + * Writes a partition in an implementation specific way + * @param partition the partition to append + * @return true if the partition was written, false otherwise */ - public abstract boolean append(AbstractCompactedRow row); + public abstract boolean append(UnfilteredRowIterator partition); @Override protected Throwable doAbort(Throwable accumulate) @@ -117,4 +117,4 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa return directory; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 7d88458..8fc7bec 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -25,7 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; 
-import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; @@ -54,14 +55,15 @@ public class DefaultCompactionWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0)); + new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); } @Override - public boolean append(AbstractCompactedRow row) + public boolean append(UnfilteredRowIterator partition) { - return sstableWriter.append(row) != null; + return sstableWriter.append(partition) != null; } @Override @@ -69,4 +71,4 @@ public class DefaultCompactionWriter extends CompactionAwareWriter { return estimatedTotalKeys; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 95d7a0c..5328fa5 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -25,7 +25,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.SerializationHeader; +import 
org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.LeveledManifest; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -68,16 +69,17 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors)); + new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); } @Override @SuppressWarnings("resource") - public boolean append(AbstractCompactedRow row) + public boolean append(UnfilteredRowIterator partition) { long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer(); - RowIndexEntry rie = sstableWriter.append(row); + RowIndexEntry rie = sstableWriter.append(partition); totalWrittenInLevel += sstableWriter.currentWriter().getOnDiskFilePointer() - posBefore; partitionsWritten++; if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize) @@ -95,7 +97,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors)); + new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); partitionsWritten = 0; sstablesWritten++; @@ -103,4 +106,4 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter return rie != null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index d30a612..4832fd5 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -22,7 +22,8 @@ import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; @@ -57,14 +58,15 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, level)); + new MetadataCollector(allSSTables, cfs.metadata.comparator, level), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); } @Override - public boolean append(AbstractCompactedRow row) + public boolean append(UnfilteredRowIterator partition) { - RowIndexEntry rie = sstableWriter.append(row); + RowIndexEntry rie = sstableWriter.append(partition); if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize) { File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @@ -74,7 +76,8 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, level)); + new MetadataCollector(allSSTables, cfs.metadata.comparator, level), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); } @@ -86,4 +89,4 
@@ public class MaxSSTableSizeWriter extends CompactionAwareWriter { return estimatedTotalKeys; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 9ff1325..ba85eef 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -26,7 +26,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; @@ -90,16 +91,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, 0)); + new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite); } @Override - public boolean append(AbstractCompactedRow row) + public boolean append(UnfilteredRowIterator partition) { - 
RowIndexEntry rie = sstableWriter.append(row); + RowIndexEntry rie = sstableWriter.append(partition); if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect { currentRatioIndex++; @@ -112,10 +114,11 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, 0)); + new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables)); sstableWriter.switchWriter(writer); logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); } return rie != null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/AbstractCType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java deleted file mode 100644 index a982280..0000000 --- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.composites; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Comparator; - -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.DeletionInfo; -import org.apache.cassandra.db.NativeCell; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.db.marshal.AbstractCompositeType; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; - -import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; - -public abstract class AbstractCType implements CType -{ - static final Comparator<Cell> rightNativeCell = new Comparator<Cell>() - { - public int compare(Cell o1, Cell o2) - { - return -((NativeCell) o2).compareTo(o1.name()); - } - }; - - static final Comparator<Cell> neitherNativeCell = new Comparator<Cell>() - { - public int compare(Cell o1, Cell o2) - { - return compareUnsigned(o1.name(), o2.name()); - } - }; - - // only one or the other of these will ever be used - static final Comparator<Object> asymmetricRightNativeCell = new Comparator<Object>() - { - public int compare(Object o1, Object o2) - { - return -((NativeCell) o2).compareTo((Composite) o1); - } - }; - - static final Comparator<Object> 
asymmetricNeitherNativeCell = new Comparator<Object>() - { - public int compare(Object o1, Object o2) - { - return compareUnsigned((Composite) o1, ((Cell) o2).name()); - } - }; - - private final Comparator<Composite> reverseComparator; - private final Comparator<IndexInfo> indexComparator; - private final Comparator<IndexInfo> indexReverseComparator; - - private final Serializer serializer; - - private final IVersionedSerializer<ColumnSlice> sliceSerializer; - private final IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer; - private final DeletionInfo.Serializer deletionInfoSerializer; - private final RangeTombstone.Serializer rangeTombstoneSerializer; - - protected final boolean isByteOrderComparable; - - protected AbstractCType(boolean isByteOrderComparable) - { - reverseComparator = new Comparator<Composite>() - { - public int compare(Composite c1, Composite c2) - { - return AbstractCType.this.compare(c2, c1); - } - }; - indexComparator = new Comparator<IndexInfo>() - { - public int compare(IndexInfo o1, IndexInfo o2) - { - return AbstractCType.this.compare(o1.lastName, o2.lastName); - } - }; - indexReverseComparator = new Comparator<IndexInfo>() - { - public int compare(IndexInfo o1, IndexInfo o2) - { - return AbstractCType.this.compare(o1.firstName, o2.firstName); - } - }; - - serializer = new Serializer(this); - - sliceSerializer = new ColumnSlice.Serializer(this); - sliceQueryFilterSerializer = new SliceQueryFilter.Serializer(this); - deletionInfoSerializer = new DeletionInfo.Serializer(this); - rangeTombstoneSerializer = new RangeTombstone.Serializer(this); - this.isByteOrderComparable = isByteOrderComparable; - } - - protected static boolean isByteOrderComparable(Iterable<AbstractType<?>> types) - { - boolean isByteOrderComparable = true; - for (AbstractType<?> type : types) - isByteOrderComparable &= type.isByteOrderComparable(); - return isByteOrderComparable; - } - - static int compareUnsigned(Composite c1, Composite c2) - { - if 
(c1.isStatic() != c2.isStatic()) - { - // Static sorts before non-static no matter what, except for empty which - // always sort first - if (c1.isEmpty()) - return c2.isEmpty() ? 0 : -1; - if (c2.isEmpty()) - return 1; - return c1.isStatic() ? -1 : 1; - } - - int s1 = c1.size(); - int s2 = c2.size(); - int minSize = Math.min(s1, s2); - - for (int i = 0; i < minSize; i++) - { - int cmp = ByteBufferUtil.compareUnsigned(c1.get(i), c2.get(i)); - if (cmp != 0) - return cmp; - } - - if (s1 == s2) - return c1.eoc().compareTo(c2.eoc()); - return s1 < s2 ? c1.eoc().prefixComparisonResult : -c2.eoc().prefixComparisonResult; - } - - public int compare(Composite c1, Composite c2) - { - if (c1.isStatic() != c2.isStatic()) - { - // Static sorts before non-static no matter what, except for empty which - // always sort first - if (c1.isEmpty()) - return c2.isEmpty() ? 0 : -1; - if (c2.isEmpty()) - return 1; - return c1.isStatic() ? -1 : 1; - } - - int s1 = c1.size(); - int s2 = c2.size(); - int minSize = Math.min(s1, s2); - - for (int i = 0; i < minSize; i++) - { - int cmp = isByteOrderComparable - ? ByteBufferUtil.compareUnsigned(c1.get(i), c2.get(i)) - : subtype(i).compare(c1.get(i), c2.get(i)); - if (cmp != 0) - return cmp; - } - - if (s1 == s2) - return c1.eoc().compareTo(c2.eoc()); - return s1 < s2 ? 
c1.eoc().prefixComparisonResult : -c2.eoc().prefixComparisonResult; - } - - protected Comparator<Cell> getByteOrderColumnComparator(boolean isRightNative) - { - if (isRightNative) - return rightNativeCell; - return neitherNativeCell; - } - - protected Comparator<Object> getByteOrderAsymmetricColumnComparator(boolean isRightNative) - { - if (isRightNative) - return asymmetricRightNativeCell; - return asymmetricNeitherNativeCell; - } - - public void validate(Composite name) - { - ByteBuffer previous = null; - for (int i = 0; i < name.size(); i++) - { - AbstractType<?> comparator = subtype(i); - ByteBuffer value = name.get(i); - comparator.validateCollectionMember(value, previous); - previous = value; - } - } - - public boolean isCompatibleWith(CType previous) - { - if (this == previous) - return true; - - // Extending with new components is fine, shrinking is not - if (size() < previous.size()) - return false; - - for (int i = 0; i < previous.size(); i++) - { - AbstractType<?> tprev = previous.subtype(i); - AbstractType<?> tnew = subtype(i); - if (!tnew.isCompatibleWith(tprev)) - return false; - } - return true; - } - - public String getString(Composite c) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < c.size(); i++) - { - if (i > 0) - sb.append(":"); - sb.append(AbstractCompositeType.escape(subtype(i).getString(c.get(i)))); - } - switch (c.eoc()) - { - case START: - sb.append(":_"); - break; - case END: - sb.append(":!"); - break; - } - return sb.toString(); - } - - public Composite make(Object... 
components) - { - if (components.length > size()) - throw new IllegalArgumentException("Too many components, max is " + size()); - - CBuilder builder = builder(); - for (int i = 0; i < components.length; i++) - { - Object obj = components[i]; - if (obj instanceof ByteBuffer) - builder.add((ByteBuffer)obj); - else - builder.add(obj); - } - return builder.build(); - } - - public CType.Serializer serializer() - { - return serializer; - } - - public Comparator<Composite> reverseComparator() - { - return reverseComparator; - } - - public Comparator<IndexInfo> indexComparator() - { - return indexComparator; - } - - public Comparator<IndexInfo> indexReverseComparator() - { - return indexReverseComparator; - } - - public IVersionedSerializer<ColumnSlice> sliceSerializer() - { - return sliceSerializer; - } - - public IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer() - { - return sliceQueryFilterSerializer; - } - - public DeletionInfo.Serializer deletionInfoSerializer() - { - return deletionInfoSerializer; - } - - public RangeTombstone.Serializer rangeTombstoneSerializer() - { - return rangeTombstoneSerializer; - } - - @Override - public boolean equals(Object o) - { - if (this == o) - return true; - - if (o == null) - return false; - - if (!getClass().equals(o.getClass())) - return false; - - CType c = (CType)o; - if (size() != c.size()) - return false; - - for (int i = 0; i < size(); i++) - { - if (!subtype(i).equals(c.subtype(i))) - return false; - } - return true; - } - - @Override - public int hashCode() - { - int h = 31; - for (int i = 0; i < size(); i++) - h += subtype(i).hashCode(); - return h + getClass().hashCode(); - } - - @Override - public String toString() - { - return asAbstractType().toString(); - } - - protected static ByteBuffer sliceBytes(ByteBuffer bb, int offs, int length) - { - ByteBuffer copy = bb.duplicate(); - copy.position(offs); - copy.limit(offs + length); - return copy; - } - - protected static void checkRemaining(ByteBuffer bb, 
int offs, int length) - { - if (offs + length > bb.limit()) - throw new IllegalArgumentException("Not enough bytes"); - } - - private static class Serializer implements CType.Serializer - { - private final CType type; - - public Serializer(CType type) - { - this.type = type; - } - - public void serialize(Composite c, DataOutputPlus out) throws IOException - { - ByteBufferUtil.writeWithShortLength(c.toByteBuffer(), out); - } - - public Composite deserialize(DataInput in) throws IOException - { - return type.fromByteBuffer(ByteBufferUtil.readWithShortLength(in)); - } - - public long serializedSize(Composite c, TypeSizes type) - { - return type.sizeofWithShortLength(c.toByteBuffer()); - } - - public void skip(DataInput in) throws IOException - { - ByteBufferUtil.skipShortLength(in); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java b/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java deleted file mode 100644 index c62f890..0000000 --- a/src/java/org/apache/cassandra/db/composites/AbstractCellNameType.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.composites; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.collect.AbstractIterator; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.CQL3Row; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CollectionType; -import org.apache.cassandra.db.marshal.ColumnToCollectionType; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * Abstract CellNameType built on AbstractCType. The constructor creates the cell and on-disk-atom comparators and the serializers once, and the accessor methods return those instances. create(), rowMarker(), staticPrefix(), collectionType() and addOrUpdateCollection() throw UnsupportedOperationException in this class, while hasCollections() and supportCollections() return false. - */ public abstract class AbstractCellNameType extends AbstractCType implements CellNameType -{ - final Comparator<Cell> columnComparator; - private final Comparator<Cell> columnReverseComparator; - final Comparator<Object> asymmetricComparator; - private final Comparator<OnDiskAtom> onDiskAtomComparator; - - private final ISerializer<CellName> cellSerializer; - private final ColumnSerializer columnSerializer; - private final OnDiskAtom.Serializer onDiskAtomSerializer; - private final IVersionedSerializer<NamesQueryFilter> namesQueryFilterSerializer; - private final IVersionedSerializer<IDiskAtomFilter> diskAtomFilterSerializer; - - 
/* Passes isByteOrderComparable to the superclass, then builds every comparator and serializer stored in the final fields. */ protected AbstractCellNameType(boolean isByteOrderComparable) - { - super(isByteOrderComparable); - /* Compares two cells by their names using this type's compare(). */ columnComparator = new Comparator<Cell>() - { - public int compare(Cell c1, Cell c2) - { - return AbstractCellNameType.this.compare(c1.name(), c2.name()); - } - }; - /* Asymmetric: the left operand is cast to Composite, the right operand is cast to Cell and compared by its name. */ asymmetricComparator = new Comparator<Object>() - { - public int compare(Object c1, Object c2) - { - return AbstractCellNameType.this.compare((Composite) c1, ((Cell) c2).name()); - } - }; - /* Same as columnComparator with the operands swapped. */ columnReverseComparator = new Comparator<Cell>() - { - public int compare(Cell c1, Cell c2) - { - return AbstractCellNameType.this.compare(c2.name(), c1.name()); - } - }; - /* Orders atoms by name first. On equal names a RangeTombstone sorts before a non-tombstone, two RangeTombstones are ordered by max and then by data, and two non-tombstones compare as equal. */ onDiskAtomComparator = new Comparator<OnDiskAtom>() - { - public int compare(OnDiskAtom c1, OnDiskAtom c2) - { - int comp = AbstractCellNameType.this.compare(c1.name(), c2.name()); - if (comp != 0) - return comp; - - if (c1 instanceof RangeTombstone) - { - if (c2 instanceof RangeTombstone) - { - RangeTombstone t1 = (RangeTombstone)c1; - RangeTombstone t2 = (RangeTombstone)c2; - int comp2 = AbstractCellNameType.this.compare(t1.max, t2.max); - return comp2 == 0 ? t1.data.compareTo(t2.data) : comp2; - } - else - { - return -1; - } - } - else - { - return c2 instanceof RangeTombstone ? 
1 : 0; - } - } - }; - - // A trivial wrapper over the composite serializer - cellSerializer = new ISerializer<CellName>() - { - public void serialize(CellName c, DataOutputPlus out) throws IOException - { - serializer().serialize(c, out); - } - - public CellName deserialize(DataInput in) throws IOException - { - Composite ct = serializer().deserialize(in); - /* an empty composite is reported through CorruptColumnException */ if (ct.isEmpty()) - throw ColumnSerializer.CorruptColumnException.create(in, ByteBufferUtil.EMPTY_BYTE_BUFFER); - - assert ct instanceof CellName : ct; - return (CellName)ct; - } - - public long serializedSize(CellName c, TypeSizes type) - { - return serializer().serializedSize(c, type); - } - }; - columnSerializer = new ColumnSerializer(this); - onDiskAtomSerializer = new OnDiskAtom.Serializer(this); - namesQueryFilterSerializer = new NamesQueryFilter.Serializer(this); - diskAtomFilterSerializer = new IDiskAtomFilter.Serializer(this); - } - - /* Returns the name-based columnComparator unless isByteOrderComparable is set, in which case getByteOrderColumnComparator(isRightNative) is returned. */ public final Comparator<Cell> columnComparator(boolean isRightNative) - { - if (!isByteOrderComparable) - return columnComparator; - return getByteOrderColumnComparator(isRightNative); - } - - /* Same selection as columnComparator(boolean), for the asymmetric comparator. */ public final Comparator<Object> asymmetricColumnComparator(boolean isRightNative) - { - if (!isByteOrderComparable) - return asymmetricComparator; - return getByteOrderAsymmetricColumnComparator(isRightNative); - } - - public Comparator<Cell> columnReverseComparator() - { - return columnReverseComparator; - } - - public Comparator<OnDiskAtom> onDiskAtomComparator() - { - return onDiskAtomComparator; - } - - public ISerializer<CellName> cellSerializer() - { - return cellSerializer; - } - - public ColumnSerializer columnSerializer() - { - return columnSerializer; - } - - public OnDiskAtom.Serializer onDiskAtomSerializer() - { - return onDiskAtomSerializer; - } - - public IVersionedSerializer<NamesQueryFilter> namesQueryFilterSerializer() - { - return namesQueryFilterSerializer; - } - - public IVersionedSerializer<IDiskAtomFilter> diskAtomFilterSerializer() - { - return 
diskAtomFilterSerializer; - } - - /* Unchecked cast of fromByteBuffer(bytes) to CellName. */ public CellName cellFromByteBuffer(ByteBuffer bytes) - { - // we're not guaranteed to get a CellName back from fromByteBuffer(), so it's on the caller to guarantee this - return (CellName)fromByteBuffer(bytes); - } - - public CellName create(Composite prefix, ColumnDefinition column, ByteBuffer collectionElement) - { - throw new UnsupportedOperationException(); - } - - public CellName rowMarker(Composite prefix) - { - throw new UnsupportedOperationException(); - } - - public Composite staticPrefix() - { - throw new UnsupportedOperationException(); - } - - public boolean hasCollections() - { - return false; - } - - public boolean supportCollections() - { - return false; - } - - public ColumnToCollectionType collectionType() - { - throw new UnsupportedOperationException(); - } - - public CellNameType addOrUpdateCollection(ColumnIdentifier columnName, CollectionType newCollection) - { - throw new UnsupportedOperationException(); - } - - /* Builds a CellName when the number of components equals size(); otherwise defers to super.make(components). */ @Override - public Composite make(Object... components) - { - return components.length == size() ? makeCellName(components) : super.make(components); - } - - /* Turns each component into a ByteBuffer and delegates to makeCellName(ByteBuffer[]): ByteBuffer components are used as they are, any other object is decomposed with the subtype at its position. */ public CellName makeCellName(Object... 
components) - { - ByteBuffer[] rawComponents = new ByteBuffer[components.length]; - for (int i = 0; i < components.length; i++) - { - Object c = components[i]; - if (c instanceof ByteBuffer) - { - rawComponents[i] = (ByteBuffer)c; - } - else - { - AbstractType<?> type = subtype(i); - // If it's a collection type, we need to find the right collection and use the key comparator (since we're building a cell name) - if (type instanceof ColumnToCollectionType) - { - assert i > 0; - /* the collection is looked up by the previous, already converted, component */ type = ((ColumnToCollectionType)type).defined.get(rawComponents[i-1]).nameComparator(); - } - rawComponents[i] = ((AbstractType)type).decompose(c); - } - } - return makeCellName(rawComponents); - } - - protected abstract CellName makeCellName(ByteBuffer[] components); - - /* Returns a builder whose group() wraps the cells in a DenseRowIterator using the given 'now'. */ protected static CQL3Row.Builder makeDenseCQL3RowBuilder(final long now) - { - return new CQL3Row.Builder() - { - public CQL3Row.RowIterator group(Iterator<Cell> cells) - { - return new DenseRowIterator(cells, now); - } - }; - } - - /* Emits one CQL3Row per cell for which isLive(now) is true and skips the others. Each emitted row returns its single cell from getColumn() whatever name is asked for, and null from getMultiCellColumn(). */ private static class DenseRowIterator extends AbstractIterator<CQL3Row> implements CQL3Row.RowIterator - { - private final Iterator<Cell> cells; - private final long now; - - public DenseRowIterator(Iterator<Cell> cells, long now) - { - this.cells = cells; - this.now = now; - } - - public CQL3Row getStaticRow() - { - // There can't be static columns in dense tables - return null; - } - - protected CQL3Row computeNext() - { - while (cells.hasNext()) - { - final Cell cell = cells.next(); - if (!cell.isLive(now)) - continue; - - return new CQL3Row() - { - public ByteBuffer getClusteringColumn(int i) - { - return cell.name().get(i); - } - - public Cell getColumn(ColumnIdentifier name) - { - return cell; - } - - public List<Cell> getMultiCellColumn(ColumnIdentifier name) - { - return null; - } - }; - } - return endOfData(); - } - } - - /* Returns a builder whose group() wraps the cells in a SparseRowIterator. */ protected static CQL3Row.Builder makeSparseCQL3RowBuilder(final CFMetaData cfMetaData, final CellNameType type, final long now) - { - return new CQL3Row.Builder() - { - 
public CQL3Row.RowIterator group(Iterator<Cell> cells) - { - return new SparseRowIterator(cfMetaData, type, cells, now); - } - }; - } - - /* Groups consecutive live cells into CQL3Rows: a new row is started whenever the current cell name is not isSameCQL3RowAs() the previous one. nextCell is a one-cell look-ahead filled by hasNextCell(). */ private static class SparseRowIterator extends AbstractIterator<CQL3Row> implements CQL3Row.RowIterator - { - private final CFMetaData cfMetaData; - private final CellNameType type; - private final Iterator<Cell> cells; - private final long now; - private final CQL3Row staticRow; - - private Cell nextCell; - private CellName previous; - private CQL3RowOfSparse currentRow; - - public SparseRowIterator(CFMetaData cfMetaData, CellNameType type, Iterator<Cell> cells, long now) - { - this.cfMetaData = cfMetaData; - this.type = type; - this.cells = cells; - this.now = now; - /* the static row is computed eagerly here, by calling computeNext() directly, when the first live cell has a static name; otherwise it is null */ this.staticRow = hasNextCell() && nextCell.name().isStatic() - ? computeNext() - : null; - } - - public CQL3Row getStaticRow() - { - return staticRow; - } - - /* Ensures nextCell holds the next live cell, skipping cells for which isLive(now) is false; returns false once the underlying iterator is exhausted. */ private boolean hasNextCell() - { - if (nextCell != null) - return true; - - while (cells.hasNext()) - { - Cell cell = cells.next(); - if (!cell.isLive(now)) - continue; - - nextCell = cell; - return true; - } - return false; - } - - /* A finished row is returned only after the first cell of the following row has been added to a fresh currentRow; the last pending row is returned once the input is exhausted, and endOfData() follows. */ protected CQL3Row computeNext() - { - while (hasNextCell()) - { - CQL3Row toReturn = null; - CellName current = nextCell.name(); - if (currentRow == null || !current.isSameCQL3RowAs(type, previous)) - { - toReturn = currentRow; - currentRow = new CQL3RowOfSparse(cfMetaData, current); - } - currentRow.add(nextCell); - nextCell = null; - previous = current; - - if (toReturn != null) - return toReturn; - } - if (currentRow != null) - { - CQL3Row toReturn = currentRow; - currentRow = null; - return toReturn; - } - return endOfData(); - } - } - - /* CQL3Row filled incrementally through add(). Clustering columns are read from the CellName passed to the constructor. Both maps are allocated on first use, so the getters return null when nothing of that kind was added. */ private static class CQL3RowOfSparse implements CQL3Row - { - private final CFMetaData cfMetaData; - private final CellName cell; - private Map<ColumnIdentifier, Cell> columns; - private Map<ColumnIdentifier, List<Cell>> collections; - - CQL3RowOfSparse(CFMetaData metadata, CellName cell) - { - this.cfMetaData = metadata; - this.cell = cell; 
- } - - public ByteBuffer getClusteringColumn(int i) - { - return cell.get(i); - } - - /* Files the cell under its CQL3 column name: a collection cell is appended to that column's list, any other cell is put into the columns map under that name. NOTE: the Cell parameter 'cell' shadows the CellName field 'cell' inside this method. */ void add(Cell cell) - { - CellName cellName = cell.name(); - ColumnIdentifier columnName = cellName.cql3ColumnName(cfMetaData); - if (cellName.isCollectionCell()) - { - if (collections == null) - collections = new HashMap<>(); - - List<Cell> values = collections.get(columnName); - if (values == null) - { - values = new ArrayList<Cell>(); - collections.put(columnName, values); - } - values.add(cell); - } - else - { - if (columns == null) - columns = new HashMap<>(); - columns.put(columnName, cell); - } - } - - public Cell getColumn(ColumnIdentifier name) - { - return columns == null ? null : columns.get(name); - } - - public List<Cell> getMultiCellColumn(ColumnIdentifier name) - { - return collections == null ? null : collections.get(name); - } - } -}
