GORA-530 : Reinstated exception throwing in DataStore and Query
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/b06da5f3 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/b06da5f3 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/b06da5f3 Branch: refs/heads/master Commit: b06da5f32ec572c88f7ec5245a4b573c73ae8c22 Parents: f028a54 Author: Alfonso Nishikawa Muñumer <[email protected]> Authored: Thu Jan 18 16:37:09 2018 -0100 Committer: Alfonso Nishikawa Muñumer <[email protected]> Committed: Thu Jan 18 16:37:09 2018 -0100 ---------------------------------------------------------------------- .../gora/accumulo/store/AccumuloStore.java | 75 ++--- .../gora/aerospike/store/AerospikeStore.java | 290 ++++++++++-------- .../cassandra/serializers/AvroSerializer.java | 306 ++++++++++--------- .../serializers/CassandraSerializer.java | 148 +++++---- .../cassandra/serializers/NativeSerializer.java | 121 +++++--- .../gora/cassandra/store/CassandraStore.java | 78 +++-- .../store/TestAvroSerializationWithUDT.java | 4 +- .../cassandra/store/TestCassandraStore.java | 3 +- .../TestCassandraStoreWithCassandraKey.java | 11 +- ...stCassandraStoreWithNativeSerialization.java | 18 +- .../store/TestNativeSerializationWithUDT.java | 4 +- .../org/apache/gora/avro/store/AvroStore.java | 28 +- .../gora/avro/store/DataFileAvroStore.java | 33 +- .../org/apache/gora/memory/store/MemStore.java | 24 +- .../gora/persistency/impl/BeanFactoryImpl.java | 8 +- .../main/java/org/apache/gora/query/Query.java | 3 +- .../org/apache/gora/query/impl/QueryBase.java | 3 +- .../apache/gora/query/ws/impl/QueryWSBase.java | 3 +- .../java/org/apache/gora/store/DataStore.java | 29 +- .../apache/gora/store/impl/DataStoreBase.java | 42 ++- .../store/impl/FileBackedDataStoreBase.java | 50 ++- .../store/ws/impl/WSBackedDataStoreBase.java | 16 +- .../gora/store/ws/impl/WSDataStoreBase.java | 5 +- .../apache/gora/memory/store/MemStoreTest.java | 22 +- .../apache/gora/mock/store/MockDataStore.java | 30 +- .../apache/gora/store/DataStoreTestUtil.java | 3 +- .../apache/gora/couchdb/store/CouchDBStore.java | 198 +++++++----- .../gora/couchdb/store/TestCouchDBStore.java | 3 +- .../gora/dynamodb/store/DynamoDBAvroStore.java | 11 +- .../dynamodb/store/DynamoDBNativeStore.java | 151 +++++---- .../gora/dynamodb/store/DynamoDBStore.java | 100 +++--- .../org/apache/gora/hbase/store/HBaseStore.java | 109 ++++--- .../gora/infinispan/store/InfinispanClient.java | 29 +- .../gora/infinispan/store/InfinispanStore.java | 131 +++++--- .../java/org/apache/gora/infinispan/Utils.java | 3 +- .../gora/jcache/store/JCacheCacheLoader.java | 34 ++- .../gora/jcache/store/JCacheCacheWriter.java | 30 +- .../apache/gora/jcache/store/JCacheStore.java | 157 +++++++--- .../apache/gora/mongodb/store/MongoStore.java | 270 ++++++++++------ .../gora/mongodb/store/TestMongoStore.java | 3 +- .../gora/orientdb/store/OrientDBStore.java | 140 ++++----- .../org/apache/gora/solr/query/SolrResult.java | 12 +- .../org/apache/gora/solr/store/SolrStore.java | 62 ++-- .../tutorial/log/DistributedLogManager.java | 23 +- 44 files changed, 1649 insertions(+), 1174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java ---------------------------------------------------------------------- diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java index bac354b..6737dbb 100644 --- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java +++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java @@ -359,7 +359,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T * @param properties */ @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { try{ super.initialize(keyClass, persistentClass, properties); @@ -376,7 +376,8 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T try { encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new IOException(e); + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -394,10 +395,12 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T if (autoCreateSchema && !schemaExists()) createSchema(); } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } catch(IOException e){ LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -470,7 +473,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } @Override - public void createSchema() { + public void createSchema() throws GoraException { try { conn.tableOperations().create(mapping.tableName); Set<Entry<String,String>> es = mapping.tableConfig.entrySet(); @@ -480,13 +483,15 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } catch (AccumuloException | AccumuloSecurityException e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } catch (TableExistsException e) { LOG.debug(e.getMessage(), e); + // Assume this is not an error } } @Override - public void deleteSchema() { + public void deleteSchema() throws GoraException { try { if (batchWriter != null) batchWriter.close(); @@ -494,12 +499,18 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T conn.tableOperations().delete(mapping.tableName); } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @Override - public boolean schemaExists() { - return conn.tableOperations().exists(mapping.tableName); + public boolean schemaExists() throws GoraException { + try { + return conn.tableOperations().exists(mapping.tableName); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException { @@ -639,7 +650,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } @Override - public T get(K key, String[] fields) { + public T get(K key, String[] fields) throws GoraException { try { // TODO make isolated scanner optional? Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); @@ -653,17 +664,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T if (row == null) return null; return persistent; - } catch (TableNotFoundException e) { + } catch (Exception e) { LOG.error(e.getMessage(), e); - return null; - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return null; + throw new GoraException(e); } } @Override - public void put(K key, T val) { + public void put(K key, T val) throws GoraException { try{ Mutation m = new Mutation(new Text(toBytes(key))); @@ -725,8 +733,11 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } catch (MutationsRejectedException e) { LOG.error(e.getMessage(), e); } - } catch (IOException e) { + } catch (GoraException e) { + throw e; + } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -758,7 +769,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T return count; } - private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) { + private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) throws GoraException { // First of all we delete array field on accumulo store Text rowKey = new Text(m.getRow()); @@ -782,14 +793,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } @Override - public boolean delete(K key) { + public boolean delete(K key) throws GoraException { Query<K,T> q = newQuery(); q.setKey(key); return deleteByQuery(q) > 0; } @Override - public long deleteByQuery(Query<K,T> query) { + public long deleteByQuery(Query<K,T> query) throws GoraException { try { Scanner scanner = createScanner(query); // add iterator that drops values on the server side @@ -814,16 +825,9 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } return count; - } catch (TableNotFoundException e) { - // TODO return 0? + } catch (Exception e) { LOG.error(e.getMessage(), e); - return 0; - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - return 0; - } catch (IOException e){ - LOG.error(e.getMessage(), e); - return 0; + throw new GoraException(e); } } @@ -865,14 +869,13 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T * Execute the query and return the result. */ @Override - public Result<K,T> execute(Query<K,T> query) { + public Result<K,T> execute(Query<K,T> query) throws GoraException { try { Scanner scanner = createScanner(query); return new AccumuloResult<>(this, query, scanner); } catch (TableNotFoundException e) { - // TODO return empty result? LOG.error(e.getMessage(), e); - return null; + throw new GoraException(e) ; } } @@ -893,7 +896,7 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } @Override - public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException { + public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws GoraException { try { TabletLocator tl; if (conn instanceof MockConnector) @@ -962,8 +965,9 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } return ret; - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -1020,13 +1024,14 @@ public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T } @Override - public void flush() { + public void flush() throws GoraException { try { if (batchWriter != null) { batchWriter.flush(); } - } catch (MutationsRejectedException e) { + } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java index b456cf7..cf41392 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java @@ -18,20 +18,12 @@ package org.apache.gora.aerospike.store; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.ArrayList; import java.util.Map; import java.util.Properties; -import com.aerospike.client.Key; -import com.aerospike.client.Value; -import com.aerospike.client.Bin; -import com.aerospike.client.Record; -import com.aerospike.client.AerospikeClient; -import com.aerospike.client.policy.ClientPolicy; -import com.aerospike.client.query.RecordSet; -import com.aerospike.client.query.Statement; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.util.Utf8; @@ -48,9 +40,20 @@ import org.apache.gora.query.Result; import org.apache.gora.query.impl.PartitionQueryImpl; import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.aerospike.client.AerospikeClient; +import com.aerospike.client.AerospikeException; +import com.aerospike.client.Bin; +import com.aerospike.client.Key; +import com.aerospike.client.Record; +import com.aerospike.client.Value; +import com.aerospike.client.policy.ClientPolicy; +import com.aerospike.client.query.RecordSet; +import com.aerospike.client.query.Statement; + /** * Implementation of a Aerospike data store to be used by gora. * @@ -79,36 +82,41 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @param properties properties */ @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { super.initialize(keyClass, persistentClass, properties); - AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder(); - aerospikeMappingBuilder - .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass, - persistentClass); - aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(), - properties, getConf()); - ClientPolicy policy = new ClientPolicy(); - policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy(); - policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy(); - - // 'SendKey' property is enabled by default as the key is needed in query execution - policy.readPolicyDefault.sendKey = true; - policy.writePolicyDefault.sendKey = true; - - // Set the credentials for servers with restricted access - if (aerospikeParameters.getUsername() != null) { - policy.user = aerospikeParameters.getUsername(); - } - if (aerospikeParameters.getPassword() != null) { - policy.password = aerospikeParameters.getPassword(); + try { + AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder(); + aerospikeMappingBuilder + .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass, + persistentClass); + aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(), + properties, getConf()); + ClientPolicy policy = new ClientPolicy(); + policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy(); + policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy(); + + // 'SendKey' property is enabled by default as the key is needed in query execution + policy.readPolicyDefault.sendKey = true; + policy.writePolicyDefault.sendKey = true; + + // Set the credentials for servers with restricted access + if (aerospikeParameters.getUsername() != null) { + policy.user = aerospikeParameters.getUsername(); + } + if (aerospikeParameters.getPassword() != null) { + policy.password = aerospikeParameters.getPassword(); + } + + aerospikeClient = new AerospikeClient(policy, aerospikeParameters.getHost(), + aerospikeParameters.getPort()); + aerospikeParameters.setServerSpecificParameters(aerospikeClient); + aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields()); + LOG.info("Aerospike Gora datastore initialized successfully."); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - - aerospikeClient = new AerospikeClient(policy, aerospikeParameters.getHost(), - aerospikeParameters.getPort()); - aerospikeParameters.setServerSpecificParameters(aerospikeClient); - aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields()); - LOG.info("Aerospike Gora datastore initialized successfully."); } /** @@ -129,7 +137,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * the fly. Thus, schema creation functionality is unavailable in gora-aerospike module. */ @Override - public void createSchema() { + public void createSchema() throws GoraException { } /** @@ -138,7 +146,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * the fly. Thus, schema deletion functionality is unavailable in gora-aerospike module. */ @Override - public void deleteSchema() { + public void deleteSchema() throws GoraException { } /** @@ -147,7 +155,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * the fly. Thus, schema exists functionality is unavailable in gora-aerospike module. */ @Override - public boolean schemaExists() { + public boolean schemaExists() throws GoraException { return true; } @@ -159,17 +167,25 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @return the Object corresponding to the key or null if it cannot be found */ @Override - public T get(K key, String[] fields) { - - Key recordKey = getAerospikeKey(key); - fields = getFieldsToQuery(fields); + public T get(K key, String[] fields) throws GoraException { - Record record = aerospikeClient - .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields); - if (record == null) { - return null; + try { + Key recordKey = getAerospikeKey(key); + fields = getFieldsToQuery(fields); + + Record record = aerospikeClient + .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields); + + if (record == null) { + return null; + } + return createPersistentInstance(record, fields); + } catch (GoraException e) { + throw e; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return createPersistentInstance(record, fields); } /** @@ -181,36 +197,41 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @param persistent object to be persisted */ @Override - public void put(K key, T persistent) { - - Key recordKey = getAerospikeKey(key); + public void put(K key, T persistent) throws GoraException { - List<Field> fields = persistent.getSchema().getFields(); - - for (int i = 0; i < fields.size(); i++) { - if (!persistent.isDirty(i)) { - continue; - } - Object persistentValue = persistent.get(i); - - String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping() - .get(fields.get(i).name()); - if (mappingBinName == null) { - LOG.error("Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?", - persistent.getClass().getName(), fields.get(i).name()); - throw new RuntimeException( - "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields - .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?"); - } - Bin bin; - if (persistentValue != null) { - bin = new Bin(mappingBinName, - getSerializableValue(persistentValue, fields.get(i).schema())); - } else { - bin = Bin.asNull(mappingBinName); + try { + Key recordKey = getAerospikeKey(key); + + List<Field> fields = persistent.getSchema().getFields(); + + for (int i = 0; i < fields.size(); i++) { + if (!persistent.isDirty(i)) { + continue; + } + Object persistentValue = persistent.get(i); + + String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping() + .get(fields.get(i).name()); + if (mappingBinName == null) { + LOG.error("Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?", + persistent.getClass().getName(), fields.get(i).name()); + throw new RuntimeException( + "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields + .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?"); + } + Bin bin; + if (persistentValue != null) { + bin = new Bin(mappingBinName, + getSerializableValue(persistentValue, fields.get(i).schema())); + } else { + bin = Bin.asNull(mappingBinName); + } + aerospikeClient + .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin); } - aerospikeClient - .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -221,10 +242,15 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @return whether the object was successfully deleted */ @Override - public boolean delete(K key) { - Key recordKey = getAerospikeKey(key); - return aerospikeClient - .delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey); + public boolean delete(K key) throws GoraException { + try { + Key recordKey = getAerospikeKey(key); + return aerospikeClient + .delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** @@ -234,10 +260,10 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @return the number of deleted records */ @Override - public long deleteByQuery(Query<K, T> query) { - Result<K, T> result = query.execute(); + public long deleteByQuery(Query<K, T> query) throws GoraException { int deleteCount = 0; try { + Result<K, T> result = query.execute(); while (result.next()) { if (aerospikeClient.delete(null, getAerospikeKey(result.getKey()))) { deleteCount++; @@ -246,7 +272,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K return deleteCount; } catch (Exception e) { LOG.error(e.getMessage(), e); - return -1; + throw new GoraException(e); } } @@ -257,59 +283,64 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @return the query result */ @Override - public Result<K, T> execute(Query<K, T> query) { + public Result<K, T> execute(Query<K, T> query) throws GoraException { List<AerospikeResultRecord> resultRecords = new ArrayList<>(); String namespace = aerospikeParameters.getAerospikeMapping().getNamespace(); String set = aerospikeParameters.getAerospikeMapping().getSet(); - // Query execution without any keys - if (query.getStartKey() == null && query.getEndKey() == null) { - - try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) { - while (recordSet.next()) { - AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(recordSet.getKey(), - recordSet.getRecord()); - resultRecords.add(aerospikeRecord); + try { + // Query execution without any keys + if (query.getStartKey() == null && query.getEndKey() == null) { + + try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) { + while (recordSet.next()) { + AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(recordSet.getKey(), + recordSet.getRecord()); + resultRecords.add(aerospikeRecord); + } } } - } - - // Query execution for single key - else if (query.getKey() != null) { - Key key = getAerospikeKey(query.getKey()); - Record record = aerospikeClient.get(null, key); - if (record != null) { - resultRecords.add(new AerospikeResultRecord(key, record)); + + // Query execution for single key + else if (query.getKey() != null) { + Key key = getAerospikeKey(query.getKey()); + Record record = aerospikeClient.get(null, key); + if (record != null) { + resultRecords.add(new AerospikeResultRecord(key, record)); + } } + + // Query execution for key ranges + // ToDo: Implement Query execution for key ranges + // else if (query.getStartKey() != null && query.getEndKey() != null) { + // + // // the key range filtering at the gora side, which is not a better solution + // String lowerBound = query.getStartKey().toString(); + // String upperBound = query.getEndKey().toString(); + // + // try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) { + // while (recordSet.next()) { + // Key key = recordSet.getKey(); + // Record record = recordSet.getRecord(); + // + // String input = key.userKey.toString(); + // boolean isSpecifiedRange = input.compareToIgnoreCase(lowerBound) >= 0 && input + // .compareToIgnoreCase(upperBound) <= 0; + // + // if (isSpecifiedRange) { + // AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(key, record); + // resultRecords.add(aerospikeRecord); + // } + // + // } + // } + // } + return new AerospikeQueryResult<>(this, query, resultRecords, getFieldsToQuery(null)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e) ; } - - // Query execution for key ranges - // ToDo: Implement Query execution for key ranges - // else if (query.getStartKey() != null && query.getEndKey() != null) { - // - // // the key range filtering at the gora side, which is not a better solution - // String lowerBound = query.getStartKey().toString(); - // String upperBound = query.getEndKey().toString(); - // - // try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) { - // while (recordSet.next()) { - // Key key = recordSet.getKey(); - // Record record = recordSet.getRecord(); - // - // String input = key.userKey.toString(); - // boolean isSpecifiedRange = input.compareToIgnoreCase(lowerBound) >= 0 && input - // .compareToIgnoreCase(upperBound) <= 0; - // - // if (isSpecifiedRange) { - // AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(key, record); - // resultRecords.add(aerospikeRecord); - // } - // - // } - // } - // } - return new AerospikeQueryResult<>(this, query, resultRecords, getFieldsToQuery(null)); } /** @@ -341,7 +372,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K } @Override - public void flush() { + public void flush() throws GoraException { } /** @@ -440,8 +471,9 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @param record record retrieved from database * @param fields fields * @return persistent object created + * @throws GoraException */ - public T createPersistentInstance(Record record, String[] fields) { + public T createPersistentInstance(Record record, String[] fields) throws GoraException { T persistent = newPersistent(); for (String field : fields) { http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index 0ab21c5..9660485 100644 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -38,6 +38,7 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,18 +60,18 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { private Schema persistentSchema; - AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) { + AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) throws GoraException { super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); if (PersistentBase.class.isAssignableFrom(dataStore.getPersistentClass())) { persistentSchema = ((PersistentBase) dataStore.getBeanFactory().getCachedPersistent()).getSchema(); } else { - throw new RuntimeException("Unsupported persistent class, couldn't able to find the Avro schema."); + throw new GoraException("Unsupported persistent class, couldn't able to find the Avro schema."); } this.cassandraDataStore = dataStore; try { analyzePersistent(); } catch (Exception e) { - throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage()); + throw new GoraException("Error occurred while analyzing the persistent class, :" + e.getMessage()); } } @@ -136,28 +137,35 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { * @return */ @Override - public Persistent get(Object key, String[] fields) { - if (fields == null) { - fields = getFields(); - } - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); - String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys); - SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); - if (readConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); - } - ResultSet resultSet = this.client.getSession().execute(statement); - Iterator<Row> iterator = resultSet.iterator(); - ColumnDefinitions definitions = resultSet.getColumnDefinitions(); - T obj = null; - if (iterator.hasNext()) { - obj = cassandraDataStore.newPersistent(); - AbstractGettableData row = (AbstractGettableData) iterator.next(); - populateValuesToPersistent(row, definitions, obj, fields); + public Persistent get(Object key, String[] fields) throws GoraException { + try { + if (fields == null) { + fields = getFields(); + } + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet resultSet = this.client.getSession().execute(statement); + Iterator<Row> iterator = resultSet.iterator(); + ColumnDefinitions definitions = resultSet.getColumnDefinitions(); + T obj = null; + if (iterator.hasNext()) { + obj = cassandraDataStore.newPersistent(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); + populateValuesToPersistent(row, definitions, obj, fields); + } + return obj; + } catch (GoraException e) { + throw e; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return obj; } /** @@ -167,67 +175,72 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { * @param persistent */ @Override - public void put(Object key, Persistent persistent) { - if (persistent instanceof PersistentBase) { - if (persistent.isDirty()) { - PersistentBase persistentBase = (PersistentBase) persistent; - ArrayList<String> fields = new ArrayList<>(); - ArrayList<Object> values = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, key, fields, values); - for (Schema.Field f : persistentBase.getSchema().getFields()) { - String fieldName = f.name(); - Field field = mapping.getFieldFromFieldName(fieldName); - if (field == null) { - LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass}); - continue; - } - if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) { - Object value = persistentBase.get(f.pos()); - String fieldType = field.getType(); - if (fieldType.contains("frozen")) { - fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); - UserType userType = client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType); - UDTValue udtValue = userType.newValue(); - Schema udtSchema = f.schema(); - if (udtSchema.getType().equals(Schema.Type.UNION)) { - for (Schema schema : udtSchema.getTypes()) { - if (schema.getType().equals(Schema.Type.RECORD)) { - udtSchema = schema; - break; + public void put(Object key, Persistent persistent) throws GoraException { + try { + if (persistent instanceof PersistentBase) { + if (persistent.isDirty()) { + PersistentBase persistentBase = (PersistentBase) persistent; + ArrayList<String> fields = new ArrayList<>(); + ArrayList<Object> values = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, fields, values); + for (Schema.Field f : persistentBase.getSchema().getFields()) { + String fieldName = f.name(); + Field field = mapping.getFieldFromFieldName(fieldName); + if (field == null) { + LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass}); + continue; + } + if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) { + Object value = persistentBase.get(f.pos()); + String fieldType = field.getType(); + if (fieldType.contains("frozen")) { + fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); + UserType userType = client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType); + UDTValue udtValue = userType.newValue(); + Schema udtSchema = f.schema(); + if (udtSchema.getType().equals(Schema.Type.UNION)) { + for (Schema schema : udtSchema.getTypes()) { + if (schema.getType().equals(Schema.Type.RECORD)) { + udtSchema = schema; + break; + } } } - } - PersistentBase udtObjectBase = (PersistentBase) value; - for (Schema.Field udtField : udtSchema.getFields()) { - Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field); - if (udtField.schema().getType().equals(Schema.Type.MAP)) { - udtValue.setMap(udtField.name(), (Map) udtFieldValue); - } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) { - udtValue.setList(udtField.name(), (List) udtFieldValue); - } else { - udtValue.set(udtField.name(), udtFieldValue, (Class) udtFieldValue.getClass()); + PersistentBase udtObjectBase = (PersistentBase) value; + for (Schema.Field udtField : udtSchema.getFields()) { + Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field); + if (udtField.schema().getType().equals(Schema.Type.MAP)) { + udtValue.setMap(udtField.name(), (Map) udtFieldValue); + } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) { + udtValue.setList(udtField.name(), (List) udtFieldValue); + } else { + udtValue.set(udtField.name(), udtFieldValue, (Class) udtFieldValue.getClass()); + } } + value = udtValue; + } else { + value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field); } - value = udtValue; - } else { - value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field); + values.add(value); + fields.add(fieldName); } - values.add(value); - fields.add(fieldName); } + String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields); + SimpleStatement statement = new SimpleStatement(cqlQuery, values.toArray()); + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + client.getSession().execute(statement); + } else { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{persistent}); } - String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields); - SimpleStatement statement = new SimpleStatement(cqlQuery, values.toArray()); - if (writeConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); - } - client.getSession().execute(statement); } else { - LOG.info("Ignored putting persistent bean {} in the store as it is neither " - + "new, neither dirty.", new Object[]{persistent}); + LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class}); } - } else { - LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class}); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -238,25 +251,30 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { * @return */ @Override - public Persistent get(Object key) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); - String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys); - SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); - if (readConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); - } - ResultSet resultSet = client.getSession().execute(statement); - Iterator<Row> iterator = resultSet.iterator(); - ColumnDefinitions definitions = resultSet.getColumnDefinitions(); - T obj = null; - if (iterator.hasNext()) { - obj = cassandraDataStore.newPersistent(); - AbstractGettableData row = (AbstractGettableData) iterator.next(); - populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames()); + public Persistent get(Object key) throws GoraException { + try { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet resultSet = client.getSession().execute(statement); + Iterator<Row> iterator = resultSet.iterator(); + ColumnDefinitions definitions = resultSet.getColumnDefinitions(); + T obj = null; + if (iterator.hasNext()) { + obj = cassandraDataStore.newPersistent(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); + populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames()); + } + return obj; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return obj; } /** @@ -380,17 +398,22 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { * @return */ @Override - public boolean delete(Object key) { + public boolean delete(Object key) throws GoraException { ArrayList<String> cassandraKeys = new ArrayList<>(); ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); - String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys); - SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); - if (writeConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + try { + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + ResultSet resultSet = client.getSession().execute(statement); + return resultSet.wasApplied(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - ResultSet resultSet = client.getSession().execute(statement); - return resultSet.wasApplied(); } /** @@ -401,46 +424,51 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { * @return */ @Override - public Result execute(DataStore dataStore, Query query) { - List<Object> objectArrayList = new ArrayList<>(); - String[] fields = query.getFields(); - if (fields != null) { - fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); - } else { - fields = mapping.getAllFieldsIncludingKeys(); - } - CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); - String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); - ResultSet results; - SimpleStatement statement; - if (objectArrayList.size() == 0) { - statement = new SimpleStatement(cqlQuery); - } else { - statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); - } - if (readConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); - } - results = client.getSession().execute(statement); - Iterator<Row> iterator = results.iterator(); - ColumnDefinitions definitions = results.getColumnDefinitions(); - T obj; - K keyObject; - CassandraKey cassandraKey = mapping.getCassandraKey(); - while (iterator.hasNext()) { - AbstractGettableData row = (AbstractGettableData) iterator.next(); - obj = cassandraDataStore.newPersistent(); - keyObject = cassandraDataStore.newKey(); - populateValuesToPersistent(row, definitions, obj, fields); - if (cassandraKey != null) { - populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames()); + public Result execute(DataStore dataStore, Query query) throws GoraException { + try { + List<Object> objectArrayList = new ArrayList<>(); + String[] fields = query.getFields(); + if (fields != null) { + fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); + } else { + fields = mapping.getAllFieldsIncludingKeys(); + } + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); + ResultSet results; + SimpleStatement statement; + if (objectArrayList.size() == 0) { + statement = new SimpleStatement(cqlQuery); } else { - Field key = mapping.getInlinedDefinedPartitionKey(); - keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null); + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + results = client.getSession().execute(statement); + Iterator<Row> iterator = results.iterator(); + ColumnDefinitions definitions = results.getColumnDefinitions(); + T obj; + K keyObject; + CassandraKey cassandraKey = mapping.getCassandraKey(); + while (iterator.hasNext()) { + AbstractGettableData row = (AbstractGettableData) iterator.next(); + obj = cassandraDataStore.newPersistent(); + keyObject = cassandraDataStore.newKey(); + populateValuesToPersistent(row, definitions, obj, fields); + if (cassandraKey != null) { + populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames()); + } else { + Field key = mapping.getInlinedDefinedPartitionKey(); + keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null); + } + cassandraResult.addResultElement(keyObject, obj); } - cassandraResult.addResultElement(keyObject, obj); + return cassandraResult; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return cassandraResult; } } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index d93ff9c..cd806d0 100644 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -16,11 +16,11 @@ */ package org.apache.gora.cassandra.serializers; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.core.TableMetadata; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; @@ -29,13 +29,15 @@ import org.apache.gora.persistency.Persistent; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.TableMetadata; /** * This is the abstract Cassandra Serializer class. @@ -77,8 +79,9 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param <K> key class * @param <T> persistent class * @return Serializer + * @throws GoraException */ - public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) { + public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) throws GoraException { CassandraStore.SerializerType serType = type == null || type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); CassandraSerializer serializer; switch (serType) { @@ -100,40 +103,60 @@ public abstract class CassandraSerializer<K, T extends Persistent> { protected abstract void analyzePersistent() throws Exception; - public void createSchema() { - LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); - this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); - for (Map.Entry udtType : userDefineTypeMaps.entrySet()) { - LOG.debug("creating Cassandra User Define Type {}", udtType.getKey()); - this.client.getSession().execute((String) udtType.getValue()); + public void createSchema() throws GoraException { + try { + LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); + this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); + for (Map.Entry udtType : userDefineTypeMaps.entrySet()) { + LOG.debug("creating Cassandra User Define Type {}", udtType.getKey()); + this.client.getSession().execute((String) udtType.getValue()); + } + LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); - this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); } - public void deleteSchema() { - LOG.debug("dropping Cassandra table {}", mapping.getCoreName()); - this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping)); - LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName()); - this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping)); + public void deleteSchema() throws GoraException { + try { + LOG.debug("dropping Cassandra table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping)); + LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName()); + this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } public void close() { this.client.close(); } - public void truncateSchema() { - LOG.debug("truncating Cassandra table {}", mapping.getCoreName()); - this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + public void truncateSchema() throws GoraException { + try { + LOG.debug("truncating Cassandra table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } - public boolean schemaExists() { - KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()); - if (keyspace != null) { - TableMetadata table = keyspace.getTable(mapping.getCoreName()); - return table != null; - } else { - return false; + public boolean schemaExists() throws GoraException { + try { + KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()); + if (keyspace != null) { + TableMetadata table = keyspace.getTable(mapping.getCoreName()); + return table != null; + } else { + return false; + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -151,7 +174,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param key key value * @param value persistent value */ - public abstract void put(K key, T value); + public abstract void put(K key, T value) throws GoraException; /** * Retrieves the persistent value according to the key @@ -159,7 +182,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param key key value * @return persistent value */ - public abstract T get(K key); + public abstract T get(K key) throws GoraException; /** * Deletes persistent value according to the key @@ -167,7 +190,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param key key value * @return isDeleted */ - public abstract boolean delete(K key); + public abstract boolean delete(K key) throws GoraException; /** * Retrieves the persistent value according to the key and fields @@ -176,7 +199,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param fields fields * @return persistent value */ - public abstract T get(K key, String[] fields); + public abstract T get(K key, String[] fields) throws GoraException; /** * Executes the given query and returns the results. @@ -185,7 +208,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param query Cassandra Query * @return Cassandra Result */ - public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query); + public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query) throws GoraException ; /** * Update the persistent objects @@ -195,31 +218,36 @@ public abstract class CassandraSerializer<K, T extends Persistent> { */ public abstract boolean updateByQuery(Query query); - public long deleteByQuery(Query query) { - List<Object> objectArrayList = new ArrayList<>(); - if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) { - if (query.getFields() == null) { - client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + public long deleteByQuery(Query query) throws GoraException { + try { + List<Object> objectArrayList = new ArrayList<>(); + if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) { + if (query.getFields() == null) { + client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + } else { + LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields."); + } } else { - LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields."); - } - } else { - String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); - ResultSet results; - SimpleStatement statement; - if (objectArrayList.size() == 0) { - statement = new SimpleStatement(cqlQuery); - } else { - statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); - } - if (writeConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); + ResultSet results; + SimpleStatement statement; + if (objectArrayList.size() == 0) { + statement = new SimpleStatement(cqlQuery); + } else { + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + results = client.getSession().execute(statement); + LOG.debug("Delete by Query was applied : " + results.wasApplied()); } - results = client.getSession().execute(statement); - LOG.debug("Delete by Query was applied : " + results.wasApplied()); + LOG.info("Delete By Query method doesn't return the deleted element count."); + return 0; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - LOG.info("Delete By Query method doesn't return the deleted element count."); - return 0; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index bf28ee0..3fb8e10 100644 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -16,12 +16,12 @@ */ package org.apache.gora.cassandra.serializers; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.mapping.Mapper; -import com.datastax.driver.mapping.MappingManager; -import com.datastax.driver.mapping.Result; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + import org.apache.commons.lang.ArrayUtils; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.query.CassandraResultSet; @@ -30,14 +30,16 @@ import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.persistency.Persistent; import org.apache.gora.query.Query; import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; /** * This Class contains the operation relates to Native Serialization. @@ -48,12 +50,13 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { private Mapper<T> mapper; - NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { + NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) throws GoraException { super(cassandraClient, keyClass, persistentClass, mapping); try { analyzePersistent(); } catch (Exception e) { - throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage()); + LOG.error(e.getMessage(), e); + throw new GoraException("Error occurred while analyzing the persistent class, :" + e.getMessage(), e); } this.createSchema(); MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); @@ -74,9 +77,14 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { * @param value */ @Override - public void put(Object key, Persistent value) { - LOG.debug("Object is saved with key : {} and value : {}", key, value); - mapper.save((T) value); + public void put(Object key, Persistent value) throws GoraException { + try { + LOG.debug("Object is saved with key : {} and value : {}", key, value); + mapper.save((T) value); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** @@ -86,14 +94,19 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { * @return */ @Override - public T get(Object key) { - T object = mapper.get(key); - if (object != null) { - LOG.debug("Object is found for key : {}", key); - } else { - LOG.debug("Object is not found for key : {}", key); + public T get(Object key) throws GoraException { + try { + T object = mapper.get(key); + if (object != null) { + LOG.debug("Object is found for key : {}", key); + } else { + LOG.debug("Object is not found for key : {}", key); + } + return object; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return object; } /** @@ -103,10 +116,15 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { * @return */ @Override - public boolean delete(Object key) { + public boolean delete(Object key) throws GoraException { LOG.debug("Object is deleted for key : {}", key); - mapper.delete(key); - return true; + try { + mapper.delete(key); + return true; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** @@ -187,30 +205,35 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { * @return */ @Override - public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { - List<Object> objectArrayList = new ArrayList<>(); - String[] fields = query.getFields(); - if (fields != null) { - fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); - } else { - fields = mapping.getAllFieldsIncludingKeys(); - } - CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); - String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); - ResultSet results; - if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); - } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); - } - Result<T> objects = mapper.map(results); - Iterator iterator = objects.iterator(); - while (iterator.hasNext()) { - T result = (T) iterator.next(); - K key = getKey(result); - cassandraResult.addResultElement(key, result); + public org.apache.gora.query.Result execute(DataStore dataStore, Query query) throws GoraException { + try { + List<Object> objectArrayList = new ArrayList<>(); + String[] fields = query.getFields(); + if (fields != null) { + fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); + } else { + fields = mapping.getAllFieldsIncludingKeys(); + } + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + Result<T> objects = mapper.map(results); + Iterator iterator = objects.iterator(); + while (iterator.hasNext()) { + T result = (T) iterator.next(); + K key = getKey(result); + cassandraResult.addResultElement(key, result); + } + return cassandraResult; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return cassandraResult; } private K getKey(T object) { http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 7d7d0d8..865d8cf 100644 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -16,6 +16,11 @@ */ package org.apache.gora.cassandra.store; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.serializers.CassandraSerializer; import org.apache.gora.persistency.BeanFactory; @@ -27,14 +32,10 @@ import org.apache.gora.query.Result; import org.apache.gora.query.ws.impl.PartitionWSQueryImpl; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - /** * Implementation of Cassandra Store. * @@ -70,7 +71,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * @param properties properties */ @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { LOG.debug("Initializing Cassandra store"); String serializationType; try { @@ -86,8 +87,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> CassandraClient cassandraClient = new CassandraClient(); cassandraClient.initialize(properties, mapping); cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, serializationType, this, mapping); + } catch (GoraException e) { + throw e; } catch (Exception e) { - throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e); + LOG.error(e.getMessage(), e); + throw new GoraException("Error while initializing Cassandra store: " + e.getMessage(), e); } } @@ -125,7 +129,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public void createSchema() { + public void createSchema() throws GoraException { cassandraSerializer.createSchema(); } @@ -133,7 +137,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public void deleteSchema() { + public void deleteSchema() throws GoraException { cassandraSerializer.deleteSchema(); } @@ -158,15 +162,16 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public K newKey() { + public K newKey() throws GoraException { try { if (beanFactory != null) { return beanFactory.newKey(); } else { return keyClass.newInstance(); } - } catch (Exception ex) { - throw new RuntimeException("Error while instantiating a key: " + ex.getMessage(), ex); + } catch (Exception e) { + LOG.error("Error while instantiating a key: " + e.getMessage(), e); + throw new GoraException("Error while instantiating a key: " + e.getMessage(), e); } } @@ -175,15 +180,16 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> */ @SuppressWarnings("all") @Override - public T newPersistent() { + public T newPersistent() throws GoraException { try { if (beanFactory != null) { return this.beanFactory.newPersistent(); } else { return persistentClass.newInstance(); } - } catch (Exception ex) { - throw new RuntimeException("Error while instantiating a persistent: " + ex.getMessage(), ex); + } catch (Exception e) { + LOG.error("Error while instantiating a persistent: " + e.getMessage(), e); + throw new GoraException("Error while instantiating a key: " + e.getMessage(), e); } } @@ -215,7 +221,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public T get(K key) { + public T get(K key) throws GoraException { return (T) cassandraSerializer.get(key); } @@ -223,7 +229,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public T get(K key, String[] fields) { + public T get(K key, String[] fields) throws GoraException { return (T) cassandraSerializer.get(key, fields); } @@ -231,7 +237,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public void put(K key, T obj) { + public void put(K key, T obj) throws GoraException { cassandraSerializer.put(key, obj); } @@ -239,7 +245,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public boolean delete(K key) { + public boolean delete(K key) throws GoraException { return cassandraSerializer.delete(key); } @@ -247,7 +253,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public long deleteByQuery(Query<K, T> query) { + public long deleteByQuery(Query<K, T> query) throws GoraException { return cassandraSerializer.deleteByQuery(query); } @@ -255,8 +261,13 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public Result<K, T> execute(Query<K, T> query) { - return (Result<K, T>) cassandraSerializer.execute(this, query); + public Result<K, T> execute(Query<K, T> query) throws GoraException { + try { + return (Result<K, T>) cassandraSerializer.execute(this, query); + } catch (Exception e) { + this.LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** @@ -282,19 +293,24 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { - List<PartitionQuery<K, T>> partitions = new ArrayList<>(); - PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query); - pqi.setDataStore(this); - partitions.add(pqi); - return partitions; + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws GoraException { + try { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query); + pqi.setDataStore(this); + partitions.add(pqi); + return partitions; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** * {@inheritDoc} */ @Override - public void flush() { + public void flush() throws GoraException { // ignore since caching has been disabled } @@ -310,7 +326,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public void truncateSchema() { + public void truncateSchema() throws GoraException { cassandraSerializer.truncateSchema(); } @@ -318,7 +334,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * {@inheritDoc} */ @Override - public boolean schemaExists() { + public boolean schemaExists() throws GoraException{ return cassandraSerializer.schemaExists(); } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java index c016893..736de0f 100644 --- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java +++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java @@ -22,6 +22,7 @@ import org.apache.avro.util.Utf8; import org.apache.gora.cassandra.GoraCassandraTestDriver; import org.apache.gora.examples.generated.Metadata; import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.util.GoraException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -75,9 +76,10 @@ public class TestAvroSerializationWithUDT { /** * This is for testGetNested() with UDT dataType with avro serialization + * @throws GoraException */ @Test - public void testSimplePutAndGEt() { + public void testSimplePutAndGEt() throws GoraException { webPageCassandraStore.createSchema(); WebPage webpage = WebPage.newBuilder().build(); webpage.setUrl(new Utf8("url..")); http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java index ce9e2df..a588ab1 100644 --- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java +++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java @@ -25,6 +25,7 @@ import org.apache.gora.query.Query; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreTestBase; import org.apache.gora.store.DataStoreTestUtil; +import org.apache.gora.util.GoraException; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -83,7 +84,7 @@ public class TestCassandraStore extends DataStoreTestBase { public void testGetPartitions() throws IOException { } - private void preConfiguration() { + private void preConfiguration() throws GoraException { if (webPageStore.schemaExists()) { webPageStore.truncateSchema(); } else { http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java index 3ae3152..a8039ed 100644 --- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java +++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java @@ -24,6 +24,7 @@ import org.apache.gora.cassandra.example.generated.AvroSerialization.CassandraRe import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; +import org.apache.gora.util.GoraException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -75,9 +76,10 @@ public class TestCassandraStoreWithCassandraKey { /** * In this test case, schema exists method behavior of the data store is testing. + * @throws GoraException */ @Test - public void testSchemaRelatedBehaviour() { + public void testSchemaRelatedBehaviour() throws GoraException { cassandraRecordDataStore.createSchema(); Assert.assertTrue(cassandraRecordDataStore.schemaExists()); cassandraRecordDataStore.deleteSchema(); @@ -88,9 +90,10 @@ public class TestCassandraStoreWithCassandraKey { /** * In this test case, get, put and delete methods behaviour of the data store is testing. + * @throws GoraException */ @Test - public void testSimplePutGet() { + public void testSimplePutGet() throws GoraException { cassandraRecordDataStore.createSchema(); CassandraRecord record = new CassandraRecord(); record.setDataLong(719411002L); @@ -221,7 +224,7 @@ public class TestCassandraStoreWithCassandraKey { } @Test - public void testUpdateByQuery() { + public void testUpdateByQuery() throws GoraException { cassandraRecordDataStore.truncateSchema(); //insert data CassandraRecord record1 = new CassandraRecord(); @@ -266,7 +269,7 @@ public class TestCassandraStoreWithCassandraKey { @Test - public void testDataTypes() { + public void testDataTypes() throws GoraException { cassandraRecordDataStore.truncateSchema(); CassandraRecord record = new CassandraRecord(); record.setDataLong(719411002L); http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index 489732c..3216149 100644 --- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -79,9 +79,10 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, put and get behavior of the data store are testing. + * @throws GoraException */ @Test - public void testSimplePutAndGet() { + public void testSimplePutAndGet() throws GoraException { UUID id = UUID.randomUUID(); User user1 = new User(id, "madhawa", Date.from(Instant.now())); // storing data; @@ -94,9 +95,10 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, put and delete behavior of the data store are testing. + * @throws GoraException */ @Test - public void testSimplePutDeleteAndGet() { + public void testSimplePutDeleteAndGet() throws GoraException { UUID id = UUID.randomUUID(); User user1 = new User(id, "kasun", Date.from(Instant.now())); // storing data; @@ -114,9 +116,10 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, schema exists method behavior of the data store is testing. + * @throws GoraException */ @Test() - public void testSchemaExists() { + public void testSchemaExists() throws GoraException { userDataStore.deleteSchema(); Assert.assertFalse(userDataStore.schemaExists()); userDataStore.createSchema(); @@ -125,9 +128,10 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, schema exists method behavior of the data store is testing. + * @throws GoraException */ @Test - public void testTruncateSchema() { + public void testTruncateSchema() throws GoraException { if (!userDataStore.schemaExists()) { userDataStore.createSchema(); } @@ -144,9 +148,10 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, get with fields method behavior of the data store is testing. + * @throws GoraException */ @Test - public void testGetWithFields() { + public void testGetWithFields() throws GoraException { UUID id = UUID.randomUUID(); User user1 = new User(id, "Madhawa Kasun Gunasekara", Date.from(Instant.now())); userDataStore.put(id, user1); @@ -249,9 +254,10 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, update by quert method behavior of the data store is testing. + * @throws GoraException */ @Test - public void testUpdateByQuery() { + public void testUpdateByQuery() throws GoraException { userDataStore.truncateSchema(); UUID id1 = UUID.randomUUID(); User user1 = new User(id1, "user1", Date.from(Instant.now())); http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java index f9b5df4..6c35ae6 100644 --- a/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java +++ b/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestNativeSerializationWithUDT.java @@ -21,6 +21,7 @@ package org.apache.gora.cassandra.store; import org.apache.gora.cassandra.GoraCassandraTestDriver; import org.apache.gora.cassandra.example.generated.nativeSerialization.Customer; import org.apache.gora.cassandra.example.generated.nativeSerialization.Document; +import org.apache.gora.util.GoraException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -68,9 +69,10 @@ public class TestNativeSerializationWithUDT { /** * This is for testGetNested() with UDT dataType with native serialization. + * @throws GoraException */ @Test - public void testSimplePutAndGEt() { + public void testSimplePutAndGEt() throws GoraException { documentCassandraStore.createSchema(); Document document = new Document(); document.setDefaultId("yawamu.com");
