Repository: cassandra Updated Branches: refs/heads/trunk b636b0742 -> f81a91d3f
Add sstable flush observer patch by Pavel Yaskevich; reviewed by Sam Tunnicliffe for CASSANDRA-10678 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f81a91d3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f81a91d3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f81a91d3 Branch: refs/heads/trunk Commit: f81a91d3fe0d1cd93f093c74356a1d7d018ed22f Parents: b636b07 Author: Pavel Yaskevich <[email protected]> Authored: Fri Nov 6 18:38:47 2015 -0800 Committer: Pavel Yaskevich <[email protected]> Committed: Thu Nov 19 14:16:59 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../org/apache/cassandra/db/ColumnIndex.java | 18 +- .../compaction/AbstractCompactionStrategy.java | 11 +- .../db/compaction/CompactionManager.java | 2 + .../compaction/CompactionStrategyManager.java | 13 +- .../cassandra/db/compaction/Upgrader.java | 1 + .../writers/DefaultCompactionWriter.java | 1 + .../writers/MajorLeveledCompactionWriter.java | 1 + .../writers/MaxSSTableSizeWriter.java | 1 + .../SplittingSizeTieredCompactionWriter.java | 1 + src/java/org/apache/cassandra/index/Index.java | 15 ++ .../io/sstable/AbstractSSTableSimpleWriter.java | 4 +- .../cassandra/io/sstable/SSTableTxnWriter.java | 11 +- .../io/sstable/SimpleSSTableMultiWriter.java | 4 +- .../io/sstable/format/SSTableFlushObserver.java | 55 +++++ .../io/sstable/format/SSTableWriter.java | 83 +++++-- .../io/sstable/format/big/BigFormat.java | 9 +- .../io/sstable/format/big/BigTableWriter.java | 8 +- .../apache/cassandra/db/RowIndexEntryTest.java | 2 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 2 +- .../db/lifecycle/RealTransactionsTest.java | 1 + .../io/sstable/SSTableRewriterTest.java | 2 +- .../format/SSTableFlushObserverTest.java | 217 +++++++++++++++++++ 24 files changed, 425 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dd68a3c..d4efbcc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.2 + * Add sstable flush observer (CASSANDRA-10678) * Improve NTS endpoints calculation (CASSANDRA-10200) * Improve performance of the folderSize function (CASSANDRA-10677) * Add support for type casting in selection clause (CASSANDRA-10310) @@ -6,7 +7,6 @@ * Abort in-progress queries that time out (CASSANDRA-7392) * Add transparent data encryption core classes (CASSANDRA-9945) - 3.1 Merged from 3.0: * Correctly preserve deletion info on updated rows when notifying indexers http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 23cede4..08ce2dd 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -469,7 +469,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) { - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn); + return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn); } /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index ede3f79..749c155 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.utils.ByteBufferUtil; @@ -44,11 +45,15 @@ public class ColumnIndex this.columnsIndex = columnsIndex; } - public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, SequentialWriter output, SerializationHeader header, Version version) throws IOException + public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, + SequentialWriter output, + SerializationHeader header, + Collection<SSTableFlushObserver> observers, + Version version) throws IOException { assert !iterator.isEmpty() && version.storeRows(); - Builder builder = new Builder(iterator, output, header, version.correspondingMessagingVersion()); + Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion()); return builder.build(); } @@ -83,15 +88,19 @@ public class ColumnIndex private DeletionTime openMarker; + private final Collection<SSTableFlushObserver> observers; + public Builder(UnfilteredRowIterator iterator, SequentialWriter writer, SerializationHeader header, + Collection<SSTableFlushObserver> observers, int version) { this.iterator = iterator; this.writer = writer; this.header = header; this.version = version; + this.observers = observers == null ? Collections.emptyList() : observers; this.initialPosition = writer.position(); } @@ -142,6 +151,11 @@ public class ColumnIndex } UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); + + // notify observers about each new cell added to the row + if (!observers.isEmpty() && unfiltered.isRow()) + ((Row) unfiltered).stream().forEach(cell -> observers.forEach((o) -> o.nextCell(cell))); + lastClustering = unfiltered.clustering(); previousRowStart = pos; ++written; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index ae8839e..cab56bb 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter; @@ -509,8 +510,14 @@ public abstract class AbstractCompactionStrategy return groupedSSTables; } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + MetadataCollector meta, + SerializationHeader header, + Collection<Index> indexes, + LifecycleTransaction txn) { - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn); + return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 559a2ea..02d6aa1 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -963,6 +963,7 @@ public class CompactionManager implements CompactionManagerMBean repairedAt, sstable.getSSTableLevel(), sstable.header, + cfs.indexManager.listIndexes(), txn); } @@ -995,6 +996,7 @@ public class CompactionManager implements CompactionManagerMBean cfs.metadata, new MetadataCollector(sstables, cfs.metadata.comparator, minLevel), SerializationHeader.make(cfs.metadata, sstables), + cfs.indexManager.listIndexes(), txn); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index bd72c64..7c7e86a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -22,6 +22,7 @@ import java.util.*; import java.util.concurrent.Callable; import com.google.common.collect.Iterables; +import org.apache.cassandra.index.Index; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -499,15 +500,21 @@ public class CompactionStrategyManager implements INotificationConsumer return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES)); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, + LifecycleTransaction txn) { if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) { - return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); } else { - return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index fcd1a3c..3f0f9a3 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -75,6 +75,7 @@ public class Upgrader cfs.metadata, sstableMetadataCollector, SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)), + cfs.indexManager.listIndexes(), transaction); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 8b90224..8b4351f 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -67,6 +67,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter cfs.metadata, new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + cfs.indexManager.listIndexes(), txn); sstableWriter.switchWriter(writer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 6d191f8..b0c4562 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -106,6 +106,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + cfs.indexManager.listIndexes(), txn); sstableWriter.switchWriter(writer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 142fe87..1dc72e7 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -87,6 +87,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + cfs.indexManager.listIndexes(), txn); sstableWriter.switchWriter(writer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 796391c..3a7f526 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -111,6 +111,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + cfs.indexManager.listIndexes(), txn); logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); sstableWriter.switchWriter(writer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index 8655044..7bca924 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -7,6 +7,7 @@ import java.util.function.BiFunction; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.PartitionIterator; @@ -15,6 +16,8 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -185,6 +188,18 @@ public interface Index */ public boolean shouldBuildBlocking(); + /** + * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction). + * + * @param descriptor The descriptor of the sstable observer is requested for. + * @param opType The type of the operation which requests observer e.g. memtable flush or compaction. + * + * @return SSTable flush observer. + */ + default SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationType opType) + { + return null; + } /* * Index selection http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 62348ec..0213fd5 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -22,6 +22,7 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.Closeable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -65,7 +66,8 @@ abstract class AbstractSSTableSimpleWriter implements Closeable 0, ActiveRepairService.UNREPAIRED_SSTABLE, 0, - new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS)); + new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS), + Collections.emptySet()); } private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index e889d85..5286ac5 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.concurrent.Transactional; @@ -102,12 +103,18 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem } @SuppressWarnings("resource") // log and writer closed during postCleanup - public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter create(CFMetaData cfm, + Descriptor descriptor, + long keyCount, + long repairedAt, + int sstableLevel, + SerializationHeader header, + Collection<Index> indexes) { // if the column family store does not exist, we create a new default SSTableMultiWriter to use: LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel); - SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn); + SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, indexes, txn); return new SSTableTxnWriter(txn, writer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index fd1b9a7..68dbd74 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -109,9 +110,10 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter CFMetaData cfm, MetadataCollector metadataCollector, SerializationHeader header, + Collection<Index> indexes, LifecycleTransaction txn) { - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn); + SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn); return new SimpleSSTableMultiWriter(writer); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java new file mode 100644 index 0000000..d6f54e2 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.rows.ColumnData; + +/** + * Observer for events in the lifecycle of writing out an sstable. + */ +public interface SSTableFlushObserver +{ + /** + * Called before writing any data to the sstable. + */ + void begin(); + + /** + * Called when a new partition in being written to the sstable, + * but before any cells are processed (see {@link #nextCell(ColumnData)}). + * + * @param key The key being appended to SSTable. + * @param indexPosition The position of the key in the SSTable PRIMARY_INDEX file. + */ + void startPartition(DecoratedKey key, long indexPosition); + + /** + * Called after the cell is written to the sstable. + * Will be preceded by a call to {@code startPartition(DecoratedKey, long)}, + * and the cell should be assumed to belong to that row. + * + * @param cell The cell being added to the row. + */ + void nextCell(ColumnData cell); + + /** + * Called when all data is written to the file and it's ready to be finished up. + */ + void complete(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 4cbbd70..3203964 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -18,20 +18,21 @@ package org.apache.cassandra.io.sstable.format; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; @@ -58,6 +59,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; protected final SerializationHeader header; protected final TransactionalProxy txnProxy = txnProxy(); + protected final Collection<SSTableFlushObserver> observers; protected abstract TransactionalProxy txnProxy(); @@ -69,12 +71,13 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected boolean openResult; } - protected SSTableWriter(Descriptor descriptor, - long keyCount, - long repairedAt, - CFMetaData metadata, - MetadataCollector metadataCollector, - SerializationHeader header) + protected SSTableWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + CFMetaData metadata, + MetadataCollector metadataCollector, + SerializationHeader header, + Collection<SSTableFlushObserver> observers) { super(descriptor, components(metadata), metadata); this.keyCount = keyCount; @@ -82,6 +85,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional this.metadataCollector = metadataCollector; this.header = header; this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header); + this.observers = observers == null ? Collections.emptySet() : observers; } public static SSTableWriter create(Descriptor descriptor, @@ -90,16 +94,23 @@ public abstract class SSTableWriter extends SSTable implements Transactional CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<Index> indexes, LifecycleTransaction txn) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); + return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn); } - public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) + public static SSTableWriter create(Descriptor descriptor, + long keyCount, + long repairedAt, + int sstableLevel, + SerializationHeader header, + Collection<Index> indexes, + LifecycleTransaction txn) { CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn); + return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn); } public static SSTableWriter create(CFMetaData metadata, @@ -108,21 +119,34 @@ public abstract class SSTableWriter extends SSTable implements Transactional long repairedAt, int sstableLevel, SerializationHeader header, + Collection<Index> indexes, LifecycleTransaction txn) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn); + return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn); } - public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn) + public static SSTableWriter create(String filename, + long keyCount, + long repairedAt, + int sstableLevel, + SerializationHeader header, + Collection<Index> indexes, + LifecycleTransaction txn) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, txn); + return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn); } @VisibleForTesting - public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleTransaction txn) + public static SSTableWriter create(String filename, + long keyCount, + long repairedAt, + SerializationHeader header, + Collection<Index> indexes, + LifecycleTransaction txn) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, txn); + Descriptor descriptor = Descriptor.fromFilename(filename); + return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn); } private static Set<Component> components(CFMetaData metadata) @@ -150,6 +174,27 @@ public abstract class SSTableWriter extends SSTable implements Transactional return components; } + private static Collection<SSTableFlushObserver> observers(Descriptor descriptor, + Collection<Index> indexes, + OperationType operationType) + { + if (indexes == null) + return Collections.emptyList(); + + List<SSTableFlushObserver> observers = new ArrayList<>(indexes.size()); + for (Index index : indexes) + { + SSTableFlushObserver observer = index.getFlushObserver(descriptor, operationType); + if (observer != null) + { + observer.begin(); + observers.add(observer); + } + } + + return ImmutableList.copyOf(observers); + } + public abstract void mark(); /** @@ -211,6 +256,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional { setOpenResult(openResult); txnProxy.finish(); + observers.forEach(SSTableFlushObserver::complete); return finished(); } @@ -285,6 +331,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<SSTableFlushObserver> observers, LifecycleTransaction txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index 1f2a98f..e030b5b 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.io.sstable.format.big; +import java.util.Collection; import java.util.Set; import org.apache.cassandra.config.CFMetaData; @@ -25,10 +26,7 @@ import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.net.MessagingService; @@ -88,9 +86,10 @@ public class BigFormat implements SSTableFormat CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<SSTableFlushObserver> observers, LifecycleTransaction txn) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); + return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index b6077e0..5fe9147 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -18,12 +18,14 @@ package org.apache.cassandra.io.sstable.format.big; import java.io.*; +import java.util.Collection; import java.util.Map; import org.apache.cassandra.db.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -63,9 +65,10 @@ public class BigTableWriter extends SSTableWriter CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, + Collection<SSTableFlushObserver> observers, LifecycleTransaction txn) { - super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header); + super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers); txn.trackNew(this); // must track before any files are created if (compression) @@ -143,10 +146,11 @@ public class BigTableWriter extends SSTableWriter return null; long startPosition = beforeAppend(key); + observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position())); try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector))) { - ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, descriptor.version); + ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, observers, descriptor.version); RowIndexEntry entry = RowIndexEntry.create(startPosition, collecting.partitionLevelDeletion(), index); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index 62c88a0..0c7ee59 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -135,7 +135,7 @@ public class RowIndexEntryTest extends CQLTester File tempFile = File.createTempFile("row_index_entry_test", null); tempFile.deleteOnExit(); SequentialWriter writer = SequentialWriter.open(tempFile); - ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, BigFormat.latestVersion); + ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion); RowIndexEntry<IndexHelper.IndexInfo> withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex); IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfs.metadata, BigFormat.latestVersion, header); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index d5baec8..27b774d 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -652,7 +652,7 @@ public class ScrubTest TestWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) { - super(descriptor, keyCount, repairedAt, metadata, collector, header, txn); + super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index 4fbbb36..bab9c90 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -168,6 +168,7 @@ public class RealTransactionsTest extends SchemaLoader 0, 0, SerializationHeader.make(cfs.metadata, txn.originals()), + cfs.indexManager.listIndexes(), txn)); while (ci.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index bfe7b08..de9b357 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) { String filename = cfs.getSSTablePath(directory); - return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn); + return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn); } public static ByteBuffer random(int i, int size) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java new file mode 100644 index 0000000..29ad387 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.big.BigTableWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +import junit.framework.Assert; +import org.junit.Test; + +public class SSTableFlushObserverTest +{ + private static final String KS_NAME = "test"; + private static final String CF_NAME = "flush_observer"; + + @Test + public void testFlushObserver() + { + CFMetaData cfm = CFMetaData.Builder.create(KS_NAME, CF_NAME) + .addPartitionKey("id", UTF8Type.instance) + .addRegularColumn("first_name", UTF8Type.instance) + .addRegularColumn("age", Int32Type.instance) + .addRegularColumn("height", LongType.instance) + .build(); + + LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.COMPACTION); + FlushObserver observer = new FlushObserver(); + + String sstableDirectory = DatabaseDescriptor.getAllDataFileLocations()[0]; + File directory = new File(sstableDirectory + File.pathSeparator + KS_NAME + File.pathSeparator + CF_NAME); + directory.deleteOnExit(); + + if (!directory.exists() && !directory.mkdirs()) + throw new FSWriteError(new IOException("failed to create tmp directory"), directory.getAbsolutePath()); + + SSTableFormat.Type sstableFormat = DatabaseDescriptor.getSSTableFormat(); + + BigTableWriter writer = new BigTableWriter(new Descriptor(sstableFormat.info.getLatestVersion().version, + directory, + KS_NAME, CF_NAME, + 0, + sstableFormat), + 10L, 0L, cfm, + new MetadataCollector(cfm.comparator).sstableLevel(0), + new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS), + Collections.singletonList(observer), + transaction); + + SSTableReader reader = null; + Multimap<ByteBuffer, Cell> expected = ArrayListMultimap.create(); + + try + { + final long now = System.currentTimeMillis(); + + ByteBuffer key = UTF8Type.instance.fromString("key1"); + expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now,UTF8Type.instance.fromString("jack")), + BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(27)), + BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(183L)))); + + writer.append(new RowIterator(cfm, key.duplicate(), Collections.singletonList(buildRow(expected.get(key))))); + + key = UTF8Type.instance.fromString("key2"); + expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now,UTF8Type.instance.fromString("jim")), + BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(30)), + BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(180L)))); + + writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key))))); + + key = UTF8Type.instance.fromString("key3"); + expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now,UTF8Type.instance.fromString("ken")), + BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(30)), + BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(178L)))); + + writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key))))); + + reader = writer.finish(true); + } + finally + { + FileUtils.closeQuietly(writer); + } + + Assert.assertTrue(observer.isComplete); + Assert.assertEquals(expected.size(), observer.rows.size()); + + for (Pair<ByteBuffer, Long> e : observer.rows.keySet()) + { + ByteBuffer key = e.left; + Long indexPosition = e.right; + + try (FileDataInput index = reader.ifile.createReader(indexPosition)) + { + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(index); + Assert.assertEquals(0, UTF8Type.instance.compare(key, indexKey)); + } + catch (IOException ex) + { + throw new FSReadError(ex, reader.getIndexFilename()); + } + + Assert.assertEquals(expected.get(key), observer.rows.get(e)); + } + } + + private static class RowIterator extends AbstractUnfilteredRowIterator + { + private final Iterator<Unfiltered> rows; + + public RowIterator(CFMetaData cfm, ByteBuffer key, Collection<Unfiltered> content) + { + super(cfm, + DatabaseDescriptor.getPartitioner().decorateKey(key), + DeletionTime.LIVE, + cfm.partitionColumns(), + BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING), + false, + EncodingStats.NO_STATS); + + rows = content.iterator(); + } + + @Override + protected Unfiltered computeNext() + { + return rows.hasNext() ? rows.next() : endOfData(); + } + } + + private static class FlushObserver implements SSTableFlushObserver + { + private final Multimap<Pair<ByteBuffer, Long>, Cell> rows = ArrayListMultimap.create(); + private Pair<ByteBuffer, Long> currentKey; + private boolean isComplete; + + @Override + public void begin() + {} + + @Override + public void startPartition(DecoratedKey key, long indexPosition) + { + currentKey = Pair.create(key.getKey(), indexPosition); + } + + @Override + public void nextCell(ColumnData cell) + { + rows.put(currentKey, (Cell) cell); + } + + @Override + public void complete() + { + isComplete = true; + } + } + + private static Row buildRow(Collection<Cell> cells) + { + Row.Builder rowBuilder = BTreeRow.sortedBuilder(); + rowBuilder.newRow(Clustering.EMPTY); + cells.forEach(rowBuilder::addCell); + return rowBuilder.build(); + } + + private static ColumnDefinition getColumn(CFMetaData cfm, String name) + { + return cfm.getColumnDefinition(UTF8Type.instance.fromString(name)); + } +}
