Repository: cassandra

Updated Branches:
  refs/heads/cassandra-3.0 d5326d4aa -> 0e63000c3
  refs/heads/trunk         7e4737716 -> c0fd119ce
Reduce heap spent when receiving many SSTables

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10797


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e63000c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e63000c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e63000c

Branch: refs/heads/cassandra-3.0
Commit: 0e63000c3fd0029e5b620a7923ea2ac54771e8e9
Parents: d5326d4
Author: Paulo Motta <[email protected]>
Authored: Fri Dec 18 20:33:21 2015 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Sat Dec 19 08:35:12 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  |  19 +-
 .../io/sstable/SSTableRewriterTest.java         | 165 +--------------
 .../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
 .../io/sstable/SSTableWriterTestBase.java       | 166 +++++++++++++++
 5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0f1613..ff139c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
  * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
  * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
  * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0230d14..92a14d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask
     private boolean done = false;
 
     // holds references to SSTables received
-    protected Collection<SSTableMultiWriter> sstables;
+    protected Collection<SSTableReader> sstables;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
     {
@@ -97,7 +97,10 @@ public class StreamReceiveTask extends StreamTask
 
         assert cfId.equals(sstable.getCfId());
 
-        sstables.add(sstable);
+        Collection<SSTableReader> finished = sstable.finish(true);
+        txn.update(finished, false);
+        sstables.addAll(finished);
+
         if (sstables.size() == totalFiles)
         {
             done = true;
@@ -134,7 +137,6 @@ public class StreamReceiveTask extends StreamTask
             if (kscf == null)
             {
                 // schema was dropped during streaming
-                task.sstables.forEach(SSTableMultiWriter::abortOrDie);
                 task.sstables.clear();
                 task.txn.abort();
                 task.session.taskCompleted(task);
@@ -143,15 +145,7 @@ public class StreamReceiveTask extends StreamTask
             cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
             hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
 
-            List<SSTableReader> readers = new ArrayList<>();
-            for (SSTableMultiWriter writer : task.sstables)
-            {
-                Collection<SSTableReader> newReaders = writer.finish(true);
-                readers.addAll(newReaders);
-                task.txn.update(newReaders, false);
-            }
-
-            task.sstables.clear();
+            Collection<SSTableReader> readers = task.sstables;
 
             try (Refs<SSTableReader> refs = Refs.ref(readers))
             {
@@ -245,7 +239,6 @@ public class StreamReceiveTask extends StreamTask
             return;
 
         done = true;
-        sstables.forEach(SSTableMultiWriter::abortOrDie);
        txn.abort();
        sstables.clear();
    }
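
The StreamReceiveTask change above is the heart of the patch: instead of keeping every received SSTableMultiWriter (and the buffers it holds) alive until the whole receive task completes, each writer is finished as soon as its file arrives, the resulting readers are handed to the task's LifecycleTransaction, and only the comparatively lightweight SSTableReader references stay on heap. A minimal sketch of that pattern follows; the wrapper class, constructor and comments are illustrative and not the exact StreamReceiveTask source.

    import java.util.ArrayList;
    import java.util.Collection;

    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.SSTableMultiWriter;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    // Illustrative sketch of the receive-side pattern introduced by this commit.
    class EarlyFinishReceiveSketch
    {
        private final LifecycleTransaction txn;
        private final Collection<SSTableReader> sstables = new ArrayList<>();
        private final int totalFiles;
        private boolean done = false;

        EarlyFinishReceiveSketch(LifecycleTransaction txn, int totalFiles)
        {
            this.txn = txn;
            this.totalFiles = totalFiles;
        }

        synchronized void received(SSTableMultiWriter sstable)
        {
            // finish(true) closes the writer and opens readers right away, so the
            // writer's per-file state can be released instead of living until the
            // whole task is done
            Collection<SSTableReader> finished = sstable.finish(true);

            // the transaction now tracks the new readers; aborting it later is enough
            // to clean up partially streamed files (no per-writer abortOrDie pass)
            txn.update(finished, false);
            sstables.addAll(finished);

            if (sstables.size() == totalFiles)
                done = true; // the real task then schedules its completion work
        }
    }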

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..008df06 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,48 +69,8 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
 
-public class SSTableRewriterTest extends SchemaLoader
+public class SSTableRewriterTest extends SSTableWriterTestBase
 {
-    private static final String KEYSPACE = "SSTableRewriterTest";
-    private static final String CF = "Standard1";
-
-    private static Config.DiskAccessMode standardMode;
-    private static Config.DiskAccessMode indexMode;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        if (FBUtilities.isWindows())
-        {
-            standardMode = DatabaseDescriptor.getDiskAccessMode();
-            indexMode = DatabaseDescriptor.getIndexAccessMode();
-
-            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
-            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
-        }
-
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
-    }
-
-    @AfterClass
-    public static void revertDiskAccess()
-    {
-        DatabaseDescriptor.setDiskAccessMode(standardMode);
-        DatabaseDescriptor.setIndexAccessMode(indexMode);
-    }
-
-    @After
-    public void truncateCF()
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
-        store.truncateBlocking();
-        LifecycleTransaction.waitForDeletions();
-    }
-
     @Test
     public void basicTest() throws InterruptedException
     {
@@ -239,56 +199,6 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testFileRemoval() throws InterruptedException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        truncate(cfs);
-
-        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
-        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
-        try (SSTableWriter writer = getWriter(cfs, dir, txn))
-        {
-            for (int i = 0; i < 10000; i++)
-            {
-                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
-                for (int j = 0; j < 100; j++)
-                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
-                writer.append(builder.build().unfilteredIterator());
-            }
-
-            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
-            assert s != null;
-            assertFileCounts(dir.list());
-            for (int i = 10000; i < 20000; i++)
-            {
-                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
-                for (int j = 0; j < 100; j++)
-                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
-                writer.append(builder.build().unfilteredIterator());
-            }
-            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
-            assertTrue(s.last.compareTo(s2.last) < 0);
-            assertFileCounts(dir.list());
-            s.selfRef().release();
-            s2.selfRef().release();
-            // These checks don't work on Windows because the writer has the channel still
-            // open till .abort() is called (via the builder)
-            if (!FBUtilities.isWindows())
-            {
-                LifecycleTransaction.waitForDeletions();
-                assertFileCounts(dir.list());
-            }
-            writer.abort();
-            txn.abort();
-            LifecycleTransaction.waitForDeletions();
-            int datafiles = assertFileCounts(dir.list());
-            assertEquals(datafiles, 0);
-            validateCFS(cfs);
-        }
-    }
-
-    @Test
     public void testNumberOfFilesAndSizes() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -919,16 +829,6 @@ public class SSTableRewriterTest extends SchemaLoader
         }
     }
 
-    public static void truncate(ColumnFamilyStore cfs)
-    {
-        cfs.truncateBlocking();
-        LifecycleTransaction.waitForDeletions();
-        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
-        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
-        validateCFS(cfs);
-    }
-
     public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
         return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
     }
@@ -959,67 +859,4 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         return result;
     }
-
-    public static void validateCFS(ColumnFamilyStore cfs)
-    {
-        Set<Integer> liveDescriptors = new HashSet<>();
-        long spaceUsed = 0;
-        for (SSTableReader sstable : cfs.getLiveSSTables())
-        {
-            assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.selfRef().globalCount());
-            liveDescriptors.add(sstable.descriptor.generation);
-            spaceUsed += sstable.bytesOnDisk();
-        }
-        for (File dir : cfs.getDirectories().getCFDirectories())
-        {
-            for (File f : dir.listFiles())
-            {
-                if (f.getName().contains("Data"))
-                {
-                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
-                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
-                }
-            }
-        }
-        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
-        assertTrue(cfs.getTracker().getCompacting().isEmpty());
-    }
-
-    public static int assertFileCounts(String [] files)
-    {
-        int tmplinkcount = 0;
-        int tmpcount = 0;
-        int datacount = 0;
-        for (String f : files)
-        {
-            if (f.endsWith("-CRC.db"))
-                continue;
-            if (f.contains("tmplink-"))
-                tmplinkcount++;
-            else if (f.contains("tmp-"))
-                tmpcount++;
-            else if (f.contains("Data"))
-                datacount++;
-        }
-        assertEquals(0, tmplinkcount);
-        assertEquals(0, tmpcount);
-        return datacount;
-    }
-
-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
-    {
-        String filename = cfs.getSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
-    }
-
-    public static ByteBuffer random(int i, int size)
-    {
-        byte[] bytes = new byte[size + 4];
-        ThreadLocalRandom.current().nextBytes(bytes);
-        ByteBuffer r = ByteBuffer.wrap(bytes);
-        r.putInt(0, i);
-        return r;
-    }
 }
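
With the shared fixture and helpers (KEYSPACE, CF, truncate, getWriter, random, assertFileCounts, validateCFS) pulled up into SSTableWriterTestBase, other writer-level tests can reuse them simply by extending the base class. A hypothetical example, not part of this patch, written against the same abort contract the new tests below exercise:

    package org.apache.cassandra.io.sstable;

    import java.io.File;
    import java.nio.ByteBuffer;

    import org.junit.Test;

    import org.apache.cassandra.UpdateBuilder;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.io.sstable.format.SSTableWriter;

    import static org.junit.Assert.assertEquals;

    // Hypothetical subclass showing how the helpers moved into SSTableWriterTestBase are reused.
    public class ExampleWriterTest extends SSTableWriterTestBase
    {
        @Test
        public void abortedTxnLeavesNoDataFiles() throws Exception
        {
            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF);
            truncate(cfs);                                         // base-class helper

            File dir = cfs.getDirectories().getDirectoryForNewSSTables();
            LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
            try (SSTableWriter writer = getWriter(cfs, dir, txn))  // base-class helper
            {
                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(0, 10)).withTimestamp(1);
                builder.newRow("0").add("val", ByteBuffer.allocate(100));
                writer.append(builder.build().unfilteredIterator());

                SSTableReader sstable = writer.finish(true);       // close the writer, open a reader
                txn.update(sstable, false);                        // the txn now tracks the new file
            }
            txn.abort();                                           // aborting deletes it again
            LifecycleTransaction.waitForDeletions();
            assertEquals(0, assertFileCounts(dir.list()));         // base-class helpers
            validateCFS(cfs);
        }
    }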

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
new file mode 100644
index 0000000..a73a164
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTest extends SSTableWriterTestBase
+{
+    @Test
+    public void testAbortTxnWithOpenEarlyShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+
+            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
+            assert s != null;
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
+            assertTrue(s.last.compareTo(s2.last) < 0);
+            assertFileCounts(dir.list());
+            s.selfRef().release();
+            s2.selfRef().release();
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 1);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            writer.abort();
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+    }
+
+
+    @Test
+    public void testAbortTxnWithClosedWriterShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader sstable = writer.finish(true);
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 1);
+
+            sstable.selfRef().release();
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+    }
+
+    @Test
+    public void testAbortTxnWithClosedAndOpenWriterShouldRemoveAllSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        SSTableWriter writer1 = getWriter(cfs, dir, txn);
+        SSTableWriter writer2 = getWriter(cfs, dir, txn);
+        try
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer1.append(builder.build().unfilteredIterator());
+            }
+
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer2.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader sstable = writer1.finish(true);
+            txn.update(sstable, false);
+
+            assertFileCounts(dir.list());
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 2);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+        finally
+        {
+            writer1.close();
+            writer2.close();
+        }
+    }
+}
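
Besides the plain finish-then-abort cases, testAbortTxnWithOpenEarlyShouldRemoveSSTable above pins down the early-open behaviour: openEarly() hands back a reader over the rows appended so far while the writer stays open, the caller owns that reference and must release it, and aborting both the writer and the transaction removes the partially written files. A condensed sketch of that contract, assuming a writer and transaction already set up as in the tests:

    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.io.sstable.format.SSTableWriter;

    // Illustrative helper, not part of the patch: peek at in-progress data, then abandon it.
    final class EarlyOpenAbortSketch
    {
        static void writeThenAbort(SSTableWriter writer, LifecycleTransaction txn)
        {
            try (SSTableWriter w = writer)
            {
                // ... append partitions to w here ...

                SSTableReader early = w.setMaxDataAge(1000).openEarly();
                if (early != null)
                {
                    // read from `early` while the writer could keep appending ...
                    early.selfRef().release();   // the caller owns this reference
                }

                w.abort();                       // abandon the writer's output
                txn.abort();                     // and anything the transaction tracked
                LifecycleTransaction.waitForDeletions();
            }
        }
    }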

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
new file mode 100644
index 0000000..0af743d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTestBase extends SchemaLoader
+{
+
+    protected static final String KEYSPACE = "SSTableRewriterTest";
+    protected static final String CF = "Standard1";
+
+    private static Config.DiskAccessMode standardMode;
+    private static Config.DiskAccessMode indexMode;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        if (FBUtilities.isWindows())
+        {
+            standardMode = DatabaseDescriptor.getDiskAccessMode();
+            indexMode = DatabaseDescriptor.getIndexAccessMode();
+
+            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+        }
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
+    }
+
+    @AfterClass
+    public static void revertDiskAccess()
+    {
+        DatabaseDescriptor.setDiskAccessMode(standardMode);
+        DatabaseDescriptor.setIndexAccessMode(indexMode);
+    }
+
+    @After
+    public void truncateCF()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    public static void truncate(ColumnFamilyStore cfs)
+    {
+        cfs.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+        validateCFS(cfs);
+    }
+
+    public static void validateCFS(ColumnFamilyStore cfs)
+    {
+        Set<Integer> liveDescriptors = new HashSet<>();
+        long spaceUsed = 0;
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.selfRef().globalCount());
+            liveDescriptors.add(sstable.descriptor.generation);
+            spaceUsed += sstable.bytesOnDisk();
+        }
+        for (File dir : cfs.getDirectories().getCFDirectories())
+        {
+            for (File f : dir.listFiles())
+            {
+                if (f.getName().contains("Data"))
+                {
+                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+                }
+            }
+        }
+        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+    }
+
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    {
+        String filename = cfs.getSSTablePath(directory);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+    }
+
+    public static ByteBuffer random(int i, int size)
+    {
+        byte[] bytes = new byte[size + 4];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        ByteBuffer r = ByteBuffer.wrap(bytes);
+        r.putInt(0, i);
+        return r;
+    }
+
+    public static int assertFileCounts(String [] files)
+    {
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        int datacount = 0;
+        for (String f : files)
+        {
+            if (f.endsWith("-CRC.db"))
+                continue;
+            if (f.contains("tmplink-"))
+                tmplinkcount++;
+            else if (f.contains("tmp-"))
+                tmpcount++;
+            else if (f.contains("Data"))
+                datacount++;
+        }
+        assertEquals(0, tmplinkcount);
+        assertEquals(0, tmpcount);
+        return datacount;
+    }
+}
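
One small detail of the shared helpers worth spelling out: random(i, size) does not build a fully random key. It allocates size + 4 bytes of random data and then overwrites the first four bytes with the index i, so keys are unique per index while the tail stays random. A tiny, hypothetical demo of that layout (not part of the patch):

    package org.apache.cassandra.io.sstable;

    import java.nio.ByteBuffer;

    // Hypothetical demo of the key layout from SSTableWriterTestBase.random(i, size).
    public class RandomKeyLayoutDemo
    {
        public static void main(String[] args)
        {
            ByteBuffer key = SSTableWriterTestBase.random(42, 10);
            System.out.println(key.capacity()); // 14: 4-byte index + 10 random padding bytes
            System.out.println(key.getInt(0));  // 42: the index overwrites the first 4 bytes
        }
    }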
