This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c9e4027 Add compaction allocation measurement test c9e4027 is described below commit c9e40277f0389a773b21d4310a85a8c761932d61 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Tue Oct 15 13:04:26 2019 -0700 Add compaction allocation measurement test Patch by Blake Eggleston, reviewed by Benedict Elliott Smith and David Capwell for CASSANDRA-15388 --- CHANGES.txt | 1 + build.xml | 13 + ide/idea-iml-file.xml | 1 + .../db/compaction/CompactionAllocationTest.java | 767 +++++++++++++++++++++ 4 files changed, 782 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index d863df2..c6a2eac 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * Add compaction allocation measurement test (CASSANDRA-15388) * Added UnleveledSSTables global and table level metric (CASSANDRA-15620) * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616) * Add data modeling introduction (CASSANDRA-15481) diff --git a/build.xml b/build.xml index 9c4f5b0..bd1f557 100644 --- a/build.xml +++ b/build.xml @@ -60,6 +60,7 @@ <property name="test.unit.src" value="${test.dir}/unit"/> <property name="test.long.src" value="${test.dir}/long"/> <property name="test.burn.src" value="${test.dir}/burn"/> + <property name="test.memory.src" value="${test.dir}/memory"/> <property name="test.microbench.src" value="${test.dir}/microbench"/> <property name="test.distributed.src" value="${test.dir}/distributed"/> <property name="test.compression_algo" value="LZ4"/> @@ -96,6 +97,7 @@ <property name="maven-repository-id" value="apache.snapshots.https"/> <property name="test.timeout" value="240000" /> + <property name="test.memory.timeout" value="480000" /> <property name="test.long.timeout" value="600000" /> <property name="test.burn.timeout" value="60000000" /> <property name="test.distributed.timeout" value="360000" /> @@ -118,6 +120,7 @@ <property name="ecj.version" value="4.6.1"/> 
<property name="ohc.version" value="0.5.1"/> <property name="asm.version" value="7.1"/> + <property name="allocation-instrumenter.version" value="3.1.0"/> <!-- https://mvnrepository.com/artifact/net.openhft/chronicle-bom/1.16.23 --> <property name="chronicle-queue.version" value="4.16.3" /> @@ -539,6 +542,7 @@ <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/> <dependency groupId="junit" artifactId="junit" version="4.12" /> <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> @@ -672,6 +676,7 @@ version="${version}"/> <dependency groupId="junit" artifactId="junit"/> <dependency groupId="org.quicktheories" artifactId="quicktheories" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" /> <dependency groupId="org.apache.rat" artifactId="apache-rat"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/> @@ -1324,6 +1329,7 @@ <src path="${test.unit.src}"/> <src path="${test.long.src}"/> <src path="${test.burn.src}"/> + <src path="${test.memory.src}"/> <src path="${test.microbench.src}"/> <src path="${test.distributed.src}"/> </javac> @@ -1620,6 +1626,13 @@ </testmacro> </target> + <target name="test-memory" depends="build-test" description="Execute memory allocation tests (runs with the allocation instrumenter agent)"> + <testmacro inputdir="${test.memory.src}" + timeout="${test.memory.timeout}"> + <jvmarg value="-javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar"/> + </testmacro> + </target> + <target name="cql-test" 
depends="build-test" description="Execute CQL tests"> <sequential> <echo message="running CQL tests"/> diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml index 17d4c5b..86827c3 100644 --- a/ide/idea-iml-file.xml +++ b/ide/idea-iml-file.xml @@ -32,6 +32,7 @@ <sourceFolder url="file://$MODULE_DIR$/tools/fqltool/test/unit" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/unit" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" /> + <sourceFolder url="file://$MODULE_DIR$/test/memory" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" /> diff --git a/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java b/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java new file mode 100644 index 0000000..a58303d --- /dev/null +++ b/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java @@ -0,0 +1,767 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.monitoring.runtime.instrumentation.AllocationRecorder; +import com.google.monitoring.runtime.instrumentation.Sampler; +import com.sun.management.ThreadMXBean; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.ReadQuery; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import 
org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.ObjectSizes; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class CompactionAllocationTest +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionAllocationTest.class); + private static final ThreadMXBean threadMX = (ThreadMXBean) ManagementFactory.getThreadMXBean(); + + private static final boolean AGENT_MEASUREMENT = true; + + private static final boolean PROFILING_READS = false; + private static final boolean PROFILING_COMPACTION = false; + private static final boolean PROFILING = PROFILING_READS || PROFILING_COMPACTION; + private static final List<String> summaries = new ArrayList<>(); + + private static class CompactionSummary + { + final Measurement measurement; + final int numPartitions; + final int numRows; + + public CompactionSummary(Measurement measurement, int numPartitions, int numRows) + { + this.measurement = measurement; + this.numPartitions = numPartitions; + this.numRows = numRows; + } + + List<String> cells() + { + long b = measurement.bytes(); + return Lists.newArrayList(Long.toString(b), Long.toString(b/numPartitions), Long.toString(b/numRows)); + } + + static final List<String> HEADERS = Lists.newArrayList("bytes", "/p", "/r"); + static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a", "n/a"); + } + + private static class ReadSummary + { + final Measurement measurement; + final int numReads; + + public ReadSummary(Measurement measurement, int numReads) + { + this.measurement = measurement; + this.numReads = numReads; + } + + List<String> cells() + { + long b = measurement.bytes(); + return Lists.newArrayList(Long.toString(b), Long.toString(b/numReads)); + } + static final List<String> HEADERS = Lists.newArrayList("bytes", "/rd"); + static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a"); + } + + private static final 
Map<String, CompactionSummary> compactionSummaries = new HashMap<>(); + private static final Map<String, ReadSummary> readSummaries = new HashMap<>(); + + /* + add to jvm args: + -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar + */ + + private static final long MIN_OBJECTS_ALLOCATED; + private static final long MIN_BYTES_ALLOCATED; + + static + { + if (AGENT_MEASUREMENT) + { + AgentMeasurement measurement = new AgentMeasurement(); + measurement.start(); + measurement.stop(); + MIN_OBJECTS_ALLOCATED = measurement.objectsAllocated; + MIN_BYTES_ALLOCATED = measurement.bytesAllocated; + } + else + { + MIN_OBJECTS_ALLOCATED = 0; + MIN_BYTES_ALLOCATED = 0; + logger.warn("{} is using the ThreadMXBean to measure memory usage, this is less accurate than the allocation instrumenter agent", CompactionAllocationTest.class.getSimpleName()); + logger.warn("If you're running this in your IDE, add the following jvm arg: " + + "-javaagent:<build.dir>/lib/jars/java-allocation-instrumenter-<allocation-instrumenter.version>.jar " + + "(and replace <> with appropriate values from build.xml)"); + } + } + + @BeforeClass + public static void setupClass() throws Throwable + { + SchemaLoader.prepareServer(); + SchemaLoader.startGossiper(); + testTinyPartitions("warmup", 9, maybeInflate(300), true); + } + + @AfterClass + public static void afterClass() + { + + logger.info("SUMMARIES:"); + for (String summary : summaries) + logger.info(summary); + + + List<List<String>> groups = new ArrayList<>(); + groups.add(Lists.newArrayList("tinyNonOverlapping3", + "tinyNonOverlapping9", + "tinyOverlapping3", + "tinyOverlapping9")); + groups.add(Lists.newArrayList("mediumNonOverlappingPartitions3", + "mediumNonOverlappingPartitions9", + "mediumOverlappingPartitions3", + "mediumOverlappingPartitions9", + "mediumPartitionsOverlappingRows3", + "mediumPartitionsOverlappingRows9")); + groups.add(Lists.newArrayList("wideNonOverlappingPartitions3", + 
"wideNonOverlappingPartitions9", + "wideOverlappingPartitions3", + "wideOverlappingPartitions9", + "widePartitionsOverlappingRows9", + "widePartitionsOverlappingRows3")); + + Map<String, List<String>> fullRows = new HashMap<>(); + for (String workload : Iterables.concat(groups)) + { + CompactionSummary cs = compactionSummaries.get(workload); + ReadSummary rs = readSummaries.get(workload); + fullRows.put(workload, Lists.newArrayList(Iterables.concat(cs != null ? cs.cells() : CompactionSummary.EMPTY, + rs != null ? rs.cells() : ReadSummary.EMPTY))); + } + logger.info(""); + logger.info("TAB DELIMITED:"); + String header = Joiner.on('\t').join(Iterables.concat(CompactionSummary.HEADERS, ReadSummary.HEADERS)); + for (List<String> group: groups) + { + logger.info(Joiner.on('\t').join(group)); + logger.info(header); + logger.info(Joiner.on('\t').join(Iterables.concat(Iterables.transform(group, g -> fullRows.getOrDefault(g, Collections.emptyList()))))); + } + } + + private static int maybeInflate(int base, int inflate) + { + return PROFILING ? base * inflate : base; + } + + private static int maybeInflate(int base) + { + return maybeInflate(base, 3); + } + + private interface Workload + { + void setup(); + ColumnFamilyStore getCfs(); + String name(); + List<Runnable> getReads(); + } + + private static Measurement createMeasurement() + { + return AGENT_MEASUREMENT ? 
new AgentMeasurement() : new MXMeasurement(); + } + + private interface Measurement + { + void start(); + + void stop(); + + long cpu(); + + long bytes(); + + long objects(); + + default String prettyBytes() + { + return FBUtilities.prettyPrintMemory(bytes()); + } + + } + + public static class AgentMeasurement implements Measurement, Sampler + { + long objectsAllocated = 0; + long bytesAllocated = 0; + + private final long threadID = Thread.currentThread().getId(); + + public void sampleAllocation(int count, String desc, Object newObj, long bytes) + { + if (Thread.currentThread().getId() != threadID) + return; + objectsAllocated++; + bytesAllocated += bytes; + } + + public void start() + { + AllocationRecorder.addSampler(this); + } + + public void stop() + { + AllocationRecorder.removeSampler(this); + if (bytesAllocated == 0) + logger.warn("no allocations recorded, make sure junit is run with -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar"); + } + + public long cpu() + { + return 0; + } + + public long objects() + { + return objectsAllocated - MIN_OBJECTS_ALLOCATED; + } + + public long bytes() + { + return bytesAllocated - MIN_BYTES_ALLOCATED; + } + } + + public static class MXMeasurement implements Measurement + { + private final Thread thread = Thread.currentThread(); + + private class Point + { + long bytes; + long cpu; + + void capture() + { + bytes = threadMX.getThreadAllocatedBytes(thread.getId()); + cpu = threadMX.getThreadCpuTime(thread.getId()); + } + } + + private final Point start = new Point(); + private final Point stop = new Point(); + + public void start() + { + start.capture(); + } + + public void stop() + { + stop.capture(); + } + + public long cpu() + { + return stop.cpu - start.cpu; + } + + public long bytes() + { + return stop.bytes - start.bytes; + } + + public long objects() + { + return 0; + } + } + + @Test + public void allocMeasuring() + { + long size = ObjectSizes.measure(5); + int 
numAlloc = 1000; + + Measurement measurement = createMeasurement(); + measurement.start(); + for (int i=0; i<numAlloc; i++) + new Integer(i); + + measurement.stop(); + logger.info(" ** {}", measurement.prettyBytes()); + logger.info(" ** expected {}", size * numAlloc); + } + + private static void measure(Workload workload) throws Throwable + { + workload.setup(); + + Measurement readSampler = createMeasurement(); + Measurement compactionSampler = createMeasurement(); + + String readSummary = "SKIPPED"; + if (!PROFILING_COMPACTION) + { + List<Runnable> reads = workload.getReads(); + readSampler.start(); + if (PROFILING_READS && !workload.name().equals("warmup")) + { + logger.info(">>> Start profiling"); + Thread.sleep(10000); + } + for (int i=0; i<reads.size(); i++) + reads.get(i).run(); + Thread.sleep(1000); + if (PROFILING_READS && !workload.name().equals("warmup")) + { + logger.info(">>> Stop profiling"); + Thread.sleep(10000); + } + readSampler.stop(); + + readSummary = String.format("%s bytes, %s /read, %s cpu", readSampler.bytes(), readSampler.bytes()/reads.size(), readSampler.cpu()); + readSummaries.put(workload.name(), new ReadSummary(readSampler, reads.size())); + } + + ColumnFamilyStore cfs = workload.getCfs(); + ActiveCompactions active = new ActiveCompactions(); + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + + CompactionTasks tasks = cfs.getCompactionStrategyManager() + .getUserDefinedTasks(sstables, FBUtilities.nowInSeconds()); + Assert.assertFalse(tasks.isEmpty()); + + String compactionSummary = "SKIPPED"; + if (!PROFILING_READS) + { + compactionSampler.start(); + if (PROFILING_COMPACTION && !workload.name().equals("warmup")) + { + logger.info(">>> Start profiling"); + Thread.sleep(10000); + } + for (AbstractCompactionTask task : tasks) + task.execute(active); + Thread.sleep(1000); + if (PROFILING_COMPACTION && !workload.name().equals("warmup")) + { + logger.info(">>> Stop profiling"); + Thread.sleep(10000); + } + compactionSampler.stop(); + + 
Assert.assertEquals(1, cfs.getLiveSSTables().size()); + int numPartitions = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().estimatedPartitionSize.count()); + int numRows = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().totalRows); + + compactionSummary = String.format("%s bytes, %s /partition, %s /row, %s cpu", compactionSampler.bytes(), compactionSampler.bytes()/numPartitions, compactionSampler.bytes()/numRows, compactionSampler.cpu()); + compactionSummaries.put(workload.name(), new CompactionSummary(compactionSampler, numPartitions, numRows)); + } + + cfs.truncateBlocking(); + + logger.info("***"); + logger.info("*** {} reads summary", workload.name()); + logger.info(readSummary); + logger.info("*** {} compaction summary", workload.name()); + logger.info(compactionSummary); + if (!workload.name().equals("warmup")) + { + summaries.add(workload.name() + " reads summary: " + readSummary); + summaries.add(workload.name() + " compaction summary: " + compactionSummary); + } + Thread.sleep(1000); // avoid losing report when running in IDE + } + + private static final DataOutputPlus NOOP_OUT = new UnbufferedDataOutputStreamPlus() + { + public void write(byte[] buffer, int offset, int count) throws IOException {} + + public void write(int oneByte) throws IOException {} + }; + + private static void runQuery(ReadQuery query, TableMetadata metadata) + { + try (ReadExecutionController executionController = query.executionController(); + UnfilteredPartitionIterator partitions = query.executeLocally(executionController)) + { + UnfilteredPartitionIterators.serializerForIntraNode().serialize(partitions, ColumnFilter.all(metadata), NOOP_OUT, MessagingService.current_version); + } + catch (IOException e) + { + throw new AssertionError(e); + } + } + + private static void testTinyPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap) throws Throwable + { + String ksname = "ks_" + 
name.toLowerCase(); + + SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1), + CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", ksname).build()); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id); + Assert.assertNotNull(cfs); + cfs.disableAutoCompaction(); + List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions)); + + measure(new Workload() + { + public void setup() + { + cfs.disableAutoCompaction(); + String insert = String.format("INSERT INTO %s.%s (k, v) VALUES (?,?)", ksname, "tbl"); + String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl"); + SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls()); + QueryState queryState = QueryState.forInternalCalls(); + for (int f=0; f<numSSTable; f++) + { + for (int p = 0; p < sstablePartitions; p++) + { + int key = overlap ? 
p : (f * sstablePartitions) + p; + QueryProcessor.executeInternal(insert, key, key); + if (!overlap || f == 0) + { + QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key}); /* bind the partition key inserted above (not the sstable index f) so every generated read targets its own partition, as the medium and wide workloads do */ + ReadQuery query = select.getQuery(options, queryState.getNowInSeconds()); + reads.add(() -> runQuery(query, cfs.metadata.get())); + } + } + cfs.forceBlockingFlush(); + } + + Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size()); + } + + public List<Runnable> getReads() + { + return reads; + } + + public ColumnFamilyStore getCfs() + { + return cfs; + } + + public String name() + { + return name; + } + }); + } + + @Test + public void tinyNonOverlapping3() throws Throwable + { + testTinyPartitions("tinyNonOverlapping3", 3, maybeInflate(900, 6), false); + } + + @Test + public void tinyNonOverlapping9() throws Throwable + { + testTinyPartitions("tinyNonOverlapping9", 9, maybeInflate(300, 6), false); + } + + @Test + public void tinyOverlapping3() throws Throwable + { + testTinyPartitions("tinyOverlapping3", 3, maybeInflate(900, 6), true); + } + + @Test + public void tinyOverlapping9() throws Throwable + { + testTinyPartitions("tinyOverlapping9", 9, maybeInflate(300, 6), true); + } + + private static final Random globalRandom = new Random(); + private static final Random localRandom = new Random(); + + public static String makeRandomString(int length) + { + return makeRandomString(length, -1); + + } + + public static String makeRandomString(int length, int seed) + { + Random r; + if (seed < 0) + { + r = globalRandom; + } + else + { + r = localRandom; + r.setSeed(seed); + } + + char[] chars = new char[length]; + for (int i = 0; i < length; ++i) + chars[i] = (char) ('a' + r.nextInt('z' - 'a' + 1)); + return new String(chars); + } + + private static void testMediumPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable + { + String ksname = "ks_" + name.toLowerCase(); + + SchemaLoader.createKeyspace(ksname, 
KeyspaceParams.simple(1), + CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build()); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id); + Assert.assertNotNull(cfs); + cfs.disableAutoCompaction(); + int rowsPerPartition = 200; + List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions)); + measure(new Workload() + { + public void setup() + { + cfs.disableAutoCompaction(); + String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl"); + String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl"); + SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls()); + QueryState queryState = QueryState.forInternalCalls(); + for (int f=0; f<numSSTable; f++) + { + for (int p = 0; p < sstablePartitions; p++) + { + String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p); + for (int r=0; r<rowsPerPartition; r++) + { + QueryProcessor.executeInternal(insert, key, makeRandomString(6, overlapCK ? 
r : -1), + makeRandomString(8), makeRandomString(8), + makeRandomString(8), makeRandomString(8)); + + } + if (!overlap || f == 0) + { + QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key}); + ReadQuery query = select.getQuery(options, queryState.getNowInSeconds()); + reads.add(() -> runQuery(query, cfs.metadata.get())); + } + } + cfs.forceBlockingFlush(); + } + + Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size()); + } + + public ColumnFamilyStore getCfs() + { + return cfs; + } + + public List<Runnable> getReads() + { + return reads; + } + + public String name() + { + return name; + } + }); + } + + @Test + public void mediumNonOverlappingPartitions3() throws Throwable + { + testMediumPartitions("mediumNonOverlappingPartitions3", 3, maybeInflate(60), false, false); + } + + @Test + public void mediumNonOverlappingPartitions9() throws Throwable + { + testMediumPartitions("mediumNonOverlappingPartitions9", 9, maybeInflate(20), false, false); + } + + @Test + public void mediumOverlappingPartitions3() throws Throwable + { + testMediumPartitions("mediumOverlappingPartitions3", 3, maybeInflate(60), true, false); + } + + @Test + public void mediumOverlappingPartitions9() throws Throwable + { + testMediumPartitions("mediumOverlappingPartitions9", 9, maybeInflate(20), true, false); + } + + @Test + public void mediumPartitionsOverlappingRows3() throws Throwable + { + testMediumPartitions("mediumPartitionsOverlappingRows3", 3, maybeInflate(60), true, true); + } + + @Test + public void mediumPartitionsOverlappingRows9() throws Throwable + { + testMediumPartitions("mediumPartitionsOverlappingRows9", 9, maybeInflate(20), true, true); + } + + private static void testWidePartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable + { + String ksname = "ks_" + name.toLowerCase(); + + SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1), + CreateTableStatement.parse("CREATE TABLE 
tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build()); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id); + Assert.assertNotNull(cfs); + cfs.disableAutoCompaction(); + int rowWidth = 100; + int rowsPerPartition = 1000; + List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions)); + + measure(new Workload() + { + public void setup() + { + cfs.disableAutoCompaction(); + String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl"); + String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl"); + SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls()); + QueryState queryState = QueryState.forInternalCalls(); + for (int f=0; f<numSSTable; f++) + { + for (int p = 0; p < sstablePartitions; p++) + { + String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p); + for (int r=0; r<rowsPerPartition; r++) + { + QueryProcessor.executeInternal(insert , key, makeRandomString(6, overlapCK ? 
r : -1), + makeRandomString(rowWidth>>2), makeRandomString(rowWidth>>2), + makeRandomString(rowWidth>>2), makeRandomString(rowWidth>>2)); + } + if (!overlap || f == 0) + { + QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key}); + ReadQuery query = select.getQuery(options, queryState.getNowInSeconds()); + reads.add(() -> runQuery(query, cfs.metadata.get())); + } + } + cfs.forceBlockingFlush(); + } + + Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size()); + } + + public ColumnFamilyStore getCfs() + { + return cfs; + } + + public List<Runnable> getReads() + { + return reads; + } + + public String name() + { + return name; + } + }); + } + + @Test + public void wideNonOverlappingPartitions3() throws Throwable + { + testWidePartitions("wideNonOverlappingPartitions3", 3, maybeInflate(24), false, false); + } + + @Test + public void wideNonOverlappingPartitions9() throws Throwable + { + testWidePartitions("wideNonOverlappingPartitions9", 9, maybeInflate(8), false, false); + } + + @Test + public void wideOverlappingPartitions3() throws Throwable + { + testWidePartitions("wideOverlappingPartitions3", 3, maybeInflate(24), true, false); + } + + @Test + public void wideOverlappingPartitions9() throws Throwable + { + testWidePartitions("wideOverlappingPartitions9", 9, maybeInflate(8), true, false); + } + + @Test + public void widePartitionsOverlappingRows9() throws Throwable + { + testWidePartitions("widePartitionsOverlappingRows9", 9, maybeInflate(8), true, true); + } + + @Test + public void widePartitionsOverlappingRows3() throws Throwable + { + testWidePartitions("widePartitionsOverlappingRows3", 3, maybeInflate(24), true, true); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org