Repository: cassandra Updated Branches: refs/heads/trunk ac53b720b -> 716264c72
Allow unset values in CQLSSTableWriter Patch by Alex Petrov; reviewed by Benjamin Lerer for CASSANDRA-11911 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/716264c7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/716264c7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/716264c7 Branch: refs/heads/trunk Commit: 716264c72f0980e763da540f59912d11bbfb4695 Parents: ac53b72 Author: Alex Petrov <[email protected]> Authored: Mon Jun 27 11:34:42 2016 +0200 Committer: Benjamin Lerer <[email protected]> Committed: Mon Jun 27 11:34:42 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/CQLSSTableWriter.java | 14 ++- .../io/sstable/CQLSSTableWriterTest.java | 108 ++++++++++++++++++- 3 files changed, 118 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/716264c7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9486c53..f441f8b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.8 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911) * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993) * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579) * Move skip_stop_words filter before stemming (CASSANDRA-12078) http://git-wip-us.apache.org/repos/asf/cassandra/blob/716264c7/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 2d9e379..76c0e19 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -101,6 +101,8 @@ import org.apache.cassandra.utils.Pair; */ public class CQLSSTableWriter implements Closeable { + public static final ByteBuffer UNSET_VALUE = ByteBufferUtil.UNSET_BYTE_BUFFER; + static { Config.setClientMode(true); @@ -172,7 +174,7 @@ public class CQLSSTableWriter implements Closeable for (int i = 0; i < size; i++) { Object value = values.get(i); - rawValues.add(value == null ? null : typeCodecs.get(i).serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); + rawValues.add(serialize(value, typeCodecs.get(i))); } return rawAddRow(rawValues); @@ -207,8 +209,7 @@ public class CQLSSTableWriter implements Closeable { ColumnSpecification spec = boundNames.get(i); Object value = values.get(spec.name.toString()); - - rawValues.add(value == null ? null : typeCodecs.get(i).serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); + rawValues.add(serialize(value, typeCodecs.get(i))); } return rawAddRow(rawValues); } @@ -324,6 +325,13 @@ public class CQLSSTableWriter implements Closeable writer.close(); } + private ByteBuffer serialize(Object value, TypeCodec codec) + { + if (value == null || value == UNSET_VALUE) + return (ByteBuffer) value; + + return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED); + } /** * A Builder for a CQLSSTableWriter object. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/716264c7/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index 437e7a3..caa92f6 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -38,9 +38,9 @@ import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.UDHelper; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.*; +import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.OutputHandler; +import org.apache.cassandra.utils.*; import com.datastax.driver.core.DataType; import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.TypeCodec; @@ -48,6 +48,7 @@ import com.datastax.driver.core.UDTValue; import com.datastax.driver.core.UserType; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class CQLSSTableWriterTest { @@ -429,6 +430,109 @@ public class CQLSSTableWriterTest } } + @Test + public void testUnsetValues() throws Exception + { + final String KS = "cql_keyspace5"; + final String TABLE = "table5"; + + final String schema = "CREATE TABLE " + KS + "." + TABLE + " (" + + " k int," + + " c1 int," + + " c2 int," + + " v text," + + " PRIMARY KEY (k, c1, c2)" + + ")"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using("INSERT INTO " + KS + "." + TABLE + " (k, c1, c2, v) " + + "VALUES (?, ?, ?, ?)") + .build(); + + try + { + writer.addRow(1, 1, 1); + fail("Passing less arguments then expected in prepared statement should not work."); + } + catch (InvalidRequestException e) + { + assertEquals("Invalid number of arguments, expecting 4 values but got 3", + e.getMessage()); + } + + try + { + writer.addRow(1, 1, CQLSSTableWriter.UNSET_VALUE, "1"); + fail("Unset values should not work with clustering columns."); + } + catch (InvalidRequestException e) + { + assertEquals("Invalid unset value for column c2", + e.getMessage()); + } + + try + { + writer.addRow(ImmutableMap.<String, Object>builder().put("k", 1).put("c1", 1).put("v", CQLSSTableWriter.UNSET_VALUE).build()); + fail("Unset or null clustering columns should not be allowed."); + } + catch (InvalidRequestException e) + { + assertEquals("Invalid null value in condition for column c2", + e.getMessage()); + } + + writer.addRow(1, 1, 1, CQLSSTableWriter.UNSET_VALUE); + writer.addRow(2, 2, 2, null); + writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE)); + writer.addRow(ImmutableMap.<String, Object>builder() + .put("k", 4) + .put("c1", 4) + .put("c2", 4) + .put("v", CQLSSTableWriter.UNSET_VALUE) + .build()); + writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE)); + writer.addRow(5, 5, 5, "5"); + + writer.close(); + loadSSTables(dataDir, KS); + + UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE); + Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); + UntypedResultSet.Row r1 = iter.next(); + assertEquals(1, r1.getInt("k")); + assertEquals(1, r1.getInt("c1")); + assertEquals(1, r1.getInt("c2")); + assertEquals(false, r1.has("v")); + UntypedResultSet.Row r2 = iter.next(); + assertEquals(2, r2.getInt("k")); + assertEquals(2, r2.getInt("c1")); + assertEquals(2, r2.getInt("c2")); + assertEquals(false, r2.has("v")); + UntypedResultSet.Row r3 = iter.next(); + assertEquals(3, r3.getInt("k")); + assertEquals(3, r3.getInt("c1")); + assertEquals(3, r3.getInt("c2")); + assertEquals(false, r3.has("v")); + UntypedResultSet.Row r4 = iter.next(); + assertEquals(4, r4.getInt("k")); + assertEquals(4, r4.getInt("c1")); + assertEquals(4, r4.getInt("c2")); + assertEquals(false, r3.has("v")); + UntypedResultSet.Row r5 = iter.next(); + assertEquals(5, r5.getInt("k")); + assertEquals(5, r5.getInt("c1")); + assertEquals(5, r5.getInt("c2")); + assertEquals(true, r5.has("v")); + assertEquals("5", r5.getString("v")); + } + private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException { SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
