Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 5aec8349a -> a1baeadab refs/heads/cassandra-3.11 9562b9b69 -> a2074b890 refs/heads/trunk 9c6f87c35 -> e6ba61629
Allow native function calls in CQLSSTableWriter Patch by Alex Petrov; reviewed by Joel Knighton for CASSANDRA-12606 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1baeada Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1baeada Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1baeada Branch: refs/heads/cassandra-3.0 Commit: a1baeadab9d726d2ceeed795bb6efb13464dec4a Parents: 5aec834 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Tue Oct 11 09:51:50 2016 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Thu Jun 29 17:50:21 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/CQLSSTableWriter.java | 7 + .../io/sstable/CQLSSTableWriterTest.java | 149 +++++++++++++++---- 3 files changed, 130 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1baeada/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d90d220..c5179e7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606) * Fix secondary index queries on COMPACT tables (CASSANDRA-13627) * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1baeada/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 81a3356..39f7339 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.dht.IPartitioner; @@ -40,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; @@ -354,6 +356,11 @@ public class CQLSSTableWriter implements Closeable { synchronized (CQLSSTableWriter.class) { + if (Schema.instance.getKSMetaData(SchemaKeyspace.NAME) == null) + Schema.instance.load(SchemaKeyspace.metadata()); + if (Schema.instance.getKSMetaData(SystemKeyspace.NAME) == null) + Schema.instance.load(SystemKeyspace.metadata()); + this.schema = getTableMetadata(schema); // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1baeada/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index e6d18c4..7d79036 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.UUID; +import java.util.concurrent.ExecutionException; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; @@ -42,10 +43,13 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.*; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public class CQLSSTableWriterTest { @@ -94,24 +98,7 @@ public class CQLSSTableWriterTest writer.close(); - SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() - { - private String keyspace; - - public void init(String keyspace) - { - this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); - } - - public CFMetaData getTableMetadata(String cfName) - { - return Schema.instance.getCFMetaData(keyspace, cfName); - } - }, new OutputHandler.SystemOutput(false, false)); - - loader.stream().get(); + loadSSTables(dataDir, KS); UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;"); assertEquals(4, rs.size()); @@ -142,7 +129,7 @@ public class CQLSSTableWriterTest } } - @Test(expected = IllegalArgumentException.class) + @Test public void testForbidCounterUpdates() throws Exception { String KS = "cql_keyspace"; @@ -158,10 +145,18 @@ public class CQLSSTableWriterTest " PRIMARY KEY (my_id)" + ")"; String insert = String.format("UPDATE cql_keyspace.counter1 SET my_counter = my_counter - ? WHERE my_id = ?"); - CQLSSTableWriter.builder().inDirectory(dataDir) - .forTable(schema) - .withPartitioner(Murmur3Partitioner.instance) - .using(insert).build(); + try + { + CQLSSTableWriter.builder().inDirectory(dataDir) + .forTable(schema) + .withPartitioner(Murmur3Partitioner.instance) + .using(insert).build(); + fail("Counter update statements should not be supported"); + } + catch (IllegalArgumentException e) + { + assertEquals(e.getMessage(), "Counter update statements are not supported"); + } } @Test @@ -230,7 +225,102 @@ public class CQLSSTableWriterTest } + @Test + public void testUpdateStatement() throws Exception + { + final String KS = "cql_keyspace6"; + final String TABLE = "table6"; + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " c1 int," + + " c2 int," + + " v text," + + " PRIMARY KEY (k, c1, c2)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using("UPDATE " + KS + "." + TABLE + " SET v = ? " + + "WHERE k = ? AND c1 = ? AND c2 = ?") + .build(); + + writer.addRow("a", 1, 2, 3); + writer.addRow("b", 4, 5, 6); + writer.addRow(null, 7, 8, 9); + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + assertEquals(2, resultSet.size()); + + Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); + UntypedResultSet.Row r1 = iter.next(); + assertEquals(1, r1.getInt("k")); + assertEquals(2, r1.getInt("c1")); + assertEquals(3, r1.getInt("c2")); + assertEquals("a", r1.getString("v")); + UntypedResultSet.Row r2 = iter.next(); + assertEquals(4, r2.getInt("k")); + assertEquals(5, r2.getInt("c1")); + assertEquals(6, r2.getInt("c2")); + assertEquals("b", r2.getString("v")); + assertFalse(iter.hasNext()); + } + + @Test + public void testNativeFunctions() throws Exception + { + final String KS = "cql_keyspace7"; + final String TABLE = "table7"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " c1 int," + + " c2 int," + + " v blob," + + " PRIMARY KEY (k, c1, c2)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using("INSERT INTO " + KS + "." + TABLE + " (k, c1, c2, v) VALUES (?, ?, ?, textAsBlob(?))") + .build(); + + writer.addRow(1, 2, 3, "abc"); + writer.addRow(4, 5, 6, "efg"); + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + assertEquals(2, resultSet.size()); + + Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); + UntypedResultSet.Row r1 = iter.next(); + assertEquals(1, r1.getInt("k")); + assertEquals(2, r1.getInt("c1")); + assertEquals(3, r1.getInt("c2")); + assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v")); + + UntypedResultSet.Row r2 = iter.next(); + assertEquals(4, r2.getInt("k")); + assertEquals(5, r2.getInt("c1")); + assertEquals(6, r2.getInt("c2")); + assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v")); + + assertFalse(iter.hasNext()); + } private static final int NUMBER_WRITES_IN_RUNNABLE = 10; private class WriterThread extends Thread @@ -302,6 +392,14 @@ public class CQLSSTableWriterTest } } + loadSSTables(dataDir, KS); + + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); + assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); + } + + private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException + { SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() { private String keyspace; @@ -309,7 +407,7 @@ public class CQLSSTableWriterTest public void init(String keyspace) { this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) + for (Range<Token> range : StorageService.instance.getLocalRanges(ks)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); } @@ -320,8 +418,5 @@ public class CQLSSTableWriterTest }, new OutputHandler.SystemOutput(false, false)); loader.stream().get(); - - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;"); - assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org