Add delete by Query method for native serialization
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/2fe2c2bb Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/2fe2c2bb Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/2fe2c2bb Branch: refs/heads/master Commit: 2fe2c2bbbffcbeb968e523dd332ad087827b3f92 Parents: 4ce6a6e Author: madhawa <[email protected]> Authored: Sat Jul 8 10:55:27 2017 +0530 Committer: madhawa <[email protected]> Committed: Sat Jul 8 10:55:27 2017 +0530 ---------------------------------------------------------------------- .../cassandra/serializers/AvroSerializer.java | 5 + .../serializers/CassandraQueryFactory.java | 131 ++++++++++++++----- .../serializers/CassandraSerializer.java | 4 +- .../cassandra/serializers/NativeSerializer.java | 13 ++ .../gora/cassandra/store/CassandraStore.java | 12 +- .../gora-cassandra-mapping.xml | 2 +- .../cassandra/store/TestCassandraStore.java | 95 -------------- ...stCassandraStoreWithNativeSerialization.java | 43 +++++- 8 files changed, 164 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index d5dd548..8061a80 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -75,4 +75,9 @@ public class AvroSerializer<K, T extends PersistentBase> extends CassandraSerial return null; } + @Override + public long deleteByQuery(Query query) { + return 0; + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index 3939c34..2a980e5 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -16,6 +16,7 @@ */ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import org.apache.gora.cassandra.bean.CassandraKey; @@ -27,6 +28,7 @@ import org.apache.gora.cassandra.query.CassandraRow; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.query.Query; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -290,64 +292,125 @@ class CassandraQueryFactory { } - static<K> String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects ) { + static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { String[] fields = cassandraQuery.getFields(); fields = fields != null ? fields : mapping.getFieldNames(); Object startKey = cassandraQuery.getStartKey(); Object endKey = cassandraQuery.getEndKey(); - long limit = cassandraQuery.getLimit(); - Select select = QueryBuilder.select(getColumnNames(mapping,fields)).from(mapping.getKeySpace().getName(), mapping.getCoreName()); - if(limit > 0) { - select = select.limit((int)limit); + Object key = cassandraQuery.getKey(); + String primaryKey = null; + long limit = cassandraQuery.getLimit(); + Select select = QueryBuilder.select(getColumnNames(mapping, fields)).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if (limit > 0) { + select = select.limit((int) limit); } Select.Where query = null; boolean isWhereNeeded = true; - if(startKey != null) { - if (mapping.getCassandraKey() != null) { + if (key != null) { + primaryKey = getPKey(mapping.getFieldList()); + query = select.where(QueryBuilder.eq(primaryKey, "?")); + objects.add(key); + } else { + if (startKey != null) { + if (mapping.getCassandraKey() != null) { //todo avro serialization - } else { - for (Field field : mapping.getFieldList()) { - boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); - if (isPrimaryKey) { - query = select.where(QueryBuilder.gte(field.getColumnName(), "?")); - objects.add(startKey); - isWhereNeeded = false; - break; - } + } else { + primaryKey = getPKey(mapping.getFieldList()); + query = select.where(QueryBuilder.gte(primaryKey, "?")); + objects.add(startKey); + isWhereNeeded = false; } } - } - if(endKey != null) { - if (mapping.getCassandraKey() != null) { + if (endKey != null) { + if (mapping.getCassandraKey() != null) { //todo avro serialization - } else { - for (Field field : mapping.getFieldList()) { - boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); - if (isPrimaryKey) { - if(isWhereNeeded) { - query = select.where(QueryBuilder.lte(field.getColumnName(), "?")); - } else { - query = query.and(QueryBuilder.lte(field.getColumnName(), "?")); - } - objects.add(endKey); - break; + } else { + primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList()); + if (isWhereNeeded) { + query = select.where(QueryBuilder.lte(primaryKey, "?")); + } else { + query = query.and(QueryBuilder.lte(primaryKey, "?")); } + objects.add(endKey); } } } - if(startKey == null && endKey == null) { + if (startKey == null && endKey == null && key == null) { return select.getQueryString(); } - return query.getQueryString(); + return query.getQueryString(); } private static String[] getColumnNames(CassandraMapping mapping, String[] fields) { String[] columnNames = new String[fields.length]; int i = 0; - for(String field : fields) { - columnNames[i] = mapping.getField(field).getColumnName(); + for (String field : fields) { + columnNames[i] = mapping.getField(field).getColumnName(); i++; } return columnNames; } + + private static String getPKey(List<Field> fields) { + for (Field field : fields) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + return field.getColumnName(); + } + } + return null; + } + + static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { + String[] columns = null; + if(!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) { + columns = getColumnNames(mapping, cassandraQuery.getFields()); + } + Object startKey = cassandraQuery.getStartKey(); + Object endKey = cassandraQuery.getEndKey(); + Object key = cassandraQuery.getKey(); + String primaryKey = null; + Delete delete; + if (columns != null) { + delete = QueryBuilder.delete(columns).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + } else { + delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName()); + } + Delete.Where query = null; + boolean isWhereNeeded = true; + if (key != null) { + primaryKey = getPKey(mapping.getFieldList()); + query = delete.where(QueryBuilder.eq(primaryKey, "?")); + objects.add(key); + } else { + if (startKey != null) { + if (mapping.getCassandraKey() != null) { +//todo avro serialization + } else { + primaryKey = getPKey(mapping.getFieldList()); + query = delete.where(QueryBuilder.gte(primaryKey, "?")); + objects.add(startKey); + isWhereNeeded = false; + } + } + if (endKey != null) { + if (mapping.getCassandraKey() != null) { +//todo avro serialization + } else { + primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList()); + if (isWhereNeeded) { + query = delete.where(QueryBuilder.lte(primaryKey, "?")); + } else { + query = query.and(QueryBuilder.lte(primaryKey, "?")); + } + objects.add(endKey); + } + } + } + if (startKey == null && endKey == null && key == null) { + return delete.getQueryString(); + } + return query.getQueryString(); + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index cafc1be..e125a33 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -119,6 +119,8 @@ public abstract class CassandraSerializer<K, T extends Persistent> { public abstract T get(K key, String[] fields); - public abstract Result<K, T> execute(DataStore<K, T> dataStore,Query<K, T> query); + public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query); + + public abstract long deleteByQuery(Query<K, T> query); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index c0c8d15..d551d33 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -72,6 +72,19 @@ public class NativeSerializer<K, T extends CassandraNativePersistent> extends Ca } @Override + public long deleteByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + return 0; + } + + @Override public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { List<Object> objectArrayList = new ArrayList<>(); CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<K, T>(dataStore, query); http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 7c24175..3fb1507 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -90,9 +90,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> cassandraClient.initialize(properties); cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), keyClass, persistentClass, mapping); } catch (Exception e) { - LOG.error("Error while initializing Cassandra store: {}", - new Object[]{e.getMessage()}); - throw new RuntimeException(e); + throw new RuntimeException("Error while initializing Cassandra store: "+ e.getMessage(),e); } } @@ -152,8 +150,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> return keyClass.newInstance(); } } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - return null; + throw new RuntimeException("Error while instantiating a key: "+ ex.getMessage(),ex); } } @@ -168,8 +165,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> return persistentClass.newInstance(); } } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - return null; + throw new RuntimeException("Error while instantiating a persistent: "+ ex.getMessage(),ex); } } @@ -210,7 +206,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public long deleteByQuery(Query<K, T> query) { - return 0; + return cassandraSerializer.deleteByQuery(query); } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml index d3dde32..b8c2df8 100644 --- a/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml @@ -56,7 +56,7 @@ </keyspace> <class name="org.apache.gora.cassandra.example.generated.nativeSerialization.User" keyClass="java.util.UUID" keyspace="nativeTestKeySpace" - table="Users" compactStorage="true" > + table="Users" > <field name="userId" column="user_id" type="uuid" ttl="10" primarykey="true"/> <field name="name" column="name" type="text" ttl="10"/> <field name="dateOfBirth" column="dob" type="timestamp" ttl="10"/> http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java deleted file mode 100644 index 34255a6..0000000 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Testing class for all standard gora-cassandra functionality. - * We extend DataStoreTestBase enabling us to run the entire base test - * suite for Gora. - */ -package org.apache.gora.cassandra.store; - -import org.apache.gora.cassandra.GoraCassandraTestDriver; -import org.apache.gora.store.DataStoreTestBase; -import org.junit.Before; -import org.junit.Ignore; - -import java.io.IOException; -import java.util.Properties; - -/** - * Test for CassandraStore. - */ -public class TestCassandraStore extends DataStoreTestBase{ - private static Properties properties; - - static { - GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver(); - setProperties(); - testDriver.setParameters(properties); - setTestDriver(testDriver); - } - - @Before - public void setUp() throws Exception { - super.setUp(); - } - - - private static void setProperties() { - properties = new Properties(); - properties.setProperty(CassandraStoreParameters.CASSANDRA_SERVERS, "localhost"); - properties.setProperty(CassandraStoreParameters.PORT, "9042"); - properties.setProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING, "false"); - properties.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); - properties.setProperty(CassandraStoreParameters.CLUSTER_NAME,"Test Cluster"); - properties.setProperty("gora.cassandrastore.mapping.file", "avro/gora-cassandra-mapping.xml"); - } - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQuery() throws IOException {} - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQueryStartKey() throws IOException {} - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQueryEndKey() throws IOException {} - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQueryKeyRange() throws IOException {} - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQueryWebPageSingleKey() throws IOException {} - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQueryWebPageSingleKeyDefaultFields() throws IOException {} - @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") - @Override - public void testQueryWebPageQueryEmptyResults() throws IOException {} - @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0") - @Override - public void testDelete() throws IOException {} - @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0") - @Override - public void testDeleteByQuery() throws IOException {} - @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0") - @Override - public void testDeleteByQueryFields() throws IOException {} - @Ignore("GORA-298 Implement CassandraStore#getPartitions") - @Override - public void testGetPartitions() throws IOException {} -} http://git-wip-us.apache.org/repos/asf/gora/blob/2fe2c2bb/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index 160a5ae..b2ac3c1 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -183,7 +183,7 @@ public class TestCassandraStoreWithNativeSerialization { i++; } Assert.assertEquals(result1.getProgress(),1.0,0.0); - Assert.assertEquals(i, 3); + Assert.assertEquals(3, i); // Check limit query Query<UUID, User> query2 = userDataStore.newQuery(); @@ -196,6 +196,45 @@ public class TestCassandraStoreWithNativeSerialization { Assert.assertEquals(result2.get().getUserId(), users.get(result2.getKey()).getUserId()); i++; } - Assert.assertEquals(i, 2); + Assert.assertEquals(2, i); + + // check key element + Query<UUID, User> query3 = userDataStore.newQuery(); + query3.setKey(id1); + + Result<UUID, User> result3 = userDataStore.execute(query3); + i = 0; + while (result3.next()) { + Assert.assertEquals(result3.get().getName(), users.get(result3.getKey()).getName()); + Assert.assertEquals(result3.get().getDateOfBirth(), users.get(result3.getKey()).getDateOfBirth()); + Assert.assertEquals(result3.get().getUserId(), users.get(result3.getKey()).getUserId()); + i++; + } + Assert.assertEquals(1, i); + } + + @Test + public void testDeleteByQuery() throws Exception { + userDataStore.truncateSchema(); + UUID id1 = UUID.randomUUID(); + User user1 = new User(id1, "user1", Date.from(Instant.now())); + userDataStore.put(id1, user1); + UUID id2 = UUID.randomUUID(); + User user2 = new User(id2, "user2", Date.from(Instant.now())); + userDataStore.put(id2, user2); + Query<UUID, User> query1 = userDataStore.newQuery(); + query1.setKey(id1); + userDataStore.deleteByQuery(query1); + User user = userDataStore.get(id1); + Assert.assertNull(user); + + //test deleteByFields + Query<UUID, User> query2 = userDataStore.newQuery(); + query2.setKey(id2); + query2.setFields("name"); + userDataStore.deleteByQuery(query2); + User partialDeletedUser = userDataStore.get(id2); + Assert.assertNull(partialDeletedUser.getName()); + Assert.assertEquals(partialDeletedUser.getDateOfBirth(),user2.getDateOfBirth()); } }
