Repository: cassandra Updated Branches: refs/heads/trunk 3f49c328f -> 0026e4eee
Put CQLSSTableWriter back to the old interface/behavior before CASSANDRA-11844 Patch by Jeremiah Jordan; reviewed by Jake Luciani for CASSANDRA-12551 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0026e4ee Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0026e4ee Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0026e4ee Branch: refs/heads/trunk Commit: 0026e4eeec23367c74c44b23a9586562b939f6f8 Parents: 3f49c32 Author: Jeremiah D Jordan <[email protected]> Authored: Fri Sep 2 10:06:39 2016 -0500 Committer: T Jake Luciani <[email protected]> Committed: Mon Sep 12 11:48:45 2016 -0400 ---------------------------------------------------------------------- .../hadoop/cql3/CqlBulkRecordWriter.java | 4 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 29 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 148 ++-- .../cassandra/io/sstable/SSTableLoader.java | 16 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 12 +- .../io/sstable/SSTableSimpleWriter.java | 8 +- .../cassandra/io/sstable/SSTableTxnWriter.java | 10 +- .../cassandra/streaming/LongStreamingTest.java | 7 +- .../db/lifecycle/RealTransactionsTest.java | 4 +- .../io/sstable/CQLSSTableWriterClientTest.java | 16 +- .../io/sstable/CQLSSTableWriterTest.java | 37 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 38 +- .../io/sstable/StressCQLSSTableWriter.java | 672 +++++++++++++++++++ .../cassandra/stress/CompactionStress.java | 20 +- .../apache/cassandra/stress/StressProfile.java | 2 +- .../operations/userdefined/SchemaInsert.java | 20 +- 16 files changed, 813 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index bd157e9..2ed37ee 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@ -75,7 +75,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> protected final Configuration conf; protected final int maxFailures; protected final int bufferSize; - protected CQLSSTableWriter writer; + protected Closeable writer; protected SSTableLoader loader; protected Progressable progress; protected TaskAttemptContext context; @@ -174,7 +174,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> ExternalClient externalClient = new ExternalClient(conf); externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace)); - loader = new SSTableLoader(writer.getInnermostDirectory(), externalClient, new NullOutputHandler()) + loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler()) { @Override public void onSuccess(StreamState finalState) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index f989878..9a8f968 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -22,13 +22,15 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.Closeable; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; 
+import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.Pair; @@ -38,17 +40,17 @@ import org.apache.cassandra.utils.Pair; */ abstract class AbstractSSTableSimpleWriter implements Closeable { - protected final ColumnFamilyStore cfs; - protected final IPartitioner partitioner; + protected final File directory; + protected final CFMetaData metadata; protected final PartitionColumns columns; protected SSTableFormat.Type formatType = SSTableFormat.Type.current(); protected static AtomicInteger generation = new AtomicInteger(0); protected boolean makeRangeAware = false; - protected AbstractSSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns) + protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns) { - this.cfs = cfs; - this.partitioner = partitioner; + this.metadata = metadata; + this.directory = directory; this.columns = columns; } @@ -65,17 +67,18 @@ abstract class AbstractSSTableSimpleWriter implements Closeable protected SSTableTxnWriter createWriter() { - SerializationHeader header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS); + SerializationHeader header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS); if (makeRangeAware) - return SSTableTxnWriter.createRangeAware(cfs, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header); + return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header); - return SSTableTxnWriter.create(cfs, - createDescriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata.ksName, 
cfs.metadata.cfName, formatType), + return SSTableTxnWriter.create(metadata, + createDescriptor(directory, metadata.ksName, metadata.cfName, formatType), 0, ActiveRepairService.UNREPAIRED_SSTABLE, 0, - header); + header, + Collections.emptySet()); } private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt) @@ -115,7 +118,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException { - return getUpdateFor(partitioner.decorateKey(key)); + return getUpdateFor(metadata.decorateKey(key)); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index e668b75..8a9d01d 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -21,14 +21,17 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; import java.util.stream.Collectors; import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.TypeCodec; -import com.sun.org.apache.xpath.internal.operations.Bool; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.ColumnSpecification; @@ -40,7 +43,8 @@ import org.apache.cassandra.cql3.statements.CreateTableStatement; import 
org.apache.cassandra.cql3.statements.CreateTypeStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.cql3.statements.UpdateStatement; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; @@ -49,12 +53,10 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; @@ -331,30 +333,14 @@ public class CQLSSTableWriter implements Closeable return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED); } /** - * The writer loads data in directories corresponding to how they laid out on the server. - * <p> - * {keyspace}/{table-cfid}/ - * - * This method can be used to fetch the innermost directory with the sstable components - * @return The directory containing the sstable components - */ - public File getInnermostDirectory() - { - return writer.cfs.getDirectories().getDirectoryForNewSSTables(); - } - - /** * A Builder for a CQLSSTableWriter object. 
*/ public static class Builder { - private final List<File> directoryList; - private ColumnFamilyStore cfs; + private File directory; protected SSTableFormat.Type formatType = null; - private Boolean makeRangeAware = false; - private CreateTableStatement.RawStatement schemaStatement; private final List<CreateTypeStatement> typeStatements; private UpdateStatement.ParsedInsert insertStatement; @@ -363,10 +349,8 @@ public class CQLSSTableWriter implements Closeable private boolean sorted = false; private long bufferSizeInMB = 128; - protected Builder() - { + protected Builder() { this.typeStatements = new ArrayList<>(); - this.directoryList = new ArrayList<>(); } /** @@ -389,7 +373,7 @@ public class CQLSSTableWriter implements Closeable * <p> * This is a mandatory option. * - * @param directory the directory to use, which should exist and be writable. + * @param directory the directory to use, which should exists and be writable. * @return this builder. * * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable. @@ -401,29 +385,10 @@ public class CQLSSTableWriter implements Closeable if (!directory.canWrite()) throw new IllegalArgumentException(directory + " exists but is not writable"); - directoryList.add(directory); + this.directory = directory; return this; } - /** - * A pre-instanciated ColumnFamilyStore - * <p> - * This is can be used in place of inDirectory and forTable - * - * @see #inDirectory(File) - * - * @param cfs the list of directories to use, which should exist and be writable. - * @return this builder. - * - * @throws IllegalArgumentException if a directory doesn't exist or is not writable. 
- */ - public Builder withCfs(ColumnFamilyStore cfs) - { - this.cfs = cfs; - return this; - } - - public Builder withType(String typeDefinition) throws SyntaxException { typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE")); @@ -466,20 +431,6 @@ public class CQLSSTableWriter implements Closeable return this; } - - /** - * Specify if the sstable writer should be vnode range aware. - * This will create a sstable per vnode range. - * - * @param makeRangeAware - * @return - */ - public Builder rangeAware(boolean makeRangeAware) - { - this.makeRangeAware = makeRangeAware; - return this; - } - /** * The INSERT statement defining the order of the values to add for a given CQL row. * <p> @@ -548,36 +499,36 @@ public class CQLSSTableWriter implements Closeable @SuppressWarnings("resource") public CQLSSTableWriter build() { - if (directoryList.isEmpty() && cfs == null) - throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()"); - if (schemaStatement == null && cfs == null) + if (directory == null) + throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()"); + if (schemaStatement == null) throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); if (insertStatement == null) throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); synchronized (CQLSSTableWriter.class) { - if (cfs == null) - cfs = createOfflineTable(schemaStatement, typeStatements, directoryList); + String keyspace = schemaStatement.keyspace(); - if (partitioner == null) - partitioner = cfs.getPartitioner(); + if (Schema.instance.getKSMetaData(keyspace) == null) + Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); + createTypes(keyspace); + CFMetaData cfMetaData = createTable(keyspace); 
Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert(); + AbstractSSTableSimpleWriter writer = sorted - ? new SSTableSimpleWriter(cfs, partitioner, preparedInsert.left.updatedColumns()) - : new SSTableSimpleUnsortedWriter(cfs, partitioner, preparedInsert.left.updatedColumns(), bufferSizeInMB); + ? new SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns()) + : new SSTableSimpleUnsortedWriter(directory, cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB); if (formatType != null) writer.setSSTableFormatType(formatType); - writer.setRangeAwareWriting(makeRangeAware); - return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right); } } - private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements) + private void createTypes(String keyspace) { KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); Types.RawBuilder builder = Types.rawBuilder(keyspace); @@ -587,50 +538,31 @@ public class CQLSSTableWriter implements Closeable ksm = ksm.withSwapped(builder.build()); Schema.instance.setKeyspaceMetadata(ksm); } - - public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList) - { - return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList); - } - /** * Creates the table according to schema statement - * with specified data directories + * + * @param keyspace name of the keyspace where table should be created */ - public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList) + private CFMetaData createTable(String keyspace) { - String keyspace = schemaStatement.keyspace(); - - if (Schema.instance.getKSMetaData(keyspace) == null) - Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); - - 
createTypes(keyspace, typeStatements); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily()); - assert cfMetaData == null; - - CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement; - statement.validate(ClientState.forInternalCalls()); - - //Build metatdata with a portable cfId - cfMetaData = statement.metadataBuilder() - .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily())) - .build() - .params(statement.params()); - - Keyspace.setInitialized(); - Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList())); + if (cfMetaData == null) + { + CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement; + statement.validate(ClientState.forInternalCalls()); - Keyspace ks = Keyspace.openWithoutSSTables(keyspace); - ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true); + cfMetaData = statement.getCFMetaData(); - ks.initCfCustom(cfs); - Schema.instance.load(cfs.metadata); - Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata))); + Schema.instance.load(cfMetaData); + Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData))); + } - return cfs; + if (partitioner != null) + return cfMetaData.copy(partitioner); + else + return cfMetaData; } /** @@ -655,7 +587,7 @@ public class CQLSSTableWriter implements Closeable } } - public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type) + private static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 15dd925..043f6fa 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -56,25 +56,13 @@ public class SSTableLoader implements StreamEventHandler public SSTableLoader(File directory, Client client, OutputHandler outputHandler) { - this(directory, client, outputHandler, directory.getParentFile().getName(), 1); + this(directory, client, outputHandler, 1); } - public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost) { - this(directory, client, outputHandler, directory.getParentFile().getName(), connectionsPerHost); - } - - public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace) - { - this(directory, client, outputHandler, keyspace, 1); - } - - - public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace, int connectionsPerHost) - { this.directory = directory; - this.keyspace = keyspace; + this.keyspace = directory.getParentFile().getName(); this.client = client; this.outputHandler = outputHandler; this.connectionsPerHost = connectionsPerHost; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 2563f26..fa88817 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -19,7 +19,6 @@ package 
org.apache.cassandra.io.sstable; import java.io.File; import java.io.IOException; -import java.util.Collection; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; @@ -35,7 +34,6 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredSerializer; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.JVMStabilityInspector; /** @@ -62,11 +60,11 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>(); private final DiskWriter diskWriter = new DiskWriter(); - SSTableSimpleUnsortedWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB) + SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB) { - super(cfs, partitioner, columns); + super(directory, metadata, columns); this.bufferSize = bufferSizeInMB * 1024L * 1024L; - this.header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS); + this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS); diskWriter.start(); } @@ -112,7 +110,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter private PartitionUpdate createPartitionUpdate(DecoratedKey key) { - return new PartitionUpdate(cfs.metadata, key, columns, 4) + return new PartitionUpdate(metadata, key, columns, 4) { @Override public void add(Row row) @@ -206,7 +204,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter if (b == SENTINEL) return; - try (SSTableTxnWriter writer = createWriter()) + try (SSTableTxnWriter writer = createWriter()) { for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet()) writer.append(entry.getValue().unfilteredIterator()); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java index 2f6dd33..7fbd79d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java @@ -19,14 +19,12 @@ package org.apache.cassandra.io.sstable; import java.io.File; import java.io.IOException; -import java.util.Collection; import com.google.common.base.Throwables; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.IPartitioner; /** * A SSTable writer that assumes rows are in (partitioner) sorted order. @@ -45,9 +43,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter private SSTableTxnWriter writer; - protected SSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns) + protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns) { - super(cfs, partitioner, columns); + super(directory, metadata, columns); } private SSTableTxnWriter getOrCreateWriter() @@ -69,7 +67,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter if (update != null) writePartition(update); currentKey = key; - update = new PartitionUpdate(cfs.metadata, currentKey, columns, 4); + update = new PartitionUpdate(metadata, currentKey, columns, 4); } assert update != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 28ca4c4..015c5bb 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -23,6 +23,7 @@ import java.util.Collection; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -106,8 +107,15 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem @SuppressWarnings("resource") // log and writer closed during doPostCleanup - public static SSTableTxnWriter createRangeAware(ColumnFamilyStore cfs, long keyCount, long repairedAt, SSTableFormat.Type type, int sstableLevel, SerializationHeader header) + public static SSTableTxnWriter createRangeAware(CFMetaData cfm, + long keyCount, + long repairedAt, + SSTableFormat.Type type, + int sstableLevel, + SerializationHeader header) { + + ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName); LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); SSTableMultiWriter writer; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/long/org/apache/cassandra/streaming/LongStreamingTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java index 51b049d..1340224 100644 --- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -30,7 +30,6 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; -import 
org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.QueryProcessor; @@ -92,7 +91,7 @@ public class LongStreamingTest writer.close(); System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start))); - File[] dataFiles = writer.getInnermostDirectory().listFiles((dir, name) -> name.endsWith("-Data.db")); + File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db")); long dataSize = 0l; for (File file : dataFiles) { @@ -100,7 +99,7 @@ public class LongStreamingTest dataSize += file.length(); } - SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client() + SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { private String ks; public void init(String keyspace) @@ -127,7 +126,7 @@ public class LongStreamingTest //Stream again - loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client() + loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { private String ks; public void init(String keyspace) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java index 515ce18..595610e 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -131,10 +131,12 @@ public class RealTransactionsTest extends SchemaLoader { cfs.truncateBlocking(); + String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; String query = "INSERT INTO 
\"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)"; try (CQLSSTableWriter writer = CQLSSTableWriter.builder() - .withCfs(cfs) + .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables()) + .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name)) .using(String.format(query, cfs.keyspace.getName(), cfs.name)) .build()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java index 9502dfa..8025861 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java @@ -22,17 +22,15 @@ import java.io.FilenameFilter; import java.io.IOException; import com.google.common.io.Files; -import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang.ArrayUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; -import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.tools.Util; import static org.junit.Assert.assertEquals; @@ -77,10 +75,16 @@ public class CQLSSTableWriterClientTest writer.close(); writer2.close(); - FilenameFilter filter = (dir, name) -> name.endsWith("-Data.db"); + FilenameFilter filter = new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.endsWith("-Data.db"); + } + }; - File[] dataFiles = (File[])ArrayUtils.addAll(writer2.getInnermostDirectory().listFiles(filter), - writer.getInnermostDirectory().listFiles(filter)); + File[] dataFiles = 
this.testDirectory.listFiles(filter); assertEquals(2, dataFiles.length); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index 56e62ee..3c80b9e 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -33,11 +33,9 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.auth.AuthConfig; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.UDHelper; -import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.*; @@ -97,7 +95,7 @@ public class CQLSSTableWriterTest writer.close(); - loadSSTables(writer.getInnermostDirectory(), KS); + loadSSTables(dataDir, KS); UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;"); assertEquals(4, rs.size()); @@ -187,7 +185,7 @@ public class CQLSSTableWriterTest return name.endsWith("-Data.db"); } }; - assert writer.getInnermostDirectory().list(filterDataFiles).length > 1 : Arrays.toString(writer.getInnermostDirectory().list(filterDataFiles)); + assert dataDir.list(filterDataFiles).length > 1 : Arrays.toString(dataDir.list(filterDataFiles)); } @@ -221,22 +219,28 @@ public class CQLSSTableWriterTest private static final int NUMBER_WRITES_IN_RUNNABLE = 10; private class WriterThread extends Thread { + private final File dataDir; private final int id; - private final ColumnFamilyStore cfs; public volatile Exception exception; - public 
WriterThread(ColumnFamilyStore cfs, int id) + public WriterThread(File dataDir, int id) { - this.cfs = cfs; + this.dataDir = dataDir; this.id = id; } @Override public void run() { + String schema = "CREATE TABLE cql_keyspace2.table2 (" + + " k int," + + " v int," + + " PRIMARY KEY (k, v)" + + ")"; String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)"; CQLSSTableWriter writer = CQLSSTableWriter.builder() - .withCfs(cfs) + .inDirectory(dataDir) + .forTable(schema) .using(insert).build(); try @@ -264,17 +268,10 @@ public class CQLSSTableWriterTest File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); assert dataDir.mkdirs(); - String schema = "CREATE TABLE cql_keyspace2.table2 (" - + " k int," - + " v int," - + " PRIMARY KEY (k, v)" - + ")"; - ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(schema, Collections.singletonList(dataDir)); - WriterThread[] threads = new WriterThread[5]; for (int i = 0; i < threads.length; i++) { - WriterThread thread = new WriterThread(cfs, i); + WriterThread thread = new WriterThread(dataDir, i); threads[i] = thread; thread.start(); } @@ -289,7 +286,7 @@ public class CQLSSTableWriterTest } } - loadSSTables(cfs.getDirectories().getDirectoryForNewSSTables(), KS); + loadSSTables(dataDir, KS); UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); @@ -341,7 +338,7 @@ public class CQLSSTableWriterTest } writer.close(); - loadSSTables(writer.getInnermostDirectory(), KS); + loadSSTables(dataDir, KS); UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." 
+ TABLE); TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type)); @@ -412,7 +409,7 @@ public class CQLSSTableWriterTest } writer.close(); - loadSSTables(writer.getInnermostDirectory(), KS); + loadSSTables(dataDir, KS); UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); @@ -503,7 +500,7 @@ public class CQLSSTableWriterTest writer.addRow(5, 5, 5, "5"); writer.close(); - loadSSTables(writer.getInnermostDirectory(), KS); + loadSSTables(dataDir, KS); UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index ba7571f..72c7467 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable; import java.io.File; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -34,8 +33,6 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AsciiType; @@ -125,8 +122,6 @@ public class SSTableLoaderTest String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; String 
query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; - - File outputDir; try (CQLSSTableWriter writer = CQLSSTableWriter.builder() .inDirectory(dataDir) .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1)) @@ -134,22 +129,22 @@ public class SSTableLoaderTest .build()) { writer.addRow("key1", "col1", "100"); - outputDir = writer.getInnermostDirectory(); } + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them + final CountDownLatch latch = new CountDownLatch(1); - SSTableLoader loader = new SSTableLoader(outputDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); + SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); - UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s;", KEYSPACE1, CF_STANDARD1)); - - assertEquals(1, rs.size()); - - Iterator<UntypedResultSet.Row> iter = rs.iterator(); - UntypedResultSet.Row row; + List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build()); - row = iter.next(); - assertEquals("key1", row.getString("key")); + assertEquals(1, partitions.size()); + assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); + assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))) + .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val"))) + .value()); // The stream future is signalled when the work is complete but before releasing references. Wait for release // before cleanup (CASSANDRA-10118). @@ -165,9 +160,8 @@ public class SSTableLoaderTest //make sure we have no tables... 
assertTrue(dataDir.listFiles().length == 0); - //Since this is running in the same jvm we need to put it in a tmp keyspace - String schema = "CREATE TABLE \"%stmp\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name)) with compression = {}"; - String query = "INSERT INTO \"%stmp\".\"%s\" (key, name, val) VALUES (?, ?, ?)"; + String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; + String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; CQLSSTableWriter writer = CQLSSTableWriter.builder() .inDirectory(dataDir) @@ -176,7 +170,7 @@ public class SSTableLoaderTest .withBufferSizeInMB(1) .build(); - int NB_PARTITIONS = 4200; // Enough to write >1MB and get at least one completed sstable before we've closed the writer + int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one completed sstable before we've closed the writer for (int i = 0; i < NB_PARTITIONS; i++) { @@ -188,11 +182,11 @@ public class SSTableLoaderTest cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them //make sure we have some tables... 
- assertTrue(writer.getInnermostDirectory().listFiles().length > 0); + assertTrue(dataDir.listFiles().length > 0); final CountDownLatch latch = new CountDownLatch(2); //writer is still open so loader should not load anything - SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1); + SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build()); @@ -202,7 +196,7 @@ public class SSTableLoaderTest // now we complete the write and the second loader should load the last sstable as well writer.close(); - loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1); + loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java new file mode 100644 index 0000000..4fe05a8 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.cql3.functions.UDHelper; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.CreateTypeStatement; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.statements.UpdateStatement; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.exceptions.SyntaxException; +import 
org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +/** + * Utility to write SSTables. + * <p> + * Typical usage looks like: + * <pre> + * String type = "CREATE TYPE myKs.myType (a int, b int)"; + * String schema = "CREATE TABLE myKs.myTable (" + * + " k int PRIMARY KEY," + * + " v1 text," + * + " v2 int," + * + " v3 myType," + * + ")"; + * String insert = "INSERT INTO myKs.myTable (k, v1, v2, v3) VALUES (?, ?, ?, ?)"; + * + * // Creates a new writer. You need to provide at least the directory where to write the created sstable, + * // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the + * // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see + * // StressCQLSSTableWriter.Builder for more details on the available options. + * StressCQLSSTableWriter writer = StressCQLSSTableWriter.builder() + * .inDirectory("path/to/directory") + * .withType(type) + * .forTable(schema) + * .using(insert).build(); + * + * UserType myType = writer.getUDType("myType"); + * // Adds a number of rows to the resulting sstable + * writer.addRow(0, "test1", 24, myType.newValue().setInt("a", 10).setInt("b", 20)); + * writer.addRow(1, "test2", null, null); + * writer.addRow(2, "test3", 42, myType.newValue().setInt("a", 30).setInt("b", 40)); + * + * // Close the writer, finalizing the sstable + * writer.close(); + * </pre> + * + * Please note that {@code StressCQLSSTableWriter} is <b>not</b> thread-safe (multiple threads cannot access the + * same instance). It is however safe to use multiple instances in parallel (even if those instances write + * sstables for the same table). 
+ */ +public class StressCQLSSTableWriter implements Closeable +{ + public static final ByteBuffer UNSET_VALUE = ByteBufferUtil.UNSET_BYTE_BUFFER; + + static + { + DatabaseDescriptor.clientInitialization(false); + // Partitioner is not set in client mode. + if (DatabaseDescriptor.getPartitioner() == null) + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + } + + private final AbstractSSTableSimpleWriter writer; + private final UpdateStatement insert; + private final List<ColumnSpecification> boundNames; + private final List<TypeCodec> typeCodecs; + private final ColumnFamilyStore cfs; + + private StressCQLSSTableWriter(ColumnFamilyStore cfs, AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames) + { + this.cfs = cfs; + this.writer = writer; + this.insert = insert; + this.boundNames = boundNames; + this.typeCodecs = boundNames.stream().map(bn -> UDHelper.codecFor(UDHelper.driverType(bn.type))) + .collect(Collectors.toList()); + } + + /** + * Returns a new builder for a StressCQLSSTableWriter. + * + * @return the new builder. + */ + public static Builder builder() + { + return new Builder(); + } + + /** + * Adds a new row to the writer. + * <p> + * This is a shortcut for {@code addRow(Arrays.asList(values))}. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer). + * @return this writer. + */ + public StressCQLSSTableWriter addRow(Object... values) + throws InvalidRequestException, IOException + { + return addRow(Arrays.asList(values)); + } + + /** + * Adds a new row to the writer. + * <p> + * Each provided value type should correspond to the types of the CQL column + * the value is for. The correspondance between java type and CQL type is the + * same one than the one documented at + * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass(). 
+ * <p> + * If you prefer providing the values directly as binary, use + * {@link #rawAddRow} instead. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating this writer). + * @return this writer. + */ + public StressCQLSSTableWriter addRow(List<Object> values) + throws InvalidRequestException, IOException + { + int size = Math.min(values.size(), boundNames.size()); + List<ByteBuffer> rawValues = new ArrayList<>(size); + + for (int i = 0; i < size; i++) + { + Object value = values.get(i); + rawValues.add(serialize(value, typeCodecs.get(i))); + } + + return rawAddRow(rawValues); + } + + /** + * Adds a new row to the writer. + * <p> + * This is equivalent to the other addRow methods, but takes a map whose + * keys are the names of the columns to add instead of taking a list of the + * values in the order of the insert statement used during construction of + * this writer. + * <p> + * Please note that the column names in the map keys must be in lowercase unless + * the declared column name is a + * <a href="http://cassandra.apache.org/doc/cql3/CQL.html#identifiers">case-sensitive quoted identifier</a> + * (in which case the map key must use the exact case of the column). + * + * @param values a map of column name to column values representing the new + * row to add. Note that if a column is not part of the map, its value will + * be {@code null}. If the map contains keys that do not correspond to one + * of the columns of the insert statement used when creating this writer, + * the corresponding value is ignored. + * @return this writer. 
+ */ + public StressCQLSSTableWriter addRow(Map<String, Object> values) + throws InvalidRequestException, IOException + { + int size = boundNames.size(); + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + ColumnSpecification spec = boundNames.get(i); + Object value = values.get(spec.name.toString()); + rawValues.add(serialize(value, typeCodecs.get(i))); + } + return rawAddRow(rawValues); + } + + /** + * Adds a new row to the writer given already serialized values. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer) as binary. + * @return this writer. + */ + public StressCQLSSTableWriter rawAddRow(ByteBuffer... values) + throws InvalidRequestException, IOException + { + return rawAddRow(Arrays.asList(values)); + } + + /** + * Adds a new row to the writer given already serialized values. + * <p> + * This is a shortcut for {@code rawAddRow(Arrays.asList(values))}. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating by this writer) as binary. + * @return this writer. + */ + public StressCQLSSTableWriter rawAddRow(List<ByteBuffer> values) + throws InvalidRequestException, IOException + { + if (values.size() != boundNames.size()) + throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size())); + + QueryOptions options = QueryOptions.forInternalCalls(null, values); + List<ByteBuffer> keys = insert.buildPartitionKeyNames(options); + SortedSet<Clustering> clusterings = insert.createClustering(options); + + long now = System.currentTimeMillis() * 1000; + // Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open' + // and that forces a lot of initialization that we don't want. 
+ UpdateParameters params = new UpdateParameters(insert.cfm, + insert.updatedColumns(), + options, + insert.getTimestamp(now, options), + insert.getTimeToLive(options), + Collections.<DecoratedKey, Partition>emptyMap()); + + try + { + for (ByteBuffer key : keys) + { + for (Clustering clustering : clusterings) + insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params); + } + return this; + } + catch (SSTableSimpleUnsortedWriter.SyncException e) + { + // If we use a BufferedWriter and had a problem writing to disk, the IOException has been + // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE. + throw (IOException)e.getCause(); + } + } + + /** + * Adds a new row to the writer given already serialized values. + * <p> + * This is equivalent to the other rawAddRow methods, but takes a map whose + * keys are the names of the columns to add instead of taking a list of the + * values in the order of the insert statement used during construction of + * this writer. + * + * @param values a map of column name to column values representing the new + * row to add. Note that if a column is not part of the map, its value will + * be {@code null}. If the map contains keys that do not correspond to one + * of the columns of the insert statement used when creating this writer, + * the corresponding value is ignored. + * @return this writer. + */ + public StressCQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values) + throws InvalidRequestException, IOException + { + int size = boundNames.size(); // one slot per bind variable, so columns missing from the map are bound to null + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + ColumnSpecification spec = boundNames.get(i); + rawValues.add(values.get(spec.name.toString())); + } + return rawAddRow(rawValues); + } + + /** + * Returns the User Defined type, used in this SSTable Writer, that can + * be used to create UDTValue instances. 
+ * + * @param dataType name of the User Defined type + * @return user defined type + */ + public com.datastax.driver.core.UserType getUDType(String dataType) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(insert.keyspace()); + UserType userType = ksm.types.getNullable(ByteBufferUtil.bytes(dataType)); + return (com.datastax.driver.core.UserType) UDHelper.driverType(userType); + } + + /** + * Close this writer. + * <p> + * This method should be called, otherwise the produced sstables are not + * guaranteed to be complete (and won't be in practice). + */ + public void close() throws IOException + { + writer.close(); + } + + private ByteBuffer serialize(Object value, TypeCodec codec) + { + if (value == null || value == UNSET_VALUE) + return (ByteBuffer) value; + + return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED); + } + /** + * The writer loads data in directories corresponding to how they laid out on the server. + * <p> + * {keyspace}/{table-cfid}/ + * + * This method can be used to fetch the innermost directory with the sstable components + * @return The directory containing the sstable components + */ + public File getInnermostDirectory() + { + return cfs.getDirectories().getDirectoryForNewSSTables(); + } + + /** + * A Builder for a StressCQLSSTableWriter object. + */ + public static class Builder + { + private final List<File> directoryList; + private ColumnFamilyStore cfs; + + protected SSTableFormat.Type formatType = null; + + private Boolean makeRangeAware = false; + + private CreateTableStatement.RawStatement schemaStatement; + private final List<CreateTypeStatement> typeStatements; + private UpdateStatement.ParsedInsert insertStatement; + private IPartitioner partitioner; + + private boolean sorted = false; + private long bufferSizeInMB = 128; + + protected Builder() + { + this.typeStatements = new ArrayList<>(); + this.directoryList = new ArrayList<>(); + } + + /** + * The directory where to write the sstables. 
+ * <p> + * This is a mandatory option. + * + * @param directory the directory to use, which should exist and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable. + */ + public Builder inDirectory(String directory) + { + return inDirectory(new File(directory)); + } + + /** + * The directory where to write the sstables (mandatory option). + * <p> + * This is a mandatory option. + * + * @param directory the directory to use, which should exist and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable. + */ + public Builder inDirectory(File directory) + { + if (!directory.exists()) + throw new IllegalArgumentException(directory + " doesn't exists"); + if (!directory.canWrite()) + throw new IllegalArgumentException(directory + " exists but is not writable"); + + directoryList.add(directory); + return this; + } + + /** + * A pre-instantiated ColumnFamilyStore + * <p> + * This can be used in place of inDirectory and forTable + * + * @see #inDirectory(File) + * + * @param cfs the ColumnFamilyStore whose directories and metadata the writer will use. + * @return this builder. + * + * @throws IllegalArgumentException if a directory doesn't exist or is not writable. + */ + public Builder withCfs(ColumnFamilyStore cfs) + { + this.cfs = cfs; + return this; + } + + + public Builder withType(String typeDefinition) throws SyntaxException + { + typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE")); + return this; + } + + /** + * The schema (CREATE TABLE statement) for the table for which sstables are to be created. + * <p> + * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified + * table name, one that includes the keyspace name. + * <p> + * This is a mandatory option. + * + * @param schema the schema of the table for which sstables are to be created. 
+ * @return this builder. + * + * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement + * or does not have a fully-qualified table name. + */ + public Builder forTable(String schema) + { + this.schemaStatement = parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"); + return this; + } + + /** + * The partitioner to use. + * <p> + * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used + * by the cluster for which the SSTables are created, you need to use this method to + * provide the correct partitioner. + * + * @param partitioner the partitioner to use. + * @return this builder. + */ + public Builder withPartitioner(IPartitioner partitioner) + { + this.partitioner = partitioner; + return this; + } + + + /** + * Specify if the sstable writer should be vnode range aware. + * This will create a sstable per vnode range. + * + * @param makeRangeAware whether to write one sstable per vnode range. + * @return this builder. + */ + public Builder rangeAware(boolean makeRangeAware) + { + this.makeRangeAware = makeRangeAware; + return this; + } + + /** + * The INSERT statement defining the order of the values to add for a given CQL row. + * <p> + * Please note that the provided INSERT statement <b>must</b> use a fully-qualified + * table name, one that includes the keyspace name. Moreover, said statement must use + * bind variables since it is those bind variables that will be bound to values by the + * resulting writer. + * <p> + * This is a mandatory option, and this needs to be called after forTable(). + * + * @param insert an insertion statement that defines the order + * of column values to use. + * @return this builder. + * + * @throws IllegalArgumentException if {@code insert} is not a valid insertion + * statement, does not have a fully-qualified table name or has no bind variables. 
+ */ + public Builder using(String insert) + { + this.insertStatement = parseStatement(insert, UpdateStatement.ParsedInsert.class, "INSERT"); + return this; + } + + /** + * The size of the buffer to use. + * <p> + * This defines how much data will be buffered before being written as + * a new SSTable. This correspond roughly to the data size that will have the created + * sstable. + * <p> + * The default is 128MB, which should be reasonable for a 1GB heap. If you experience + * OOM while using the writer, you should lower this value. + * + * @param size the size to use in MB. + * @return this builder. + */ + public Builder withBufferSizeInMB(int size) + { + this.bufferSizeInMB = size; + return this; + } + + /** + * Creates a StressCQLSSTableWriter that expects sorted inputs. + * <p> + * If this option is used, the resulting writer will expect rows to be + * added in SSTable sorted order (and an exception will be thrown if that + * is not the case during insertion). The SSTable sorted order means that + * rows are added such that their partition key respect the partitioner + * order. + * <p> + * You should thus only use this option is you know that you can provide + * the rows in order, which is rarely the case. If you can provide the + * rows in order however, using this sorted might be more efficient. + * <p> + * Note that if used, some option like withBufferSizeInMB will be ignored. + * + * @return this builder. 
+ */ + public Builder sorted() + { + this.sorted = true; + return this; + } + + @SuppressWarnings("resource") + public StressCQLSSTableWriter build() + { + if (directoryList.isEmpty() && cfs == null) + throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()"); + if (schemaStatement == null && cfs == null) + throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); + if (insertStatement == null) + throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); + + synchronized (StressCQLSSTableWriter.class) + { + if (cfs == null) + cfs = createOfflineTable(schemaStatement, typeStatements, directoryList); + + if (partitioner == null) + partitioner = cfs.getPartitioner(); + + Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert(); + AbstractSSTableSimpleWriter writer = sorted + ? new SSTableSimpleWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns()) + : new SSTableSimpleUnsortedWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns(), bufferSizeInMB); + + if (formatType != null) + writer.setSSTableFormatType(formatType); + + writer.setRangeAwareWriting(makeRangeAware); + + return new StressCQLSSTableWriter(cfs, writer, preparedInsert.left, preparedInsert.right); + } + } + + private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + Types.RawBuilder builder = Types.rawBuilder(keyspace); + for (CreateTypeStatement st : typeStatements) + st.addToRawBuilder(builder); + + ksm = ksm.withSwapped(builder.build()); + Schema.instance.setKeyspaceMetadata(ksm); + } + + public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList) + { + 
return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList); + } + + /** + * Creates the table according to schema statement + * with specified data directories + */ + public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList) + { + String keyspace = schemaStatement.keyspace(); + + if (Schema.instance.getKSMetaData(keyspace) == null) + Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); + + createTypes(keyspace, typeStatements); + + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + + CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily()); + assert cfMetaData == null; + + CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement; + statement.validate(ClientState.forInternalCalls()); + + //Build metatdata with a portable cfId + cfMetaData = statement.metadataBuilder() + .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily())) + .build() + .params(statement.params()); + + Keyspace.setInitialized(); + Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList())); + + Keyspace ks = Keyspace.openWithoutSSTables(keyspace); + ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true); + + ks.initCfCustom(cfs); + Schema.instance.load(cfs.metadata); + Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata))); + + return cfs; + } + + /** + * Prepares insert statement for writing data to SSTable + * + * @return prepared Insert statement and it's bound names + */ + private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert() + { + ParsedStatement.Prepared 
cqlStatement = insertStatement.prepare(); + UpdateStatement insert = (UpdateStatement) cqlStatement.statement; + insert.validate(ClientState.forInternalCalls()); + + if (insert.hasConditions()) + throw new IllegalArgumentException("Conditional statements are not supported"); + if (insert.isCounter()) + throw new IllegalArgumentException("Counter update statements are not supported"); + if (cqlStatement.boundNames.isEmpty()) + throw new IllegalArgumentException("Provided insert statement has no bind variables"); + + return Pair.create(insert, cqlStatement.boundNames); + } + } + + public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type) + { + try + { + ParsedStatement stmt = QueryProcessor.parseStatement(query); + + if (!stmt.getClass().equals(klass)) + throw new IllegalArgumentException("Invalid query, must be a " + type + " statement but was: " + stmt.getClass()); + + return klass.cast(stmt); + } + catch (RequestValidationException e) + { + throw new IllegalArgumentException(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java index 664f8d2..4180524 100644 --- a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java +++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java @@ -24,39 +24,32 @@ import java.net.InetAddress; import java.net.URI; import java.util.*; import java.util.concurrent.*; -import java.util.stream.Collectors; import javax.inject.Inject; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import io.airlift.command.*; -import org.apache.cassandra.config.CFMetaData; import 
org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.StressCQLSSTableWriter; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.stress.generate.PartitionGenerator; import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.operations.userdefined.SchemaInsert; import org.apache.cassandra.stress.settings.StressSettings; -import org.apache.cassandra.tools.Util; import org.apache.cassandra.tools.nodetool.CompactionStats; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -79,6 +72,11 @@ public abstract class CompactionStress implements Runnable @Option(name = {"-v", "--vnodes"}, description = "number of local tokens to generate (default 256)") Integer numTokens = 256; + static + { + DatabaseDescriptor.toolInitialization(); + } + List<File> getDataDirectories() { List<File> dataDirectories = new ArrayList<>(dataDirs.size()); @@ -112,14 +110,12 @@ public abstract class CompactionStress implements Runnable 
ColumnFamilyStore initCf(StressProfile stressProfile, boolean loadSSTables) { - Util.initDatabaseDescriptor(); - generateTokens(stressProfile.seedStr, StorageService.instance.getTokenMetadata(), numTokens); CreateTableStatement.RawStatement createStatement = stressProfile.getCreateStatement(); List<File> dataDirectories = getDataDirectories(); - ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.EMPTY_LIST, dataDirectories); + ColumnFamilyStore cfs = StressCQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.EMPTY_LIST, dataDirectories); if (loadSSTables) { @@ -302,7 +298,7 @@ public abstract class CompactionStress implements Runnable { //Every thread needs it's own writer final SchemaInsert insert = stressProfile.getOfflineInsert(null, generator, seedManager, settings); - final CQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware); + final StressCQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware); executorService.submit(() -> { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index b45462f..4fb70c6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -62,7 +62,7 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.error.YAMLException; -import static org.apache.cassandra.io.sstable.CQLSSTableWriter.parseStatement; +import static org.apache.cassandra.io.sstable.StressCQLSSTableWriter.parseStatement; public class StressProfile implements Serializable { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java index 96b3392..2c717a1 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java @@ -21,26 +21,18 @@ package org.apache.cassandra.stress.operations.userdefined; */ -import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; -import com.google.common.util.concurrent.Uninterruptibles; - import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Statement; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.StressCQLSSTableWriter; import org.apache.cassandra.stress.WorkManager; import org.apache.cassandra.stress.generate.*; import org.apache.cassandra.stress.report.Timer; @@ -142,9 +134,9 @@ public class SchemaInsert extends SchemaStatement private class OfflineRun extends Runner { - final CQLSSTableWriter writer; + final StressCQLSSTableWriter writer; - OfflineRun(CQLSSTableWriter writer) + OfflineRun(StressCQLSSTableWriter writer) { this.writer = writer; } @@ -182,9 +174,9 @@ public class 
SchemaInsert extends SchemaStatement timeWithRetry(new ThriftRun(client)); } - public CQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware) + public StressCQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware) { - return CQLSSTableWriter.builder() + return StressCQLSSTableWriter.builder() .withCfs(cfs) .withBufferSizeInMB(bufferSize) .forTable(tableSchema) @@ -193,7 +185,7 @@ public class SchemaInsert extends SchemaStatement .build(); } - public void runOffline(CQLSSTableWriter writer, WorkManager workManager) throws Exception + public void runOffline(StressCQLSSTableWriter writer, WorkManager workManager) throws Exception { OfflineRun offline = new OfflineRun(writer);
