Add CQL-aware SSTableWriter patch by slebresne; reviewed by jbellis for CASSANDRA-5894
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97cbf6ad Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97cbf6ad Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97cbf6ad Branch: refs/heads/trunk Commit: 97cbf6ad33af1fe3d16912bd3280b53bf9f22bb2 Parents: a999b15 Author: Sylvain Lebresne <[email protected]> Authored: Tue Oct 29 11:11:05 2013 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Oct 29 11:11:05 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 10 + .../org/apache/cassandra/config/CFMetaData.java | 3 +- .../apache/cassandra/cql3/QueryProcessor.java | 2 +- .../cql3/statements/ModificationStatement.java | 4 +- .../cql3/statements/UpdateStatement.java | 9 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 19 + .../cassandra/io/sstable/CQLSSTableWriter.java | 476 +++++++++++++++++++ .../io/sstable/SSTableSimpleUnsortedWriter.java | 2 +- .../io/sstable/SSTableSimpleWriter.java | 2 +- .../cassandra/io/sstable/SSTableWriter.java | 37 +- .../unit/org/apache/cassandra/SchemaLoader.java | 14 +- .../io/sstable/CQLSSTableWriterTest.java | 115 +++++ 13 files changed, 671 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0d60fae..7bf7f21 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ * Update memtable size while flushing (CASSANDRA-6249) * Provide hooks around CQL2/CQL3 statement execution (CASSANDRA-6252) * Require Permission.SELECT for CAS updates (CASSANDRA-6247) + * New CQL-aware SSTableWriter (CASSANDRA-5894) Merged from 1.2: * Require logging in for Thrift CQL2/3 statement preparation (CASSANDRA-6254) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 69ab4fd..2489f26 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -19,6 +19,16 @@ using the provided 'sstableupgrade' tool. New features ------------ + - A new CQLSSTableWriter class has been added. It is the equivalent of + the existing SSTableSimpleWriter/SSTableSimpleUnsortedWriter but is + CQL oriented. + + +2.0.2 +===== + +New features +------------ - Speculative retry defaults to 99th percentile (See blog post at http://www.datastax.com/dev/blog/rapid-read-protection-in-cassandra-2-0-2) - Configurable metrics reporting http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 20c35b3..f53c60c 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -476,7 +476,8 @@ public final class CFMetaData return compile(cql, Keyspace.SYSTEM_KS); } - private static CFMetaData compile(String cql, String keyspace) + @VisibleForTesting + public static CFMetaData compile(String cql, String keyspace) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index ec8b379..dc2649c 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -365,7 +365,7 @@ public class QueryProcessor hook.processBatch(batch, context); } - private static 
ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) + public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) throws RequestValidationException { Tracing.trace("Parsing {}", queryStr); http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 70bafb4..0f425b8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -205,7 +205,7 @@ public abstract class ModificationStatement implements CQLStatement } } - private List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables) + public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables) throws InvalidRequestException { CFDefinition cfDef = cfm.getCfDef(); @@ -241,7 +241,7 @@ public abstract class ModificationStatement implements CQLStatement return keys; } - private ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables) + public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables) throws InvalidRequestException { CFDefinition cfDef = cfm.getCfDef(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 89f17a7..12348df 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ 
-45,11 +45,10 @@ public class UpdateStatement extends ModificationStatement return true; } - public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) + public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException { CFDefinition cfDef = cfm.getCfDef(); - ColumnFamily cf = UnsortedColumns.factory.create(cfm); // Inserting the CQL row marker (see #4361) // We always need to insert a marker, because of the following situation: @@ -97,7 +96,13 @@ public class UpdateStatement extends ModificationStatement for (Operation update : updates) update.execute(key, cf, builder.copy(), params); } + } + public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) + throws InvalidRequestException + { + ColumnFamily cf = UnsortedColumns.factory.create(cfm); + addUpdateForKey(cf, key, builder, params); return cf; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 23f5c85..0059fda 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -164,6 +164,25 @@ public abstract class AbstractSSTableSimpleWriter */ public abstract void close() throws IOException; + /** + * Package protected for use by CQLSSTableWriter. + * Not meant to be exposed publicly. + */ + ColumnFamily currentColumnFamily() + { + return columnFamily; + } + + /** + * Package protected for use by CQLSSTableWriter. + * Not meant to be exposed publicly.
+ */ + DecoratedKey currentKey() + { + return currentKey; + } + + protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException; protected abstract ColumnFamily getColumnFamily(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java new file mode 100644 index 0000000..86348aa --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.cql3.statements.*; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.config.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.utils.Pair; + +/** + * Utility to write SSTables. + * <p> + * Typical usage looks like: + * <pre> + * String schema = "CREATE TABLE myKs.myTable (" + * + " k int PRIMARY KEY," + * + " v1 text," + * + " v2 int" + * + ")"; + * String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)"; + * + * // Creates a new writer. You need to provide at least the directory where to write the created sstable, + * // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the + * // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see + * // CQLSSTableWriter.Builder for more details on the available options. 
+ * CQLSSTableWriter writer = CQLSSTableWriter.builder() + * .inDirectory("path/to/directory") + * .forTable(schema) + * .using(insert).build(); + * + * // Adds a number of rows to the resulting sstable + * writer.addRow(0, "test1", 24); + * writer.addRow(1, "test2", null); + * writer.addRow(2, "test3", 42); + * + * // Close the writer, finalizing the sstable + * writer.close(); + * </pre> + */ +public class CQLSSTableWriter +{ + private final AbstractSSTableSimpleWriter writer; + private final UpdateStatement insert; + private final List<ColumnSpecification> boundNames; + + private CQLSSTableWriter(AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames) + { + this.writer = writer; + this.insert = insert; + this.boundNames = boundNames; + } + + /** + * Returns a new builder for a CQLSSTableWriter. + * + * @return the new builder. + */ + public static Builder builder() + { + return new Builder(); + } + + /** + * Adds a new row to the writer. + * <p> + * This is a shortcut for {@code addRow(Arrays.asList(values))}. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating this writer). + * @return this writer. + */ + public CQLSSTableWriter addRow(Object... values) + throws InvalidRequestException, IOException + { + return addRow(Arrays.asList(values)); + } + + /** + * Adds a new row to the writer. + * <p> + * The type of each provided value should correspond to the type of the CQL column + * the value is for. The correspondence between Java types and CQL types is the + * same as the one documented at + * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass(). + * <p> + * If you prefer providing the values directly as binary, use + * {@link #rawAddRow} instead. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating this writer).
+ * @return this writer. + */ + public CQLSSTableWriter addRow(List<Object> values) + throws InvalidRequestException, IOException + { + int size = Math.min(values.size(), boundNames.size()); + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) + rawValues.add(values.get(i) == null ? null : ((AbstractType)boundNames.get(i).type).decompose(values.get(i))); + return rawAddRow(rawValues); + } + + /** + * Adds a new row to the writer. + * <p> + * This is equivalent to the other addRow methods, but takes a map whose + * keys are the names of the columns to add instead of taking a list of the + * values in the order of the insert statement used during construction of + * this writer. + * + * @param values a map of column name to column values representing the new + * row to add. Note that if a column is not part of the map, its value will + * be {@code null}. If the map contains keys that do not correspond to one + * of the columns of the insert statement used when creating this writer, the + * corresponding value is ignored. + * @return this writer. + */ + public CQLSSTableWriter addRow(Map<String, Object> values) + throws InvalidRequestException, IOException + { + int size = boundNames.size(); + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (ColumnSpecification spec : boundNames) { + Object value = values.get(spec.name.toString()); + rawValues.add(value == null ? null : ((AbstractType)spec.type).decompose(value)); + } + return rawAddRow(rawValues); + } + + /** + * Adds a new row to the writer given already serialized values. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating this writer) as binary. + * @return this writer. + */ + public CQLSSTableWriter rawAddRow(ByteBuffer...
values) + throws InvalidRequestException, IOException + { + return rawAddRow(Arrays.asList(values)); + } + + /** + * Adds a new row to the writer given already serialized values. + * <p> + * {@link #rawAddRow(ByteBuffer...)} is a shortcut for this method. + * + * @param values the row values (corresponding to the bind variables of the + * insertion statement used when creating this writer) as binary. + * @return this writer. + */ + public CQLSSTableWriter rawAddRow(List<ByteBuffer> values) + throws InvalidRequestException, IOException + { + if (values.size() != boundNames.size()) + throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size())); + + List<ByteBuffer> keys = insert.buildPartitionKeyNames(values); + ColumnNameBuilder clusteringPrefix = insert.createClusteringPrefixBuilder(values); + + long now = System.currentTimeMillis() * 1000; + UpdateParameters params = new UpdateParameters(insert.cfm, + values, + insert.getTimestamp(now, values), + insert.getTimeToLive(values), + Collections.<ByteBuffer, ColumnGroupMap>emptyMap()); + + for (ByteBuffer key: keys) + { + if (writer.currentKey() == null || !key.equals(writer.currentKey().key)) + writer.newRow(key); + insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params); + } + return this; + } + + /** + * Adds a new row to the writer given already serialized values. + * <p> + * This is equivalent to the other rawAddRow methods, but takes a map whose + * keys are the names of the columns to add instead of taking a list of the + * values in the order of the insert statement used during construction of + * this writer. + * + * @param values a map of column name to column values representing the new + * row to add. Note that if a column is not part of the map, its value will + * be {@code null}.
If the map contains keys that do not correspond to one + * of the columns of the insert statement used when creating this writer, the + * corresponding value is ignored. + * @return this writer. + */ + public CQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values) + throws InvalidRequestException, IOException + { + int size = boundNames.size(); + List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + ColumnSpecification spec = boundNames.get(i); + rawValues.add(values.get(spec.name.toString())); + } + return rawAddRow(rawValues); + } + + /** + * Close this writer. + * <p> + * This method should be called, otherwise the produced sstables are not + * guaranteed to be complete (and won't be in practice). + */ + public void close() throws IOException + { + writer.close(); + } + + /** + * A Builder for a CQLSSTableWriter object. + */ + public static class Builder + { + private File directory; + private IPartitioner partitioner = new Murmur3Partitioner(); + + private CFMetaData schema; + private UpdateStatement insert; + private List<ColumnSpecification> boundNames; + + private boolean sorted = false; + private long bufferSizeInMB = 128; + + private Builder() {} + + /** + * The directory in which to write the sstables. + * <p> + * This is a mandatory option. + * + * @param directory the directory to use, which should exist and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable. + */ + public Builder inDirectory(String directory) + { + return inDirectory(new File(directory)); + } + + /** + * The directory in which to write the sstables. + * <p> + * This is a mandatory option. + * + * @param directory the directory to use, which should exist and be writable. + * @return this builder. + * + * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+ */ + public Builder inDirectory(File directory) + { + if (!directory.exists()) + throw new IllegalArgumentException(directory + " doesn't exist"); + if (!directory.canWrite()) + throw new IllegalArgumentException(directory + " exists but is not writable"); + + this.directory = directory; + return this; + } + + /** + * The schema (CREATE TABLE statement) for the table for which sstables are to be created. + * <p> + * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified + * table name, one that includes the keyspace name. + * <p> + * This is a mandatory option. + * + * @param schema the schema of the table for which sstables are to be created. + * @return this builder. + * + * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement + * or does not have a fully-qualified table name. + */ + public Builder forTable(String schema) + { + try + { + this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild(); + + // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly + // build the insert statement in using(). + KSMetaData ksm = KSMetaData.newKeyspace(this.schema.ksName, + AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), + ImmutableMap.of("replication_factor", "1"), + true, + Collections.singleton(this.schema)); + + Schema.instance.load(ksm); + return this; + } + catch (RequestValidationException e) + { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + /** + * The partitioner to use. + * <p> + * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used + * by the cluster for which the SSTables are created, you need to use this method to + * provide the correct partitioner. + * + * @param partitioner the partitioner to use. + * @return this builder.
+ */ + public Builder withPartitioner(IPartitioner partitioner) + { + this.partitioner = partitioner; + return this; + } + + /** + * The INSERT statement defining the order of the values to add for a given CQL row. + * <p> + * Please note that the provided INSERT statement <b>must</b> use a fully-qualified + * table name, one that includes the keyspace name. Moreover, said statement must use + * bind variables since it is those bind variables that will be bound to values by the + * resulting writer. + * <p> + * This is a mandatory option, and this needs to be called after forTable(). + * + * @param insertStatement an insertion statement that defines the order + * of column values to use. + * @return this builder. + * + * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion + * statement, does not have a fully-qualified table name, is conditional or has no bind variables. + */ + public Builder using(String insertStatement) + { + if (schema == null) + throw new IllegalStateException("You need to define the schema by calling forTable() prior to this call."); + + Pair<UpdateStatement, List<ColumnSpecification>> p = getStatement(insertStatement, UpdateStatement.class, "INSERT"); + this.insert = p.left; + this.boundNames = p.right; + if (this.insert.hasConditions()) + throw new IllegalArgumentException("Conditional statements are not supported"); + if (this.boundNames.isEmpty()) + throw new IllegalArgumentException("Provided insert statement has no bind variables"); + return this; + } + + /** + * The size of the buffer to use. + * <p> + * This defines how much data will be buffered before being written as + * a new SSTable. This corresponds roughly to the data size of the created + * sstable. + * <p> + * The default is 128MB, which should be reasonable for a 1GB heap. If you experience + * OOM while using the writer, you should lower this value. + * + * @param size the size to use in MB. + * @return this builder.
+ */ + public Builder withBufferSizeInMB(int size) + { + this.bufferSizeInMB = size; + return this; + } + + /** + * Creates a CQLSSTableWriter that expects sorted inputs. + * <p> + * If this option is used, the resulting writer will expect rows to be + * added in SSTable sorted order (and an exception will be thrown if that + * is not the case during insertion). The SSTable sorted order means that + * rows are added such that their partition keys respect the partitioner + * order and, for a given partition, that the rows respect the clustering + * columns order. + * <p> + * You should thus only use this option if you know that you can provide + * the rows in order, which is rarely the case. If you can provide the + * rows in order however, using this option might be more efficient. + * <p> + * Note that if used, some options like withBufferSizeInMB will be ignored. + * + * @return this builder. + */ + public Builder sorted() + { + this.sorted = true; + return this; + } + + private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type) + { + try + { + ClientState state = ClientState.forInternalCalls(); + ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, state); + CQLStatement stmt = prepared.statement; + stmt.validate(state); + + if (!stmt.getClass().equals(klass)) + throw new IllegalArgumentException("Invalid query, must be a " + type + " statement"); + + return Pair.create(klass.cast(stmt), prepared.boundNames); + } + catch (RequestValidationException e) + { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + public CQLSSTableWriter build() + { + if (directory == null) + throw new IllegalStateException("No output directory specified, you should provide a directory with inDirectory()"); + if (schema == null) + throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); + if (insert == null) + throw
new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); + + AbstractSSTableSimpleWriter writer; + if (sorted) + { + writer = new SSTableSimpleWriter(directory, + schema, + partitioner); + } + else + { + writer = new SSTableSimpleUnsortedWriter(directory, + schema, + partitioner, + bufferSizeInMB); + } + return new CQLSSTableWriter(writer, insert, boundNames); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 52e5a03..6b39024 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -193,7 +193,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter writer = getWriter(); for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet()) writer.append(entry.getKey(), entry.getValue()); - writer.closeAndOpenReader(); + writer.close(); } } catch (Throwable e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java index f0b45b5..9b584f0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java @@ -72,7 +72,7 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter { if (currentKey != null) writeRow(currentKey, columnFamily); - 
writer.closeAndOpenReader(); + writer.close(); } catch (FSError e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 5b3abfc..ac598bd 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -38,6 +38,7 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FilterFactory; import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.StreamingHistogram; public class SSTableWriter extends SSTable @@ -307,20 +308,9 @@ public class SSTableWriter extends SSTable public SSTableReader closeAndOpenReader(long maxDataAge) { - // index and filter - iwriter.close(); - // main data, close will truncate if necessary - dataFile.close(); - // write sstable statistics - SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), - metadata.getBloomFilterFpChance()); - writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors); - - // save the table of components - SSTable.appendTOC(descriptor, components); - - // remove the 'tmp' marker from all components - final Descriptor newdesc = rename(descriptor, components); + Pair<Descriptor, SSTableMetadata> p = close(); + Descriptor newdesc = p.left; + SSTableMetadata sstableMetadata = p.right; // finalize in-memory state for the reader SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX)); @@ -344,6 +334,25 @@ public class SSTableWriter extends SSTable return sstable; } + // Close the writer and return the descriptor 
to the new sstable and it's metadata + public Pair<Descriptor, SSTableMetadata> close() + { + // index and filter + iwriter.close(); + // main data, close will truncate if necessary + dataFile.close(); + // write sstable statistics + SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance()); + writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors); + + // save the table of components + SSTable.appendTOC(descriptor, components); + + // remove the 'tmp' marker from all components + return Pair.create(rename(descriptor, components), sstableMetadata); + } + private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata, Set<Integer> ancestors) { SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 55cd329..f3cc38f 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -107,7 +107,7 @@ public class SchemaLoader String ks_rcs = "RowCacheSpace"; String ks_nocommit = "NoCommitlogSpace"; String ks_prsi = "PerRowSecondaryIndex"; - + String ks_cql = "cql_keyspace"; Class<? 
extends AbstractReplicationStrategy> simple = SimpleStrategy.class; @@ -290,6 +290,18 @@ public class SchemaLoader opts_rf1, perRowIndexedCFMD(ks_prsi, "Indexed1", withOldCfIds))); + // CQLKeyspace + schema.add(KSMetaData.testMetadata(ks_cql, + simple, + opts_rf1, + + // Column Families + CFMetaData.compile("CREATE TABLE table1 (" + + "k int PRIMARY KEY," + + "v1 text," + + "v2 int" + + ")", ks_cql))); + if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))) useCompression(schema); http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java new file mode 100644 index 0000000..7095b35 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.util.Iterator; + +import com.google.common.io.Files; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.OutputHandler; + +public class CQLSSTableWriterTest extends SchemaLoader +{ + @BeforeClass + public static void setup() throws Exception + { + StorageService.instance.initServer(); + } + + @Test + public void testUnsortedWriter() throws Exception + { + String KS = "cql_keyspace"; + String TABLE = "table1"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assertEquals(true, dataDir.mkdirs()); + + String schema = "CREATE TABLE cql_keyspace.table1 (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ")"; + String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .withPartitioner(StorageService.instance.getPartitioner()) + .using(insert).build(); + + writer.addRow(0, "test1", 24); + writer.addRow(1, "test2", null); + writer.addRow(2, "test3", 42); + writer.close(); + + SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() + { + public void init(String keyspace) + { + for (Range<Token> range : StorageService.instance.getLocalRanges(keyspace)) +
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + setPartitioner(StorageService.getPartitioner()); + } + + public CFMetaData getCFMetaData(String keyspace, String cfName) + { + return Schema.instance.getCFMetaData(keyspace, cfName); + } + }, new OutputHandler.SystemOutput(false, false)); + + loader.stream().get(); + + UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table1;"); + assertEquals(3, rs.size()); + + Iterator<UntypedResultSet.Row> iter = rs.iterator(); + UntypedResultSet.Row row; + + row = iter.next(); + assertEquals(0, row.getInt("k")); + assertEquals("test1", row.getString("v1")); + assertEquals(24, row.getInt("v2")); + + row = iter.next(); + assertEquals(1, row.getInt("k")); + assertEquals("test2", row.getString("v1")); + assertFalse(row.has("v2")); + + row = iter.next(); + assertEquals(2, row.getInt("k")); + assertEquals("test3", row.getString("v1")); + assertEquals(42, row.getInt("v2")); + } +}
