Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2074b89 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2074b89 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2074b89 Branch: refs/heads/trunk Commit: a2074b890c32f1a52c4852549db92444a27ef4a7 Parents: 9562b9b a1baead Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Thu Jun 29 18:02:27 2017 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Thu Jun 29 18:02:27 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/CQLSSTableWriter.java | 61 +++++++++++++------- .../io/sstable/CQLSSTableWriterTest.java | 51 +++++++++++++++- 3 files changed, 90 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2074b89/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 88aa1ef,c5179e7..bab91e7 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -5,45 -4,7 +5,46 @@@ Merged from 3.0 * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568) -3.0.14 +3.11.0 ++ * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606) + * Replace string comparison with regex/number checks in MessagingService test (CASSANDRA-13216) + * Fix formatting of duration columns in CQLSH (CASSANDRA-13549) + * Fix the problem with duplicated rows when using paging with SASI (CASSANDRA-13302) + * Allow CONTAINS statements filtering on the partition key and it’s parts (CASSANDRA-13275) + * Fall back to even ranges calculation in clusters with vnodes when tokens are distributed unevenly (CASSANDRA-13229) + * Fix duration type validation to prevent overflow (CASSANDRA-13218) + * Forbid unsupported creation of SASI indexes over partition key columns (CASSANDRA-13228) + * 
Reject multiple values for a key in CQL grammar. (CASSANDRA-13369) + * UDA fails without input rows (CASSANDRA-13399) + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188) + * V5 protocol flags decoding broken (CASSANDRA-13443) + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422) + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329) + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962) + * Add charset to Analyser input stream (CASSANDRA-13151) + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820) + * cdc column addition strikes again (CASSANDRA-13382) + * Fix static column indexes (CASSANDRA-13277) + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298) + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370) + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247) + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317) + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366) + * Support unaligned memory access for AArch64 (CASSANDRA-13326) + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915). 
+ * Fix equality comparisons of columns using the duration type (CASSANDRA-13174) + * Obfuscate password in stress-graphs (CASSANDRA-12233) + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) + * nodetool stopdaemon errors out (CASSANDRA-13030) + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954) + * Fix primary index calculation for SASI (CASSANDRA-12910) + * More fixes to the TokenAllocator (CASSANDRA-12990) + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) + * Address message coalescing regression (CASSANDRA-12676) + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835) +Merged from 3.0: * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172) * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004) * Failed unregistering mbean during drop keyspace (CASSANDRA-13346) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2074b89/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index a195235,39f7339..9e42101 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@@ -21,43 -21,30 +21,49 @@@ import java.io.Closeable import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.cassandra.config.*; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.cql3.statements.CFStatement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import 
java.util.Map; +import java.util.SortedSet; +import java.util.stream.Collectors; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; ++import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UpdateParameters; +import org.apache.cassandra.cql3.functions.UDHelper; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.CreateTypeStatement; +import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; + import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.sstable.format.SSTableFormat; ++import org.apache.cassandra.schema.Functions; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; + import org.apache.cassandra.schema.SchemaKeyspace; + import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; ++import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.ClientState; +import 
org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; /** @@@ -495,94 -497,54 +501,105 @@@ public class CQLSSTableWriter implement return this; } - private static CFMetaData getTableMetadata(String schema) + @SuppressWarnings("resource") + public CQLSSTableWriter build() { - CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(schema); - // tables with UDTs are currently not supported by CQLSSTableWrite, so we just use Types.none(), for now - // see CASSANDRA-10624 for more details - CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement; - statement.validate(ClientState.forInternalCalls()); - return statement.getCFMetaData(); - } + if (directory == null) + throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()"); + if (schemaStatement == null) + throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); + if (insertStatement == null) + throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); - private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type) - { - try + synchronized (CQLSSTableWriter.class) { - ClientState state = ClientState.forInternalCalls(); - ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, state); - CQLStatement stmt = prepared.statement; - stmt.validate(state); ++ if (Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME) == null) ++ Schema.instance.load(SchemaKeyspace.metadata()); ++ if (Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME) == null) ++ Schema.instance.load(SystemKeyspace.metadata()); + - if (!stmt.getClass().equals(klass)) - throw new IllegalArgumentException("Invalid query, must be a " + type + " statement"); 
+ String keyspace = schemaStatement.keyspace(); - return Pair.create(klass.cast(stmt), prepared.boundNames); - } - catch (RequestValidationException e) - { - throw new IllegalArgumentException(e.getMessage(), e); + if (Schema.instance.getKSMetaData(keyspace) == null) - Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); ++ { ++ Schema.instance.load(KeyspaceMetadata.create(keyspace, ++ KeyspaceParams.simple(1), ++ Tables.none(), ++ Views.none(), ++ Types.none(), ++ Functions.none())); ++ } ++ ++ ++ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); ++ CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily()); ++ if (cfMetaData == null) ++ { ++ Types types = createTypes(keyspace); ++ cfMetaData = createTable(types); ++ ++ Schema.instance.load(cfMetaData); ++ Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)).withSwapped(types)); ++ } + - createTypes(keyspace); - CFMetaData cfMetaData = createTable(keyspace); + Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert(); + + AbstractSSTableSimpleWriter writer = sorted + ? 
new SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns()) + : new SSTableSimpleUnsortedWriter(directory, cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB); + + if (formatType != null) + writer.setSSTableFormatType(formatType); + + return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right); } } - private void createTypes(String keyspace) - @SuppressWarnings("resource") - public CQLSSTableWriter build() ++ private Types createTypes(String keyspace) { - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); - if (directory == null) - throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()"); - if (schema == null) - throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()"); - if (insert == null) - throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); + Types.RawBuilder builder = Types.rawBuilder(keyspace); + for (CreateTypeStatement st : typeStatements) + st.addToRawBuilder(builder); - - ksm = ksm.withSwapped(builder.build()); - Schema.instance.setKeyspaceMetadata(ksm); ++ return builder.build(); + } + - AbstractSSTableSimpleWriter writer = sorted - ? 
new SSTableSimpleWriter(directory, schema, insert.updatedColumns()) - : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB); + /** + * Creates the table according to schema statement + * - * @param keyspace name of the keyspace where table should be created ++ * @param types types this table should be created with + */ - private CFMetaData createTable(String keyspace) ++ private CFMetaData createTable(Types types) + { - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); ++ CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(types).statement; ++ statement.validate(ClientState.forInternalCalls()); - CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily()); - if (cfMetaData == null) - { - CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement; - statement.validate(ClientState.forInternalCalls()); - - cfMetaData = statement.getCFMetaData(); - - Schema.instance.load(cfMetaData); - Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData))); - } - if (formatType != null) - writer.setSSTableFormatType(formatType); ++ CFMetaData cfMetaData = statement.getCFMetaData(); + + if (partitioner != null) + return cfMetaData.copy(partitioner); + else + return cfMetaData; + } + + /** + * Prepares insert statement for writing data to SSTable + * + * @return prepared Insert statement and it's bound names + */ + private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert() + { + ParsedStatement.Prepared cqlStatement = insertStatement.prepare(); + UpdateStatement insert = (UpdateStatement) cqlStatement.statement; + insert.validate(ClientState.forInternalCalls()); + + if (insert.hasConditions()) + throw new IllegalArgumentException("Conditional statements are not supported"); + if (insert.isCounter()) + throw new IllegalArgumentException("Counter update statements are not supported"); + if 
(cqlStatement.boundNames.isEmpty()) + throw new IllegalArgumentException("Provided insert statement has no bind variables"); - return new CQLSSTableWriter(writer, insert, boundNames); + return Pair.create(insert, cqlStatement.boundNames); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2074b89/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index ac7f4ad,7d79036..a400612 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@@ -222,326 -225,8 +222,326 @@@ public class CQLSSTableWriterTes } + + + private static final int NUMBER_WRITES_IN_RUNNABLE = 10; + private class WriterThread extends Thread + { + private final File dataDir; + private final int id; + public volatile Exception exception; + + public WriterThread(File dataDir, int id) + { + this.dataDir = dataDir; + this.id = id; + } + + @Override + public void run() + { + String schema = "CREATE TABLE cql_keyspace2.table2 (" + + " k int," + + " v int," + + " PRIMARY KEY (k, v)" + + ")"; + String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using(insert).build(); + + try + { + for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++) + { + writer.addRow(id, i); + } + writer.close(); + } + catch (Exception e) + { + exception = e; + } + } + } + + @Test + public void testConcurrentWriters() throws Exception + { + final String KS = "cql_keyspace2"; + final String TABLE = "table2"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + WriterThread[] threads = new WriterThread[5]; + for (int i = 0; i < 
threads.length; i++) + { + WriterThread thread = new WriterThread(dataDir, i); + threads[i] = thread; + thread.start(); + } + + for (WriterThread thread : threads) + { + thread.join(); + assert !thread.isAlive() : "Thread should be dead by now"; + if (thread.exception != null) + { + throw thread.exception; + } + } + + loadSSTables(dataDir, KS); + + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); + assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); + } + + @Test + @SuppressWarnings("unchecked") + public void testWritesWithUdts() throws Exception + { + final String KS = "cql_keyspace3"; + final String TABLE = "table3"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " v1 list<frozen<tuple2>>," + + " v2 frozen<tuple3>," + + " PRIMARY KEY (k)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)") + .withType("CREATE TYPE " + KS + ".tuple3 (a int, b int, c int)") + .forTable(schema) + .using("INSERT INTO " + KS + "." + TABLE + " (k, v1, v2) " + + "VALUES (?, ?, ?)").build(); + + UserType tuple2Type = writer.getUDType("tuple2"); + UserType tuple3Type = writer.getUDType("tuple3"); + for (int i = 0; i < 100; i++) + { + writer.addRow(i, + ImmutableList.builder() + .add(tuple2Type.newValue() + .setInt("a", i * 10) + .setInt("b", i * 20)) + .add(tuple2Type.newValue() + .setInt("a", i * 30) + .setInt("b", i * 40)) + .build(), + tuple3Type.newValue() + .setInt("a", i * 100) + .setInt("b", i * 200) + .setInt("c", i * 300)); + } + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." 
+ TABLE); + TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type)); + TypeCodec tuple3Codec = UDHelper.codecFor(tuple3Type); + + assertEquals(resultSet.size(), 100); + int cnt = 0; + for (UntypedResultSet.Row row: resultSet) { + assertEquals(cnt, + row.getInt("k")); + List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"), + ProtocolVersion.NEWEST_SUPPORTED); + assertEquals(values.get(0).getInt("a"), cnt * 10); + assertEquals(values.get(0).getInt("b"), cnt * 20); + assertEquals(values.get(1).getInt("a"), cnt * 30); + assertEquals(values.get(1).getInt("b"), cnt * 40); + + UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.NEWEST_SUPPORTED); + + assertEquals(v2.getInt("a"), cnt * 100); + assertEquals(v2.getInt("b"), cnt * 200); + assertEquals(v2.getInt("c"), cnt * 300); + cnt++; + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWritesWithDependentUdts() throws Exception + { + final String KS = "cql_keyspace4"; + final String TABLE = "table4"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " v1 frozen<nested_tuple>," + + " PRIMARY KEY (k)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .withType("CREATE TYPE " + KS + ".nested_tuple (c int, tpl frozen<tuple2>)") + .withType("CREATE TYPE " + KS + ".tuple2 (a int, b int)") + .forTable(schema) + .using("INSERT INTO " + KS + "." 
+ TABLE + " (k, v1) " + + "VALUES (?, ?)") + .build(); + + UserType tuple2Type = writer.getUDType("tuple2"); + UserType nestedTuple = writer.getUDType("nested_tuple"); + TypeCodec tuple2Codec = UDHelper.codecFor(tuple2Type); + TypeCodec nestedTupleCodec = UDHelper.codecFor(nestedTuple); + + for (int i = 0; i < 100; i++) + { + writer.addRow(i, + nestedTuple.newValue() + .setInt("c", i * 100) + .set("tpl", + tuple2Type.newValue() + .setInt("a", i * 200) + .setInt("b", i * 300), + tuple2Codec)); + } + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + + assertEquals(resultSet.size(), 100); + int cnt = 0; + for (UntypedResultSet.Row row: resultSet) { + assertEquals(cnt, + row.getInt("k")); + UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"), + ProtocolVersion.NEWEST_SUPPORTED); + assertEquals(nestedTpl.getInt("c"), cnt * 100); + UDTValue tpl = nestedTpl.getUDTValue("tpl"); + assertEquals(tpl.getInt("a"), cnt * 200); + assertEquals(tpl.getInt("b"), cnt * 300); + + cnt++; + } + } + + @Test + public void testUnsetValues() throws Exception + { + final String KS = "cql_keyspace5"; + final String TABLE = "table5"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " c1 int," + + " c2 int," + + " v text," + + " PRIMARY KEY (k, c1, c2)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using("INSERT INTO " + KS + "." 
+ TABLE + " (k, c1, c2, v) " + + "VALUES (?, ?, ?, ?)") + .build(); + + try + { + writer.addRow(1, 1, 1); + fail("Passing less arguments then expected in prepared statement should not work."); + } + catch (InvalidRequestException e) + { + assertEquals("Invalid number of arguments, expecting 4 values but got 3", + e.getMessage()); + } + + try + { + writer.addRow(1, 1, CQLSSTableWriter.UNSET_VALUE, "1"); + fail("Unset values should not work with clustering columns."); + } + catch (InvalidRequestException e) + { + assertEquals("Invalid unset value for column c2", + e.getMessage()); + } + + try + { + writer.addRow(ImmutableMap.<String, Object>builder().put("k", 1).put("c1", 1).put("v", CQLSSTableWriter.UNSET_VALUE).build()); + fail("Unset or null clustering columns should not be allowed."); + } + catch (InvalidRequestException e) + { + assertEquals("Invalid null value in condition for column c2", + e.getMessage()); + } + + writer.addRow(1, 1, 1, CQLSSTableWriter.UNSET_VALUE); + writer.addRow(2, 2, 2, null); + writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE)); + writer.addRow(ImmutableMap.<String, Object>builder() + .put("k", 4) + .put("c1", 4) + .put("c2", 4) + .put("v", CQLSSTableWriter.UNSET_VALUE) + .build()); + writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE)); + writer.addRow(5, 5, 5, "5"); + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." 
+ TABLE); + Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); + UntypedResultSet.Row r1 = iter.next(); + assertEquals(1, r1.getInt("k")); + assertEquals(1, r1.getInt("c1")); + assertEquals(1, r1.getInt("c2")); + assertEquals(false, r1.has("v")); + UntypedResultSet.Row r2 = iter.next(); + assertEquals(2, r2.getInt("k")); + assertEquals(2, r2.getInt("c1")); + assertEquals(2, r2.getInt("c2")); + assertEquals(false, r2.has("v")); + UntypedResultSet.Row r3 = iter.next(); + assertEquals(3, r3.getInt("k")); + assertEquals(3, r3.getInt("c1")); + assertEquals(3, r3.getInt("c2")); + assertEquals(false, r3.has("v")); + UntypedResultSet.Row r4 = iter.next(); + assertEquals(4, r4.getInt("k")); + assertEquals(4, r4.getInt("c1")); + assertEquals(4, r4.getInt("c2")); + assertEquals(false, r3.has("v")); + UntypedResultSet.Row r5 = iter.next(); + assertEquals(5, r5.getInt("k")); + assertEquals(5, r5.getInt("c1")); + assertEquals(5, r5.getInt("c2")); + assertEquals(true, r5.has("v")); + assertEquals("5", r5.getString("v")); + } + @Test - public void testUpdateSatement() throws Exception + public void testUpdateStatement() throws Exception { final String KS = "cql_keyspace6"; final String TABLE = "table6"; @@@ -589,6 -273,131 +589,55 @@@ assertFalse(iter.hasNext()); } + @Test + public void testNativeFunctions() throws Exception + { + final String KS = "cql_keyspace7"; + final String TABLE = "table7"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " c1 int," + + " c2 int," + + " v blob," + + " PRIMARY KEY (k, c1, c2)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using("INSERT INTO " + KS + "." 
+ TABLE + " (k, c1, c2, v) VALUES (?, ?, ?, textAsBlob(?))") + .build(); + + writer.addRow(1, 2, 3, "abc"); + writer.addRow(4, 5, 6, "efg"); + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + assertEquals(2, resultSet.size()); + + Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); + UntypedResultSet.Row r1 = iter.next(); + assertEquals(1, r1.getInt("k")); + assertEquals(2, r1.getInt("c1")); + assertEquals(3, r1.getInt("c2")); + assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v")); + + UntypedResultSet.Row r2 = iter.next(); + assertEquals(4, r2.getInt("k")); + assertEquals(5, r2.getInt("c1")); + assertEquals(6, r2.getInt("c2")); + assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v")); + + assertFalse(iter.hasNext()); + } + - private static final int NUMBER_WRITES_IN_RUNNABLE = 10; - private class WriterThread extends Thread - { - private final File dataDir; - private final int id; - public volatile Exception exception; - - public WriterThread(File dataDir, int id) - { - this.dataDir = dataDir; - this.id = id; - } - - @Override - public void run() - { - String schema = "CREATE TABLE cql_keyspace2.table2 (" - + " k int," - + " v int," - + " PRIMARY KEY (k, v)" - + ")"; - String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)"; - CQLSSTableWriter writer = CQLSSTableWriter.builder() - .inDirectory(dataDir) - .forTable(schema) - .using(insert).build(); - - try - { - for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++) - { - writer.addRow(id, i); - } - writer.close(); - } - catch (Exception e) - { - exception = e; - } - } - } - - @Test - public void testConcurrentWriters() throws Exception - { - final String KS = "cql_keyspace2"; - final String TABLE = "table2"; - - File tempdir = Files.createTempDir(); - File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); - assert dataDir.mkdirs(); - - 
WriterThread[] threads = new WriterThread[5]; - for (int i = 0; i < threads.length; i++) - { - WriterThread thread = new WriterThread(dataDir, i); - threads[i] = thread; - thread.start(); - } - - for (WriterThread thread : threads) - { - thread.join(); - assert !thread.isAlive() : "Thread should be dead by now"; - if (thread.exception != null) - { - throw thread.exception; - } - } - - loadSSTables(dataDir, KS); - - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); - assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); - } - private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException { SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org