Repository: cassandra Updated Branches: refs/heads/trunk 6d43fc981 -> cc90d0423
Support UDTs in CQLSSTableWriter Patch by Alex Petrov and Stefania Alborghetti; reviewed by Stefania Alborghetti and Aleksey Yeschenko for CASSANDRA-10624. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc90d042 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc90d042 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc90d042 Branch: refs/heads/trunk Commit: cc90d0423cb64bcf61ad37126c32de85fbca22c6 Parents: 6d43fc9 Author: Alex Petrov <[email protected]> Authored: Tue Apr 5 10:50:59 2016 +0200 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Apr 13 18:28:39 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/CQL3Type.java | 5 +- .../cassandra/cql3/functions/UDHelper.java | 4 +- .../cql3/statements/CreateTypeStatement.java | 11 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 278 +++++++++++-------- .../io/sstable/CQLSSTableWriterTest.java | 188 +++++++++++-- 6 files changed, 340 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1576c24..8067962 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Support UDT in CQLSSTableWriter (CASSANDRA-10624) * Support for non-frozen user-defined types, updating individual fields of user-defined types (CASSANDRA-7423) * Make LZ4 compression level configurable (CASSANDRA-11051) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/cql3/CQL3Type.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java index 
d5dfeed..cf7e18a 100644 --- a/src/java/org/apache/cassandra/cql3/CQL3Type.java +++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java @@ -773,7 +773,10 @@ public interface CQL3Type @Override public String toString() { - return name.toString(); + if (frozen) + return "frozen<" + name.toString() + '>'; + else + return name.toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/cql3/functions/UDHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java index 4effdc3..d1c6157 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java @@ -35,7 +35,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.transport.Server; /** - * Helper class for User Defined Functions + Aggregates. + * Helper class for User Defined Functions, Types and Aggregates. 
*/ public final class UDHelper { @@ -66,7 +66,7 @@ public final class UDHelper return codecs; } - static TypeCodec<Object> codecFor(DataType dataType) + public static TypeCodec<Object> codecFor(DataType dataType) { return codecRegistry.codecFor(dataType); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java index e134594..3268296 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; +import java.util.stream.Collectors; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.*; @@ -28,6 +29,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.transport.Event; @@ -97,13 +99,20 @@ public class CreateTypeStatement extends SchemaAlteringStatement } } + public void addToRawBuilder(Types.RawBuilder builder) throws InvalidRequestException + { + builder.add(name.getStringTypeName(), + columnNames.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()), + columnTypes.stream().map(CQL3Type.Raw::toString).collect(Collectors.toList())); + } + @Override public String keyspace() { return name.getKeyspace(); } - private UserType createType() throws InvalidRequestException + public UserType createType() throws 
InvalidRequestException { List<ByteBuffer> names = new ArrayList<>(columnNames.size()); for (ColumnIdentifier name : columnNames) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 81a3356..2de89b1 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -21,28 +21,44 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.cassandra.config.*; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.cql3.statements.CFStatement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.cql3.functions.UDHelper; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.CreateTypeStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; import 
org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; /** @@ -50,12 +66,14 @@ import org.apache.cassandra.utils.Pair; * <p> * Typical usage looks like: * <pre> + * String type = CREATE TYPE myKs.myType (a int, b int)"; * String schema = "CREATE TABLE myKs.myTable (" * + " k int PRIMARY KEY," * + " v1 text," - * + " v2 int" + * + " v2 int," + * + " v3 myType," * + ")"; - * String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)"; + * String insert = "INSERT INTO myKs.myTable (k, v1, v2, v3) VALUES (?, ?, ?, ?)"; * * // Creates a new writer. You need to provide at least the directory where to write the created sstable, * // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the @@ -63,13 +81,15 @@ import org.apache.cassandra.utils.Pair; * // CQLSSTableWriter.Builder for more details on the available options. 
* CQLSSTableWriter writer = CQLSSTableWriter.builder() * .inDirectory("path/to/directory") + * .withType(type) * .forTable(schema) * .using(insert).build(); * + * UserType myType = writer.getUDType("myType"); * // Adds a nember of rows to the resulting sstable - * writer.addRow(0, "test1", 24); - * writer.addRow(1, "test2", null); - * writer.addRow(2, "test3", 42); + * writer.addRow(0, "test1", 24, myType.newValue().setInt("a", 10).setInt("b", 20)); + * writer.addRow(1, "test2", null, null); + * writer.addRow(2, "test3", 42, myType.newValue().setInt("a", 30).setInt("b", 40)); * * // Close the writer, finalizing the sstable * writer.close(); @@ -145,8 +165,14 @@ public class CQLSSTableWriter implements Closeable { int size = Math.min(values.size(), boundNames.size()); List<ByteBuffer> rawValues = new ArrayList<>(size); + for (int i = 0; i < size; i++) - rawValues.add(values.get(i) == null ? null : ((AbstractType)boundNames.get(i).type).decompose(values.get(i))); + { + TypeCodec typeCodec = UDHelper.codecFor(UDHelper.driverType(boundNames.get(i).type)); + rawValues.add(values.get(i) == null ? null : typeCodec.serialize(values.get(i), + ProtocolVersion.NEWEST_SUPPORTED)); + } + return rawAddRow(rawValues); } @@ -175,10 +201,11 @@ public class CQLSSTableWriter implements Closeable { int size = boundNames.size(); List<ByteBuffer> rawValues = new ArrayList<>(size); - for (int i = 0; i < size; i++) { + for (int i = 0; i < size; i++) + { ColumnSpecification spec = boundNames.get(i); Object value = values.get(spec.name.toString()); - rawValues.add(value == null ? null : ((AbstractType)spec.type).decompose(value)); + rawValues.add(value == null ? null : ((AbstractType) spec.type).decompose(value)); } return rawAddRow(rawValues); } @@ -270,6 +297,20 @@ public class CQLSSTableWriter implements Closeable } /** + * Returns the User Defined type, used in this SSTable Writer, that can + * be used to create UDTValue instances. 
+ * + * @param dataType name of the User Defined type + * @return user defined type + */ + public com.datastax.driver.core.UserType getUDType(String dataType) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(insert.keyspace()); + UserType userType = ksm.types.getNullable(ByteBufferUtil.bytes(dataType)); + return (com.datastax.driver.core.UserType) UDHelper.driverType(userType); + } + + /** * Close this writer. * <p> * This method should be called, otherwise the produced sstables are not @@ -289,14 +330,17 @@ public class CQLSSTableWriter implements Closeable protected SSTableFormat.Type formatType = null; - private CFMetaData schema; - private UpdateStatement insert; - private List<ColumnSpecification> boundNames; + private CreateTableStatement.RawStatement schemaStatement; + private final List<CreateTypeStatement> typeStatements; + private UpdateStatement.ParsedInsert insertStatement; + private IPartitioner partitioner; private boolean sorted = false; private long bufferSizeInMB = 128; - protected Builder() {} + protected Builder() { + this.typeStatements = new ArrayList<>(); + } /** * The directory where to write the sstables. @@ -334,6 +378,12 @@ public class CQLSSTableWriter implements Closeable return this; } + public Builder withType(String typeDefinition) throws SyntaxException + { + typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE")); + return this; + } + /** * The schema (CREATE TABLE statement) for the table for which sstable are to be created. * <p> @@ -350,52 +400,8 @@ public class CQLSSTableWriter implements Closeable */ public Builder forTable(String schema) { - try - { - synchronized (CQLSSTableWriter.class) - { - this.schema = getTableMetadata(schema); - - // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly - // build the insert statement in using(). 
- KeyspaceMetadata ksm = Schema.instance.getKSMetaData(this.schema.ksName); - if (ksm == null) - { - createKeyspaceWithTable(this.schema); - } - else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null) - { - addTableToKeyspace(ksm, this.schema); - } - return this; - } - } - catch (RequestValidationException e) - { - throw new IllegalArgumentException(e.getMessage(), e); - } - } - - /** - * Creates the keyspace with the specified table. - * - * @param table the table that must be created. - */ - private static void createKeyspaceWithTable(CFMetaData table) - { - Schema.instance.load(KeyspaceMetadata.create(table.ksName, KeyspaceParams.simple(1), Tables.of(table))); - } - - /** - * Adds the table to the to the specified keyspace. - * - * @param keyspace the keyspace to add to - * @param table the table to add - */ - private static void addTableToKeyspace(KeyspaceMetadata keyspace, CFMetaData table) - { - Schema.instance.load(table); - Schema.instance.setKeyspaceMetadata(keyspace.withSwapped(keyspace.tables.with(table))); + this.schemaStatement = parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"); + return this; } /** @@ -410,7 +416,7 @@ public class CQLSSTableWriter implements Closeable */ public Builder withPartitioner(IPartitioner partitioner) { - this.schema = schema.copy(partitioner); + this.partitioner = partitioner; return this; } @@ -424,27 +430,16 @@ public class CQLSSTableWriter implements Closeable * <p> * This is a mandatory option, and this needs to be called after foTable(). * - * @param insertStatement an insertion statement that defines the order + * @param insert an insertion statement that defines the order * of column values to use. * @return this builder. * * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion * statement, does not have a fully-qualified table name or have no bind variables. 
*/ - public Builder using(String insertStatement) + public Builder using(String insert) { - if (schema == null) - throw new IllegalStateException("You need to define the schema by calling forTable() prior to this call."); - - Pair<UpdateStatement, List<ColumnSpecification>> p = getStatement(insertStatement, UpdateStatement.class, "INSERT"); - this.insert = p.left; - this.boundNames = p.right; - if (this.insert.hasConditions()) - throw new IllegalArgumentException("Conditional statements are not supported"); - if (this.insert.isCounter()) - throw new IllegalArgumentException("Counter update statements are not supported"); - if (this.boundNames.isEmpty()) - throw new IllegalArgumentException("Provided insert statement has no bind variables"); + this.insertStatement = parseStatement(insert, UpdateStatement.ParsedInsert.class, "INSERT"); return this; } @@ -490,54 +485,111 @@ public class CQLSSTableWriter implements Closeable return this; } - private static CFMetaData getTableMetadata(String schema) + @SuppressWarnings("resource") + public CQLSSTableWriter build() { - CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(schema); - // tables with UDTs are currently not supported by CQLSSTableWrite, so we just use Types.none(), for now - // see CASSANDRA-10624 for more details - CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement; - statement.validate(ClientState.forInternalCalls()); - return statement.getCFMetaData(); - } + if (directory == null) + throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()"); + if (schemaStatement == null) + throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); + if (insertStatement == null) + throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); - private 
static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type) - { - try + synchronized (CQLSSTableWriter.class) { - ClientState state = ClientState.forInternalCalls(); - ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, state); - CQLStatement stmt = prepared.statement; - stmt.validate(state); + String keyspace = schemaStatement.keyspace(); + + if (Schema.instance.getKSMetaData(keyspace) == null) + Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); - if (!stmt.getClass().equals(klass)) - throw new IllegalArgumentException("Invalid query, must be a " + type + " statement"); + createTypes(keyspace); + CFMetaData cfMetaData = createTable(keyspace); + Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert(); - return Pair.create(klass.cast(stmt), prepared.boundNames); + AbstractSSTableSimpleWriter writer = sorted + ? new SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns()) + : new SSTableSimpleUnsortedWriter(directory, cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB); + + if (formatType != null) + writer.setSSTableFormatType(formatType); + + return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right); } - catch (RequestValidationException e) + } + + private void createTypes(String keyspace) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + Types.RawBuilder builder = Types.rawBuilder(keyspace); + for (CreateTypeStatement st : typeStatements) + st.addToRawBuilder(builder); + + ksm = ksm.withSwapped(builder.build()); + Schema.instance.setKeyspaceMetadata(ksm); + } + /** + * Creates the table according to schema statement + * + * @param keyspace name of the keyspace where table should be created + */ + private CFMetaData createTable(String keyspace) + { + KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); + + CFMetaData cfMetaData = 
ksm.tables.getNullable(schemaStatement.columnFamily()); + if (cfMetaData == null) { - throw new IllegalArgumentException(e.getMessage(), e); + CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement; + statement.validate(ClientState.forInternalCalls()); + + cfMetaData = statement.getCFMetaData(); + + Schema.instance.load(cfMetaData); + Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData))); } + + if (partitioner != null) + return cfMetaData.copy(partitioner); + else + return cfMetaData; } - @SuppressWarnings("resource") - public CQLSSTableWriter build() + /** + * Prepares insert statement for writing data to SSTable + * + * @return prepared Insert statement and it's bound names + */ + private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert() { - if (directory == null) - throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()"); - if (schema == null) - throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); - if (insert == null) - throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); + ParsedStatement.Prepared cqlStatement = insertStatement.prepare(); + UpdateStatement insert = (UpdateStatement) cqlStatement.statement; + insert.validate(ClientState.forInternalCalls()); - AbstractSSTableSimpleWriter writer = sorted - ? 
new SSTableSimpleWriter(directory, schema, insert.updatedColumns()) - : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB); + if (insert.hasConditions()) + throw new IllegalArgumentException("Conditional statements are not supported"); + if (insert.isCounter()) + throw new IllegalArgumentException("Counter update statements are not supported"); + if (cqlStatement.boundNames.isEmpty()) + throw new IllegalArgumentException("Provided insert statement has no bind variables"); + + return Pair.create(insert, cqlStatement.boundNames); + } + } + + private static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type) + { + try + { + ParsedStatement stmt = QueryProcessor.parseStatement(query); - if (formatType != null) - writer.setSSTableFormatType(formatType); + if (!stmt.getClass().equals(klass)) + throw new IllegalArgumentException("Invalid query, must be a " + type + " statement but was: " + stmt.getClass()); - return new CQLSSTableWriter(writer, insert, boundNames); + return klass.cast(stmt); + } + catch (RequestValidationException e) + { + throw new IllegalArgumentException(e.getMessage(), e); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index 557beba..437e7a3 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -20,10 +20,10 @@ package org.apache.cassandra.io.sstable; import java.io.File; import java.io.FilenameFilter; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Iterator; -import java.util.UUID; +import java.util.*; +import 
java.util.concurrent.ExecutionException; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; @@ -33,16 +33,19 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.config.*; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.functions.UDHelper; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; import static org.junit.Assert.assertEquals; @@ -92,24 +95,7 @@ public class CQLSSTableWriterTest writer.close(); - SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() - { - private String keyspace; - - public void init(String keyspace) - { - this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); - } - - public CFMetaData getTableMetadata(String cfName) - { - return Schema.instance.getCFMetaData(keyspace, cfName); - } - }, new OutputHandler.SystemOutput(false, false)); - - loader.stream().get(); + loadSSTables(dataDir, KS); UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;"); assertEquals(4, rs.size()); @@ -300,6 +286,151 @@ public class CQLSSTableWriterTest } } + loadSSTables(dataDir, KS); + + 
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); + assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); + } + + @Test + @SuppressWarnings("unchecked") + public void testWritesWithUdts() throws Exception + { + final String KS = "cql_keyspace3"; + final String TABLE = "table3"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " v1 list<frozen<tuple2>>," + + " v2 frozen<tuple3>," + + " PRIMARY KEY (k)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)") + .withType("CREATE TYPE " + KS + ".tuple3 (a int, b int, c int)") + .forTable(schema) + .using("INSERT INTO " + KS + "." + TABLE + " (k, v1, v2) " + + "VALUES (?, ?, ?)").build(); + + UserType tuple2Type = writer.getUDType("tuple2"); + UserType tuple3Type = writer.getUDType("tuple3"); + for (int i = 0; i < 100; i++) + { + writer.addRow(i, + ImmutableList.builder() + .add(tuple2Type.newValue() + .setInt("a", i * 10) + .setInt("b", i * 20)) + .add(tuple2Type.newValue() + .setInt("a", i * 30) + .setInt("b", i * 40)) + .build(), + tuple3Type.newValue() + .setInt("a", i * 100) + .setInt("b", i * 200) + .setInt("c", i * 300)); + } + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." 
+ TABLE); + TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type)); + TypeCodec tuple3Codec = UDHelper.codecFor(tuple3Type); + + assertEquals(resultSet.size(), 100); + int cnt = 0; + for (UntypedResultSet.Row row: resultSet) { + assertEquals(cnt, + row.getInt("k")); + List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"), + ProtocolVersion.NEWEST_SUPPORTED); + assertEquals(values.get(0).getInt("a"), cnt * 10); + assertEquals(values.get(0).getInt("b"), cnt * 20); + assertEquals(values.get(1).getInt("a"), cnt * 30); + assertEquals(values.get(1).getInt("b"), cnt * 40); + + UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.NEWEST_SUPPORTED); + + assertEquals(v2.getInt("a"), cnt * 100); + assertEquals(v2.getInt("b"), cnt * 200); + assertEquals(v2.getInt("c"), cnt * 300); + cnt++; + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWritesWithDependentUdts() throws Exception + { + final String KS = "cql_keyspace4"; + final String TABLE = "table4"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " v1 frozen<nested_tuple>," + + " PRIMARY KEY (k)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .withType("CREATE TYPE " + KS + ".nested_tuple (c int, tpl frozen<tuple2>)") + .withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)") + .forTable(schema) + .using("INSERT INTO " + KS + "." 
+ TABLE + " (k, v1) " + + "VALUES (?, ?)") + .build(); + + UserType tuple2Type = writer.getUDType("tuple2"); + UserType nestedTuple = writer.getUDType("nested_tuple"); + TypeCodec tuple2Codec = UDHelper.codecFor(tuple2Type); + TypeCodec nestedTupleCodec = UDHelper.codecFor(nestedTuple); + + for (int i = 0; i < 100; i++) + { + writer.addRow(i, + nestedTuple.newValue() + .setInt("c", i * 100) + .set("tpl", + tuple2Type.newValue() + .setInt("a", i * 200) + .setInt("b", i * 300), + tuple2Codec)); + } + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + + assertEquals(resultSet.size(), 100); + int cnt = 0; + for (UntypedResultSet.Row row: resultSet) { + assertEquals(cnt, + row.getInt("k")); + UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"), + ProtocolVersion.NEWEST_SUPPORTED); + assertEquals(nestedTpl.getInt("c"), cnt * 100); + UDTValue tpl = nestedTpl.getUDTValue("tpl"); + assertEquals(tpl.getInt("a"), cnt * 200); + assertEquals(tpl.getInt("b"), cnt * 300); + + cnt++; + } + } + + private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException + { SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { private String keyspace; @@ -307,7 +438,7 @@ public class CQLSSTableWriterTest public void init(String keyspace) { this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) + for (Range<Token> range : StorageService.instance.getLocalRanges(ks)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); } @@ -318,8 +449,5 @@ public class CQLSSTableWriterTest }, new OutputHandler.SystemOutput(false, false)); loader.stream().get(); - - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); - assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); } }
