Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 0d866456a -> 9ed272773
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java new file mode 100644 index 0000000..2112656 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; + +public class SimpleSSTableMultiWriter implements SSTableMultiWriter +{ + private final SSTableWriter writer; + + private SimpleSSTableMultiWriter(SSTableWriter writer) + { + this.writer = writer; + } + + public boolean append(UnfilteredRowIterator partition) + { + RowIndexEntry indexEntry = writer.append(partition); + return indexEntry != null; + } + + public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult) + { + return Collections.singleton(writer.finish(repairedAt, maxDataAge, openResult)); + } + + public Collection<SSTableReader> finish(boolean openResult) + { + return Collections.singleton(writer.finish(openResult)); + } + + public Collection<SSTableReader> finished() + { + return Collections.singleton(writer.finished()); + } + + public SSTableMultiWriter setOpenResult(boolean openResult) + { + writer.setOpenResult(openResult); + return this; + } + + public String getFilename() + { + return writer.getFilename(); + } + + public long getFilePointer() + { + return writer.getFilePointer(); + } + + public UUID getCfId() + { + return writer.metadata.cfId; + } + + public Throwable commit(Throwable accumulate) + { + return writer.commit(accumulate); + } + + public Throwable abort(Throwable accumulate) + { + return writer.abort(accumulate); + } + + public void prepareToCommit() + { + writer.prepareToCommit(); + } + + public void close() throws Exception + { + writer.close(); + } + + public static SSTableMultiWriter create(Descriptor descriptor, + long keyCount, + long repairedAt, + CFMetaData cfm, + MetadataCollector metadataCollector, + SerializationHeader header, + LifecycleTransaction txn) + { + SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn); + return new SimpleSSTableMultiWriter(writer); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java index 15230ea..56d6130 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java @@ -21,8 +21,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; public class SSTableAddedNotification implements INotification { - public final SSTableReader added; - public SSTableAddedNotification(SSTableReader added) + public final Iterable<SSTableReader> added; + public SSTableAddedNotification(Iterable<SSTableReader> added) { this.added = added; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index a098786..d4b7283 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -36,12 +36,11 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.BytesReadTracker; @@ -85,7 +84,7 @@ public class StreamReader * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. */ @SuppressWarnings("resource") - public SSTableWriter read(ReadableByteChannel channel) throws IOException + public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException { logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel); long totalSize = totalSize(); @@ -98,7 +97,7 @@ public class StreamReader } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format); + SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format); DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); BytesReadTracker in = new BytesReadTracker(dis); @@ -115,7 +114,8 @@ public class StreamReader } catch (Throwable e) { - writer.abort(); + SSTableMultiWriter.abortOrDie(writer); + drain(dis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; @@ -124,14 +124,16 @@ public class StreamReader } } - protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException + protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException { - Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize); + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.directories.getLocationForDisk(localDir), format)); - return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId)); + desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); + + + return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId)); } protected void drain(InputStream dis, long bytesRead) throws IOException @@ -161,7 +163,7 @@ public class StreamReader return size; } - protected void writePartition(StreamDeserializer deserializer, SSTableWriter writer, ColumnFamilyStore cfs) throws IOException + protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer, ColumnFamilyStore cfs) throws IOException { DecoratedKey key = deserializer.newPartition(); writer.append(deserializer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 7311069..2bcbbc1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -40,6 +40,7 @@ import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -68,7 +69,7 @@ public class StreamReceiveTask extends StreamTask private boolean done = false; // holds references to SSTables received - protected Collection<SSTableWriter> sstables; + protected Collection<SSTableMultiWriter> sstables; public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) { @@ -86,12 +87,12 @@ public class StreamReceiveTask extends StreamTask * * @param sstable SSTable file received. */ - public synchronized void received(SSTableWriter sstable) + public synchronized void received(SSTableMultiWriter sstable) { if (done) return; - assert cfId.equals(sstable.metadata.cfId); + assert cfId.equals(sstable.getCfId()); sstables.add(sstable); if (sstables.size() == totalFiles) @@ -126,8 +127,8 @@ public class StreamReceiveTask extends StreamTask if (kscf == null) { // schema was dropped during streaming - for (SSTableWriter writer : task.sstables) - writer.abort(); + task.sstables.forEach(SSTableMultiWriter::abortOrDie); + task.sstables.clear(); task.txn.abort(); return; @@ -138,11 +139,11 @@ public class StreamReceiveTask extends StreamTask try { List<SSTableReader> readers = new ArrayList<>(); - for (SSTableWriter writer : task.sstables) + for (SSTableMultiWriter writer : task.sstables) { - SSTableReader reader = writer.finish(true); - readers.add(reader); - task.txn.update(reader, false); + Collection<SSTableReader> newReaders = writer.finish(true); + readers.addAll(newReaders); + task.txn.update(newReaders, false); } task.sstables.clear(); @@ -211,8 +212,7 @@ public class StreamReceiveTask extends StreamTask return; done = true; - for (SSTableWriter writer : sstables) - writer.abort(); + sstables.forEach(SSTableMultiWriter::abortOrDie); txn.abort(); sstables.clear(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 205291b..f702e24 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -25,7 +25,7 @@ import java.nio.channels.ReadableByteChannel; import com.google.common.base.Throwables; -import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +62,7 @@ public class CompressedStreamReader extends StreamReader */ @Override @SuppressWarnings("resource") - public SSTableWriter read(ReadableByteChannel channel) throws IOException + public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException { logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt); long totalSize = totalSize(); @@ -75,7 +75,7 @@ public class CompressedStreamReader extends StreamReader } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format); + SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format); CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType()); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); @@ -102,7 +102,7 @@ public class CompressedStreamReader extends StreamReader } catch (Throwable e) { - writer.abort(); + SSTableMultiWriter.abortOrDie(writer); drain(cis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index bce9691..19f9e12 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -17,12 +17,11 @@ */ package org.apache.cassandra.streaming.messages; -import java.io.DataInputStream; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; @@ -80,9 +79,9 @@ public class IncomingFileMessage extends StreamMessage }; public FileMessageHeader header; - public SSTableWriter sstable; + public SSTableMultiWriter sstable; - public IncomingFileMessage(SSTableWriter sstable, FileMessageHeader header) + public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header) { super(Type.FILE); this.header = header; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java index c8587d8..701bbc3 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java +++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java @@ -29,7 +29,6 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; @@ -69,7 +68,7 @@ public class SSTableExpiredBlockers Keyspace ks = Keyspace.openWithoutSSTables(keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily); - Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); + Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); Set<SSTableReader> sstables = new HashSet<>(); for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java index cb3cc5c..4f4e904 100644 --- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java +++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java @@ -76,7 +76,7 @@ public class SSTableLevelResetter Keyspace keyspace = Keyspace.openWithoutSSTables(keyspaceName); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnfamily); boolean foundSSTable = false; - for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().entrySet()) + for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().entrySet()) { if (sstable.getValue().contains(Component.STATS)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java index 95f516a..6554bd0 100644 --- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java +++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java @@ -30,17 +30,14 @@ import java.util.Set; import com.google.common.base.Throwables; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.compaction.LeveledManifest; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.utils.Pair; /** * Create a decent leveling for the given keyspace/column family @@ -95,7 +92,7 @@ public class SSTableOfflineRelevel Keyspace ks = Keyspace.openWithoutSSTables(keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily); - Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); + Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); Set<SSTableReader> sstables = new HashSet<>(); for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index f64b8d9..f3a1a35 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -84,7 +84,7 @@ public class StandaloneScrubber String snapshotName = "pre-scrub-" + System.currentTimeMillis(); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); + Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); List<SSTableReader> sstables = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index 88e34b7..cf94c99 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -63,7 +63,7 @@ public class StandaloneUpgrader ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cf); OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug); - Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW); + Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW); if (options.snapshot != null) lister.onlyBackups(true).snapshots(options.snapshot); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java index 0b17e39..3412329 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java +++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java @@ -18,10 +18,6 @@ */ package org.apache.cassandra.tools; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; @@ -35,7 +31,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.OutputHandler; import org.apache.commons.cli.*; -import java.io.File; import java.util.*; import java.util.concurrent.TimeUnit; @@ -69,7 +64,7 @@ public class StandaloneVerifier ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); + Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); boolean extended = options.extended; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index 7db978e..d684e11 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -114,9 +114,9 @@ public class LongCompactionsTest builder.newRow(String.valueOf(i)).add("val", String.valueOf(i)); rows.put(key, builder.build()); } - SSTableReader sstable = SSTableUtils.prepare().write(rows); - sstables.add(sstable); - store.addSSTable(sstable); + Collection<SSTableReader> readers = SSTableUtils.prepare().write(rows); + sstables.addAll(readers); + store.addSSTables(readers); } // give garbage collection a bit of time to catch up http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java index 249dd8d..6b50e49 100644 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -86,7 +86,7 @@ public class MockSchema public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) { - Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), + Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 78f9e3e..6840e2b 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -329,7 +329,7 @@ public class ColumnFamilyStoreTest assertTrue(snapshotDetails.containsKey("ephemeralSnapshot")); assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot")); - ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories); + ColumnFamilyStore.clearEphemeralSnapshots(cfs.getDirectories()); snapshotDetails = cfs.getSnapshotDetails(); assertEquals(1, snapshotDetails.size()); @@ -350,7 +350,7 @@ public class ColumnFamilyStoreTest for (int version = 1; version <= 2; ++version) { - Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version); + Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version); Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version); for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS }) assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 8889488..52b2aa8 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -324,12 +324,12 @@ public class ScrubTest String filename = cfs.getSSTablePath(tempDataDir); Descriptor desc = Descriptor.fromFilename(filename); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc, - keys.size(), - 0L, - 0, - SerializationHeader.make(cfs.metadata, - Collections.emptyList()))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, + keys.size(), + 0L, + 0, + SerializationHeader.make(cfs.metadata, + Collections.emptyList()))) { for (String k : keys) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 6dc5f53..a3167f9 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -53,6 +53,7 @@ import org.apache.cassandra.UpdateBuilder; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -162,10 +163,10 @@ public class AntiCompactionTest private SSTableReader writeFile(ColumnFamilyStore cfs, int count) { - File dir = cfs.directories.getDirectoryForNewSSTables(); + File dir = cfs.getDirectories().getDirectoryForNewSSTables(); String filename = cfs.getSSTablePath(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS))) { for (int i = 0; i < count; i++) { @@ -175,7 +176,10 @@ public class AntiCompactionTest writer.append(builder.build().unfilteredIterator()); } - return writer.finish(true); + Collection<SSTableReader> sstables = writer.finish(true); + assertNotNull(sstables); + assertEquals(1, sstables.size()); + return sstables.iterator().next(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java index 1b94a6b..68936f5 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java @@ -75,7 +75,7 @@ public class CompactionAwareWriterTest extends CQLTester populate(rowCount); LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); - CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals()); + CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals()); int rows = compact(cfs, txn, writer); assertEquals(1, cfs.getLiveSSTables().size()); assertEquals(rowCount, rows); @@ -94,7 +94,7 @@ public class CompactionAwareWriterTest extends CQLTester LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/10; - CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0); + CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize, 0); int rows = compact(cfs, txn, writer); assertEquals(10, cfs.getLiveSSTables().size()); assertEquals(rowCount, rows); @@ -111,7 +111,7 @@ public class CompactionAwareWriterTest extends CQLTester populate(rowCount); LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); - CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0); + CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), 0); int rows = compact(cfs, txn, writer); long expectedSize = beforeSize / 2; List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getLiveSSTables()); @@ -147,7 +147,7 @@ public class CompactionAwareWriterTest extends CQLTester LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/targetSSTableCount; - CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize); + CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize); int rows = compact(cfs, txn, writer); assertEquals(targetSSTableCount, cfs.getLiveSSTables().size()); int [] levelCounts = new int[5]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 9d5e5fc..8035ac4 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -346,7 +346,7 @@ public class LeveledCompactionStrategyTest assertFalse(unrepaired.manifest.generations[2].contains(sstable1)); unrepaired.removeSSTable(sstable2); - strategy.handleNotification(new SSTableAddedNotification(sstable2), this); + strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this); assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2)); assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index 309e35a..9eff1b1 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -34,7 +34,6 @@ import junit.framework.Assert; import org.apache.cassandra.MockSchema; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; @@ -50,7 +49,6 @@ import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; @@ -134,11 +132,11 @@ public class RealTransactionsTest extends SchemaLoader { cfs.truncateBlocking(); - String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; - String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; + String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; + String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)"; try (CQLSSTableWriter writer = CQLSSTableWriter.builder() - .inDirectory(cfs.directories.getDirectoryForNewSSTables()) + .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables()) .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name)) .using(String.format(query, cfs.keyspace.getName(), cfs.name)) .build()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index 3a943c4..9bcd9e7 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -30,6 +30,7 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.junit.BeforeClass; import org.junit.Test; @@ -177,7 +178,7 @@ public class TrackerTest Assert.assertTrue(reader.isKeyCacheSetup()); Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); - Assert.assertEquals(3, listener.senders.size()); + Assert.assertEquals(1, listener.senders.size()); Assert.assertEquals(tracker, listener.senders.get(0)); Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification); DatabaseDescriptor.setIncrementalBackupsEnabled(backups); @@ -296,10 +297,10 @@ public class TrackerTest Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2)); SSTableReader reader = MockSchema.sstable(0, 10, false, cfs); - tracker.replaceFlushed(prev2, reader); + tracker.replaceFlushed(prev2, Collections.singleton(reader)); Assert.assertEquals(1, tracker.getView().sstables.size()); Assert.assertEquals(1, listener.received.size()); - Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added); listener.received.clear(); Assert.assertTrue(reader.isKeyCacheSetup()); Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount()); @@ -313,12 +314,12 @@ public class TrackerTest tracker.markFlushing(prev1); reader = MockSchema.sstable(0, 10, true, cfs); cfs.invalidate(false); - tracker.replaceFlushed(prev1, reader); + tracker.replaceFlushed(prev1, Collections.singleton(reader)); Assert.assertEquals(0, tracker.getView().sstables.size()); Assert.assertEquals(0, tracker.getView().flushingMemtables.size()); Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); Assert.assertEquals(3, listener.received.size()); - Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added); Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification); Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size()); DatabaseDescriptor.setIncrementalBackupsEnabled(backups); @@ -332,8 +333,8 @@ public class TrackerTest Tracker tracker = new Tracker(null, false); MockListener listener = new MockListener(false); tracker.subscribe(listener); - tracker.notifyAdded(r1); - Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added); + tracker.notifyAdded(singleton(r1)); + Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added); listener.received.clear(); tracker.notifyDeleting(r1); Assert.assertEquals(r1, ((SSTableDeletingNotification) listener.received.get(0)).deleting); @@ -353,8 +354,8 @@ public class TrackerTest MockListener failListener = new MockListener(true); tracker.subscribe(failListener); tracker.subscribe(listener); - Assert.assertNotNull(tracker.notifyAdded(r1, null)); - Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null)); + Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added); listener.received.clear(); Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null)); Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java index 40afa54..523c203 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java @@ -195,7 +195,7 @@ public class ViewTest Assert.assertEquals(memtable3, cur.getCurrentMemtable()); SSTableReader sstable = MockSchema.sstable(1, cfs); - cur = View.replaceFlushed(memtable1, sstable).apply(cur); + cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur); Assert.assertEquals(0, cur.flushingMemtables.size()); Assert.assertEquals(1, cur.liveMemtables.size()); Assert.assertEquals(memtable3, cur.getCurrentMemtable()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index 856ef7c..e1ab48f 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -64,12 +64,12 @@ public class BigTableWriterTest extends AbstractTransactionalTest private TestableBTW() { - this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables())); + this(cfs.getSSTablePath(cfs.getDirectories().getDirectoryForNewSSTables())); } private TestableBTW(String file) { - this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); + this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); } private TestableBTW(String file, SSTableTxnWriter sw) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java index ceeb369..a9165f7 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.util.FileUtils; @@ -44,6 +45,7 @@ public class CQLSSTableWriterClientTest public void setUp() { this.testDirectory = Files.createTempDir(); + Keyspace.setInitialized(); } @After http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index d9516cb..1c61f51 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -246,7 +246,7 @@ public class SSTableRewriterTest extends SchemaLoader ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); truncate(cfs); - File dir = cfs.directories.getDirectoryForNewSSTables(); + File dir = cfs.getDirectories().getDirectoryForNewSSTables(); LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata); try (SSTableWriter writer = getWriter(cfs, dir, txn)) { @@ -941,10 +941,10 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> result = new LinkedHashSet<>(); for (int f = 0 ; f < fileCount ; f++) { - File dir = cfs.directories.getDirectoryForNewSSTables(); + File dir = cfs.getDirectories().getDirectoryForNewSSTables(); String filename = cfs.getSSTablePath(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) { int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount; for ( ; i < end ; i++) @@ -955,7 +955,7 @@ public class SSTableRewriterTest extends SchemaLoader writer.append(builder.build().unfilteredIterator()); } - result.add(writer.finish(true)); + result.addAll(writer.finish(true)); } } return result; @@ -972,7 +972,7 @@ public class SSTableRewriterTest extends SchemaLoader liveDescriptors.add(sstable.descriptor.generation); spaceUsed += sstable.bytesOnDisk(); } - for (File dir : cfs.directories.getCFDirectories()) + for (File dir : cfs.getDirectories().getCFDirectories()) { for (File f : dir.listFiles()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 6de5bb9..89c0d61 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.*; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; @@ -164,7 +165,7 @@ public class SSTableUtils return this; } - public SSTableReader write(Set<String> keys) throws IOException + public Collection<SSTableReader> write(Set<String> keys) throws IOException { Map<String, PartitionUpdate> map = new HashMap<>(); for (String key : keys) @@ -176,7 +177,7 @@ public class SSTableUtils return write(map); } - public SSTableReader write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException + public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException { final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator(); return write(sorted.size(), new Appender() @@ -192,7 +193,7 @@ public class SSTableUtils }); } - public SSTableReader write(Map<String, PartitionUpdate> entries) throws IOException + public Collection<SSTableReader> write(Map<String, PartitionUpdate> entries) throws IOException { SortedMap<DecoratedKey, PartitionUpdate> sorted = new TreeMap<>(); for (Map.Entry<String, PartitionUpdate> entry : entries.entrySet()) @@ -201,18 +202,22 @@ public class SSTableUtils return write(sorted); } - public SSTableReader write(int expectedSize, Appender appender) throws IOException + public Collection<SSTableReader> write(int expectedSize, Appender appender) throws IOException { File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA)); - SerializationHeader header = SerializationHeader.make(Schema.instance.getCFMetaData(ksname, cfname), Collections.EMPTY_LIST); - SSTableTxnWriter writer = SSTableTxnWriter.create(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header); + CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname); + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId); + SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST); + SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header); while (appender.append(writer)) { /* pass */ } - SSTableReader reader = writer.finish(true); + Collection<SSTableReader> readers = writer.finish(true); + // mark all components for removal if (cleanup) - for (Component component : reader.components) - new File(reader.descriptor.filenameFor(component)).deleteOnExit(); - return reader; + for (SSTableReader reader: readers) + for (Component component : reader.components) + new File(reader.descriptor.filenameFor(component)).deleteOnExit(); + return readers; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/schema/DefsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java index c10865a..c3bcf1a 100644 --- a/test/unit/org/apache/cassandra/schema/DefsTest.java +++ b/test/unit/org/apache/cassandra/schema/DefsTest.java @@ -204,7 +204,7 @@ public class DefsTest ColumnFamilyStore store = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName); assertNotNull(store); store.forceBlockingFlush(); - assertTrue(store.directories.sstableLister(Directories.OnTxnErr.THROW).list().size() > 0); + assertTrue(store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().size() > 0); MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName); @@ -226,7 +226,7 @@ public class DefsTest // verify that the files are gone. Supplier<Object> lambda = () -> { - for (File file : store.directories.sstableLister(Directories.OnTxnErr.THROW).listFiles()) + for (File file : store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).listFiles()) { if (file.getPath().endsWith("Data.db") && !new File(file.getPath().replace("Data.db", "Compacted")).exists()) return false; @@ -275,7 +275,7 @@ public class DefsTest ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName); assertNotNull(cfs); cfs.forceBlockingFlush(); - assertTrue(!cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().isEmpty()); + assertTrue(!cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().isEmpty()); MigrationManager.announceKeyspaceDrop(ks.name);
