Repository: cassandra
Updated Branches:
  refs/heads/trunk 6d43fc981 -> cc90d0423


Support UDTs in CQLSSTableWriter

Patch by Alex Petrov and Stefania Alborghetti;
reviewed by Stefania Alborghetti and Aleksey Yeschenko for CASSANDRA-10624.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc90d042
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc90d042
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc90d042

Branch: refs/heads/trunk
Commit: cc90d0423cb64bcf61ad37126c32de85fbca22c6
Parents: 6d43fc9
Author: Alex Petrov <[email protected]>
Authored: Tue Apr 5 10:50:59 2016 +0200
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed Apr 13 18:28:39 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/cql3/CQL3Type.java     |   5 +-
 .../cassandra/cql3/functions/UDHelper.java      |   4 +-
 .../cql3/statements/CreateTypeStatement.java    |  11 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 278 +++++++++++--------
 .../io/sstable/CQLSSTableWriterTest.java        | 188 +++++++++++--
 6 files changed, 340 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1576c24..8067962 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Support UDT in CQLSSTableWriter (CASSANDRA-10624)
  * Support for non-frozen user-defined types, updating
    individual fields of user-defined types (CASSANDRA-7423)
  * Make LZ4 compression level configurable (CASSANDRA-11051)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java 
b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index d5dfeed..cf7e18a 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -773,7 +773,10 @@ public interface CQL3Type
             @Override
             public String toString()
             {
-                return name.toString();
+                if (frozen)
+                    return "frozen<" + name.toString() + '>';
+                else
+                    return name.toString();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java 
b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
index 4effdc3..d1c6157 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.transport.Server;
 
 /**
- * Helper class for User Defined Functions + Aggregates.
+ * Helper class for User Defined Functions, Types and Aggregates.
  */
 public final class UDHelper
 {
@@ -66,7 +66,7 @@ public final class UDHelper
         return codecs;
     }
 
-    static TypeCodec<Object> codecFor(DataType dataType)
+    public static TypeCodec<Object> codecFor(DataType dataType)
     {
         return codecRegistry.codecFor(dataType);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index e134594..3268296 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
@@ -28,6 +29,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.transport.Event;
@@ -97,13 +99,20 @@ public class CreateTypeStatement extends 
SchemaAlteringStatement
         }
     }
 
+    public void addToRawBuilder(Types.RawBuilder builder) throws 
InvalidRequestException
+    {
+        builder.add(name.getStringTypeName(),
+                    
columnNames.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()),
+                    
columnTypes.stream().map(CQL3Type.Raw::toString).collect(Collectors.toList()));
+    }
+
     @Override
     public String keyspace()
     {
         return name.getKeyspace();
     }
 
-    private UserType createType() throws InvalidRequestException
+    public UserType createType() throws InvalidRequestException
     {
         List<ByteBuffer> names = new ArrayList<>(columnNames.size());
         for (ColumnIdentifier name : columnNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 81a3356..2de89b1 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -21,28 +21,44 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.CFStatement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.functions.UDHelper;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -50,12 +66,14 @@ import org.apache.cassandra.utils.Pair;
  * <p>
  * Typical usage looks like:
  * <pre>
+ *   String type = "CREATE TYPE myKs.myType (a int, b int)";
  *   String schema = "CREATE TABLE myKs.myTable ("
  *                 + "  k int PRIMARY KEY,"
  *                 + "  v1 text,"
- *                 + "  v2 int"
+ *                 + "  v2 int,"
+ *                 + "  v3 myType,"
  *                 + ")";
- *   String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)";
+ *   String insert = "INSERT INTO myKs.myTable (k, v1, v2, v3) VALUES (?, ?, 
?, ?)";
  *
  *   // Creates a new writer. You need to provide at least the directory where 
to write the created sstable,
  *   // the schema for the sstable to write and a (prepared) insert statement 
to use. If you do not use the
@@ -63,13 +81,15 @@ import org.apache.cassandra.utils.Pair;
  *   // CQLSSTableWriter.Builder for more details on the available options.
  *   CQLSSTableWriter writer = CQLSSTableWriter.builder()
  *                                             
.inDirectory("path/to/directory")
+ *                                             .withType(type)
  *                                             .forTable(schema)
  *                                             .using(insert).build();
  *
+ *   UserType myType = writer.getUDType("myType");
  *   // Adds a nember of rows to the resulting sstable
- *   writer.addRow(0, "test1", 24);
- *   writer.addRow(1, "test2", null);
- *   writer.addRow(2, "test3", 42);
+ *   writer.addRow(0, "test1", 24, myType.newValue().setInt("a", 
10).setInt("b", 20));
+ *   writer.addRow(1, "test2", null, null);
+ *   writer.addRow(2, "test3", 42, myType.newValue().setInt("a", 
30).setInt("b", 40));
  *
  *   // Close the writer, finalizing the sstable
  *   writer.close();
@@ -145,8 +165,14 @@ public class CQLSSTableWriter implements Closeable
     {
         int size = Math.min(values.size(), boundNames.size());
         List<ByteBuffer> rawValues = new ArrayList<>(size);
+
         for (int i = 0; i < size; i++)
-            rawValues.add(values.get(i) == null ? null : 
((AbstractType)boundNames.get(i).type).decompose(values.get(i)));
+        {
+            TypeCodec typeCodec = 
UDHelper.codecFor(UDHelper.driverType(boundNames.get(i).type));
+            rawValues.add(values.get(i) == null ? null : 
typeCodec.serialize(values.get(i),
+                                                                             
ProtocolVersion.NEWEST_SUPPORTED));
+        }
+
         return rawAddRow(rawValues);
     }
 
@@ -175,10 +201,11 @@ public class CQLSSTableWriter implements Closeable
     {
         int size = boundNames.size();
         List<ByteBuffer> rawValues = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
+        for (int i = 0; i < size; i++)
+        {
             ColumnSpecification spec = boundNames.get(i);
             Object value = values.get(spec.name.toString());
-            rawValues.add(value == null ? null : 
((AbstractType)spec.type).decompose(value));
+            rawValues.add(value == null ? null : ((AbstractType) 
spec.type).decompose(value));
         }
         return rawAddRow(rawValues);
     }
@@ -270,6 +297,20 @@ public class CQLSSTableWriter implements Closeable
     }
 
     /**
+     * Returns the User Defined type, used in this SSTable Writer, that can
+     * be used to create UDTValue instances.
+     *
+     * @param dataType name of the User Defined type
+     * @return user defined type
+     */
+    public com.datastax.driver.core.UserType getUDType(String dataType)
+    {
+        KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(insert.keyspace());
+        UserType userType = 
ksm.types.getNullable(ByteBufferUtil.bytes(dataType));
+        return (com.datastax.driver.core.UserType) 
UDHelper.driverType(userType);
+    }
+
+    /**
      * Close this writer.
      * <p>
      * This method should be called, otherwise the produced sstables are not
@@ -289,14 +330,17 @@ public class CQLSSTableWriter implements Closeable
 
         protected SSTableFormat.Type formatType = null;
 
-        private CFMetaData schema;
-        private UpdateStatement insert;
-        private List<ColumnSpecification> boundNames;
+        private CreateTableStatement.RawStatement schemaStatement;
+        private final List<CreateTypeStatement> typeStatements;
+        private UpdateStatement.ParsedInsert insertStatement;
+        private IPartitioner partitioner;
 
         private boolean sorted = false;
         private long bufferSizeInMB = 128;
 
-        protected Builder() {}
+        protected Builder() {
+            this.typeStatements = new ArrayList<>();
+        }
 
         /**
          * The directory where to write the sstables.
@@ -334,6 +378,12 @@ public class CQLSSTableWriter implements Closeable
             return this;
         }
 
+        public Builder withType(String typeDefinition) throws SyntaxException
+        {
+            typeStatements.add(parseStatement(typeDefinition, 
CreateTypeStatement.class, "CREATE TYPE"));
+            return this;
+        }
+
         /**
          * The schema (CREATE TABLE statement) for the table for which sstable 
are to be created.
          * <p>
@@ -350,52 +400,8 @@ public class CQLSSTableWriter implements Closeable
          */
         public Builder forTable(String schema)
         {
-            try
-            {
-                synchronized (CQLSSTableWriter.class)
-                {
-                    this.schema = getTableMetadata(schema);
-
-                    // We need to register the keyspace/table metadata through 
Schema, otherwise we won't be able to properly
-                    // build the insert statement in using().
-                    KeyspaceMetadata ksm = 
Schema.instance.getKSMetaData(this.schema.ksName);
-                    if (ksm == null)
-                    {
-                        createKeyspaceWithTable(this.schema);
-                    }
-                    else if (Schema.instance.getCFMetaData(this.schema.ksName, 
this.schema.cfName) == null)
-                    {
-                        addTableToKeyspace(ksm, this.schema);
-                    }
-                    return this;
-                }
-            }
-            catch (RequestValidationException e)
-            {
-                throw new IllegalArgumentException(e.getMessage(), e);
-            }
-        }
-
-        /**
-         * Creates the keyspace with the specified table.
-         *
-         * @param table the table that must be created.
-         */
-        private static void createKeyspaceWithTable(CFMetaData table)
-        {
-            Schema.instance.load(KeyspaceMetadata.create(table.ksName, 
KeyspaceParams.simple(1), Tables.of(table)));
-        }
-
-        /**
-         * Adds the table to the to the specified keyspace.
-         *
-         * @param keyspace the keyspace to add to
-         * @param table the table to add
-         */
-        private static void addTableToKeyspace(KeyspaceMetadata keyspace, 
CFMetaData table)
-        {
-            Schema.instance.load(table);
-            
Schema.instance.setKeyspaceMetadata(keyspace.withSwapped(keyspace.tables.with(table)));
+            this.schemaStatement = parseStatement(schema, 
CreateTableStatement.RawStatement.class, "CREATE TABLE");
+            return this;
         }
 
         /**
@@ -410,7 +416,7 @@ public class CQLSSTableWriter implements Closeable
          */
         public Builder withPartitioner(IPartitioner partitioner)
         {
-            this.schema = schema.copy(partitioner);
+            this.partitioner = partitioner;
             return this;
         }
 
@@ -424,27 +430,16 @@ public class CQLSSTableWriter implements Closeable
          * <p>
          * This is a mandatory option, and this needs to be called after 
foTable().
          *
-         * @param insertStatement an insertion statement that defines the order
+         * @param insert an insertion statement that defines the order
          * of column values to use.
          * @return this builder.
          *
          * @throws IllegalArgumentException if {@code insertStatement} is not 
a valid insertion
          * statement, does not have a fully-qualified table name or have no 
bind variables.
          */
-        public Builder using(String insertStatement)
+        public Builder using(String insert)
         {
-            if (schema == null)
-                throw new IllegalStateException("You need to define the schema 
by calling forTable() prior to this call.");
-
-            Pair<UpdateStatement, List<ColumnSpecification>> p = 
getStatement(insertStatement, UpdateStatement.class, "INSERT");
-            this.insert = p.left;
-            this.boundNames = p.right;
-            if (this.insert.hasConditions())
-                throw new IllegalArgumentException("Conditional statements are 
not supported");
-            if (this.insert.isCounter())
-                throw new IllegalArgumentException("Counter update statements 
are not supported");
-            if (this.boundNames.isEmpty())
-                throw new IllegalArgumentException("Provided insert statement 
has no bind variables");
+            this.insertStatement = parseStatement(insert, 
UpdateStatement.ParsedInsert.class, "INSERT");
             return this;
         }
 
@@ -490,54 +485,111 @@ public class CQLSSTableWriter implements Closeable
             return this;
         }
 
-        private static CFMetaData getTableMetadata(String schema)
+        @SuppressWarnings("resource")
+        public CQLSSTableWriter build()
         {
-            CFStatement parsed = 
(CFStatement)QueryProcessor.parseStatement(schema);
-            // tables with UDTs are currently not supported by 
CQLSSTableWrite, so we just use Types.none(), for now
-            // see CASSANDRA-10624 for more details
-            CreateTableStatement statement = (CreateTableStatement) 
((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement;
-            statement.validate(ClientState.forInternalCalls());
-            return statement.getCFMetaData();
-        }
+            if (directory == null)
+                throw new IllegalStateException("No ouptut directory 
specified, you should provide a directory with inDirectory()");
+            if (schemaStatement == null)
+                throw new IllegalStateException("Missing schema, you should 
provide the schema for the SSTable to create with forTable()");
+            if (insertStatement == null)
+                throw new IllegalStateException("No insert statement 
specified, you should provide an insert statement through using()");
 
-        private static <T extends CQLStatement> Pair<T, 
List<ColumnSpecification>> getStatement(String query, Class<T> klass, String 
type)
-        {
-            try
+            synchronized (CQLSSTableWriter.class)
             {
-                ClientState state = ClientState.forInternalCalls();
-                ParsedStatement.Prepared prepared = 
QueryProcessor.getStatement(query, state);
-                CQLStatement stmt = prepared.statement;
-                stmt.validate(state);
+                String keyspace = schemaStatement.keyspace();
+
+                if (Schema.instance.getKSMetaData(keyspace) == null)
+                    Schema.instance.load(KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1)));
 
-                if (!stmt.getClass().equals(klass))
-                    throw new IllegalArgumentException("Invalid query, must be 
a " + type + " statement");
+                createTypes(keyspace);
+                CFMetaData cfMetaData = createTable(keyspace);
+                Pair<UpdateStatement, List<ColumnSpecification>> 
preparedInsert = prepareInsert();
 
-                return Pair.create(klass.cast(stmt), prepared.boundNames);
+                AbstractSSTableSimpleWriter writer = sorted
+                                                     ? new 
SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns())
+                                                     : new 
SSTableSimpleUnsortedWriter(directory, cfMetaData, 
preparedInsert.left.updatedColumns(), bufferSizeInMB);
+
+                if (formatType != null)
+                    writer.setSSTableFormatType(formatType);
+
+                return new CQLSSTableWriter(writer, preparedInsert.left, 
preparedInsert.right);
             }
-            catch (RequestValidationException e)
+        }
+
+        private void createTypes(String keyspace)
+        {
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+            Types.RawBuilder builder = Types.rawBuilder(keyspace);
+            for (CreateTypeStatement st : typeStatements)
+                st.addToRawBuilder(builder);
+
+            ksm = ksm.withSwapped(builder.build());
+            Schema.instance.setKeyspaceMetadata(ksm);
+        }
+        /**
+         * Creates the table according to schema statement
+         *
+         * @param keyspace name of the keyspace where table should be created
+         */
+        private CFMetaData createTable(String keyspace)
+        {
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+
+            CFMetaData cfMetaData = 
ksm.tables.getNullable(schemaStatement.columnFamily());
+            if (cfMetaData == null)
             {
-                throw new IllegalArgumentException(e.getMessage(), e);
+                CreateTableStatement statement = (CreateTableStatement) 
schemaStatement.prepare(ksm.types).statement;
+                statement.validate(ClientState.forInternalCalls());
+
+                cfMetaData = statement.getCFMetaData();
+
+                Schema.instance.load(cfMetaData);
+                
Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
             }
+
+            if (partitioner != null)
+                return cfMetaData.copy(partitioner);
+            else
+                return cfMetaData;
         }
 
-        @SuppressWarnings("resource")
-        public CQLSSTableWriter build()
+        /**
+         * Prepares insert statement for writing data to SSTable
+         *
+         * @return prepared Insert statement and its bound names
+         */
+        private Pair<UpdateStatement, List<ColumnSpecification>> 
prepareInsert()
         {
-            if (directory == null)
-                throw new IllegalStateException("No ouptut directory 
specified, you should provide a directory with inDirectory()");
-            if (schema == null)
-                throw new IllegalStateException("Missing schema, you should 
provide the schema for the SSTable to create with forTable()");
-            if (insert == null)
-                throw new IllegalStateException("No insert statement 
specified, you should provide an insert statement through using()");
+            ParsedStatement.Prepared cqlStatement = insertStatement.prepare();
+            UpdateStatement insert = (UpdateStatement) cqlStatement.statement;
+            insert.validate(ClientState.forInternalCalls());
 
-            AbstractSSTableSimpleWriter writer = sorted
-                                               ? new 
SSTableSimpleWriter(directory, schema, insert.updatedColumns())
-                                               : new 
SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), 
bufferSizeInMB);
+            if (insert.hasConditions())
+                throw new IllegalArgumentException("Conditional statements are 
not supported");
+            if (insert.isCounter())
+                throw new IllegalArgumentException("Counter update statements 
are not supported");
+            if (cqlStatement.boundNames.isEmpty())
+                throw new IllegalArgumentException("Provided insert statement 
has no bind variables");
+
+            return Pair.create(insert, cqlStatement.boundNames);
+        }
+    }
+
+    private static <T extends ParsedStatement> T parseStatement(String query, 
Class<T> klass, String type)
+    {
+        try
+        {
+            ParsedStatement stmt = QueryProcessor.parseStatement(query);
 
-            if (formatType != null)
-                writer.setSSTableFormatType(formatType);
+            if (!stmt.getClass().equals(klass))
+                throw new IllegalArgumentException("Invalid query, must be a " 
+ type + " statement but was: " + stmt.getClass());
 
-            return new CQLSSTableWriter(writer, insert, boundNames);
+            return klass.cast(stmt);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new IllegalArgumentException(e.getMessage(), e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc90d042/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 557beba..437e7a3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -20,10 +20,10 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 
@@ -33,16 +33,19 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.UDHelper;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.UserType;
 
 import static org.junit.Assert.assertEquals;
 
@@ -92,24 +95,7 @@ public class CQLSSTableWriterTest
 
             writer.close();
 
-            SSTableLoader loader = new SSTableLoader(dataDir, new 
SSTableLoader.Client()
-            {
-                private String keyspace;
-
-                public void init(String keyspace)
-                {
-                    this.keyspace = keyspace;
-                    for (Range<Token> range : 
StorageService.instance.getLocalRanges("cql_keyspace"))
-                        addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddress());
-                }
-
-                public CFMetaData getTableMetadata(String cfName)
-                {
-                    return Schema.instance.getCFMetaData(keyspace, cfName);
-                }
-            }, new OutputHandler.SystemOutput(false, false));
-
-            loader.stream().get();
+            loadSSTables(dataDir, KS);
 
             UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * 
FROM cql_keyspace.table1;");
             assertEquals(4, rs.size());
@@ -300,6 +286,151 @@ public class CQLSSTableWriterTest
             }
         }
 
+        loadSSTables(dataDir, KS);
+
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
+        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWritesWithUdts() throws Exception
+    {
+        final String KS = "cql_keyspace3";
+        final String TABLE = "table3";
+
+        final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+                              + "  k int,"
+                              + "  v1 list<frozen<tuple2>>,"
+                              + "  v2 frozen<tuple3>,"
+                              + "  PRIMARY KEY (k)"
+                              + ")";
+
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+        assert dataDir.mkdirs();
+
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)")
+                                                  .withType("CREATE TYPE " + KS + ".tuple3 (a int, b int, c int)")
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + KS + "." + TABLE + " (k, v1, v2) " +
+                                                         "VALUES (?, ?, ?)").build();
+
+        UserType tuple2Type = writer.getUDType("tuple2");
+        UserType tuple3Type = writer.getUDType("tuple3");
+        for (int i = 0; i < 100; i++)
+        {
+            writer.addRow(i,
+                          ImmutableList.builder()
+                                       .add(tuple2Type.newValue()
+                                                      .setInt("a", i * 10)
+                                                      .setInt("b", i * 20))
+                                       .add(tuple2Type.newValue()
+                                                      .setInt("a", i * 30)
+                                                      .setInt("b", i * 40))
+                                       .build(),
+                          tuple3Type.newValue()
+                                    .setInt("a", i * 100)
+                                    .setInt("b", i * 200)
+                                    .setInt("c", i * 300));
+        }
+
+        writer.close();
+        loadSSTables(dataDir, KS);
+
+        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
+        TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
+        TypeCodec tuple3Codec = UDHelper.codecFor(tuple3Type);
+
+        assertEquals(resultSet.size(), 100);
+        int cnt = 0;
+        for (UntypedResultSet.Row row: resultSet) {
+            assertEquals(cnt,
+                         row.getInt("k"));
+            List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"),
+                                                                                 ProtocolVersion.NEWEST_SUPPORTED);
+            assertEquals(values.get(0).getInt("a"), cnt * 10);
+            assertEquals(values.get(0).getInt("b"), cnt * 20);
+            assertEquals(values.get(1).getInt("a"), cnt * 30);
+            assertEquals(values.get(1).getInt("b"), cnt * 40);
+
+            UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.NEWEST_SUPPORTED);
+
+            assertEquals(v2.getInt("a"), cnt * 100);
+            assertEquals(v2.getInt("b"), cnt * 200);
+            assertEquals(v2.getInt("c"), cnt * 300);
+            cnt++;
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWritesWithDependentUdts() throws Exception
+    {
+        final String KS = "cql_keyspace4";
+        final String TABLE = "table4";
+
+        final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+                              + "  k int,"
+                              + "  v1 frozen<nested_tuple>,"
+                              + "  PRIMARY KEY (k)"
+                              + ")";
+
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+        assert dataDir.mkdirs();
+
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .withType("CREATE TYPE " + KS + ".nested_tuple (c int, tpl frozen<tuple2>)")
+                                                  .withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)")
+                                                  .forTable(schema)
+                                                  .using("INSERT INTO " + KS + "." + TABLE + " (k, v1) " +
+                                                         "VALUES (?, ?)")
+                                                  .build();
+
+        UserType tuple2Type = writer.getUDType("tuple2");
+        UserType nestedTuple = writer.getUDType("nested_tuple");
+        TypeCodec tuple2Codec = UDHelper.codecFor(tuple2Type);
+        TypeCodec nestedTupleCodec = UDHelper.codecFor(nestedTuple);
+
+        for (int i = 0; i < 100; i++)
+        {
+            writer.addRow(i,
+                          nestedTuple.newValue()
+                                     .setInt("c", i * 100)
+                                     .set("tpl",
+                                          tuple2Type.newValue()
+                                                    .setInt("a", i * 200)
+                                                    .setInt("b", i * 300),
+                                          tuple2Codec));
+        }
+
+        writer.close();
+        loadSSTables(dataDir, KS);
+
+        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
+
+        assertEquals(resultSet.size(), 100);
+        int cnt = 0;
+        for (UntypedResultSet.Row row: resultSet) {
+            assertEquals(cnt,
+                         row.getInt("k"));
+            UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"),
+                                                                         ProtocolVersion.NEWEST_SUPPORTED);
+            assertEquals(nestedTpl.getInt("c"), cnt * 100);
+            UDTValue tpl = nestedTpl.getUDTValue("tpl");
+            assertEquals(tpl.getInt("a"), cnt * 200);
+            assertEquals(tpl.getInt("b"), cnt * 300);
+
+            cnt++;
+        }
+    }
+
+    private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException
+    {
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
             private String keyspace;
@@ -307,7 +438,7 @@ public class CQLSSTableWriterTest
             public void init(String keyspace)
             {
                 this.keyspace = keyspace;
-                for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
+                for (Range<Token> range : StorageService.instance.getLocalRanges(ks))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
             }
 
@@ -318,8 +449,5 @@ public class CQLSSTableWriterTest
         }, new OutputHandler.SystemOutput(false, false));
 
         loader.stream().get();
-
-        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
-        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
     }
 }

Reply via email to