Repository: gora Updated Branches: refs/heads/master dc754b16e -> e2d7341b3
http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java ---------------------------------------------------------------------- diff --git a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java index a6f9708..c072d2d 100644 --- a/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java +++ b/gora-infinispan/src/main/java/org/apache/gora/infinispan/store/InfinispanStore.java @@ -17,6 +17,14 @@ */ package org.apache.gora.infinispan.store; +import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_NAME; +import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_VALUE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + import org.apache.gora.infinispan.query.InfinispanQuery; import org.apache.gora.infinispan.query.InfinispanResult; import org.apache.gora.persistency.impl.PersistentBase; @@ -24,15 +32,10 @@ import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; - -import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_NAME; -import static org.apache.gora.mapreduce.GoraRecordReader.BUFFER_LIMIT_READ_VALUE; - /** * {@link org.apache.gora.infinispan.store.InfinispanStore} is the primary class * responsible for directing Gora CRUD operations to Infinispan.This class delegate @@ -71,7 +74,7 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< * @param properties */ @Override - public synchronized void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + public synchronized void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { try { @@ -101,8 +104,11 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< primaryFieldName = schema.getFields().get(0).name(); this.infinispanClient.initialize(keyClass, persistentClass, properties); + } catch (GoraException e) { + throw e; } catch (Exception e) { - throw new RuntimeException(e); + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -113,32 +119,42 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< } @Override - public void createSchema() { + public void createSchema() throws GoraException { LOG.debug("createSchema()"); this.infinispanClient.createCache(); } @Override - public boolean delete(K key) { + public boolean delete(K key) throws GoraException { LOG.debug("delete(" + key+")"); - this.infinispanClient.deleteByKey(key); - return true; + try { + this.infinispanClient.deleteByKey(key); + return true; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public long deleteByQuery(Query<K, T> query) { - ((InfinispanQuery<K, T>) query).build(); - LOG.debug("deleteByQuery("+query.toString()+")"); - InfinispanQuery<K, T> q = (InfinispanQuery) query; - q.build(); - for( T t : q.list()){ - infinispanClient.deleteByKey((K) t.get(primaryFieldPos)); + public long deleteByQuery(Query<K, T> query) throws GoraException { + try { + ((InfinispanQuery<K, T>) query).build(); + LOG.debug("deleteByQuery("+query.toString()+")"); + InfinispanQuery<K, T> q = (InfinispanQuery) query; + q.build(); + for( T t : q.list()){ + infinispanClient.deleteByKey((K) t.get(primaryFieldPos)); + } + return q.getResultSize(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return q.getResultSize(); } @Override - public void deleteSchema() { + public void deleteSchema() throws GoraException { LOG.debug("deleteSchema()"); this.infinispanClient.dropCache(); } @@ -147,39 +163,50 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< * Execute the query and return the result. */ @Override - public Result<K, T> execute(Query<K, T> query) { + public Result<K, T> execute(Query<K, T> query) throws GoraException { LOG.debug("execute()"); - ((InfinispanQuery<K,T>)query).build(); - InfinispanResult<K,T> result = new InfinispanResult<>(this, (InfinispanQuery<K,T>)query); - LOG.trace("query: " + query.toString()); - LOG.trace("result size: " + result.size()); - return result; + try { + ((InfinispanQuery<K,T>)query).build(); + InfinispanResult<K,T> result = null; + result = new InfinispanResult<>(this, (InfinispanQuery<K,T>)query); + LOG.trace("query: " + query.toString()); + LOG.trace("result size: " + result.size()); + return result; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public T get(K key){ + public T get(K key) throws GoraException { LOG.debug("get("+key+")"); - return infinispanClient.get(key); + try { + return infinispanClient.get(key); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public T get(K key, String[] fields) { + public T get(K key, String[] fields) throws GoraException { LOG.debug("get("+key+","+fields+")"); - if (fields==null) - return infinispanClient.get(key); - - InfinispanQuery<K, T> query = new InfinispanQuery<K, T>(this); - query.setKey(key); - query.setFields(fields); - query.build(); - - - Result<K,T> result = query.execute(); try { + if (fields==null) + return infinispanClient.get(key); + + InfinispanQuery<K, T> query = new InfinispanQuery<K, T>(this); + query.setKey(key); + query.setFields(fields); + query.build(); + + Result<K,T> result = query.execute(); result.next(); return result.get(); } catch (Exception e) { - throw new RuntimeException(e); + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -242,9 +269,14 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< } @Override - public void flush() { + public void flush() throws GoraException { LOG.debug("flush()"); - infinispanClient.flush(); + try { + infinispanClient.flush(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** @@ -267,7 +299,7 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< } @Override - public void put(K key, T obj) { + public void put(K key, T obj) throws GoraException { LOG.debug("put(" +key.toString()+")"); LOG.trace(obj.toString()); @@ -276,12 +308,17 @@ public class InfinispanStore<K, T extends PersistentBase> extends DataStoreBase< if (!obj.get(primaryFieldPos).equals(key) ) LOG.warn("Invalid or different primary field :"+key+"<->"+obj.get(primaryFieldPos)); - - this.infinispanClient.put(key, obj); + + try { + this.infinispanClient.put(key, obj); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public boolean schemaExists() { + public boolean schemaExists() throws GoraException { LOG.debug("schemaExists()"); return infinispanClient.cacheExists(); } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java ---------------------------------------------------------------------- diff --git a/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java b/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java index 3bf4a09..aeeaf68 100644 --- a/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java +++ b/gora-infinispan/src/test/java/org/apache/gora/infinispan/Utils.java @@ -19,6 +19,7 @@ package org.apache.gora.infinispan; import org.apache.gora.examples.generated.Employee; import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; import java.util.Random; @@ -39,7 +40,7 @@ public class Utils { return employee; } - public static <T extends CharSequence> void populateEmployeeStore(DataStore<T, Employee> dataStore, int n) { + public static <T extends CharSequence> void populateEmployeeStore(DataStore<T, Employee> dataStore, int n) throws GoraException { for(int i=0; i<n; i++) { Employee e = createEmployee(i); dataStore.put((T)e.getSsn(),e); http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java index 72915d0..243ad62 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java @@ -17,16 +17,18 @@ package org.apache.gora.jcache.store; +import java.util.HashMap; +import java.util.Map; + +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; + import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.cache.integration.CacheLoader; -import javax.cache.integration.CacheLoaderException; -import java.util.HashMap; -import java.util.Map; - /** * {@link org.apache.gora.jcache.store.JCacheCacheLoader} is the primary class * responsible for loading data beans from persistency dataStore to in memory cache. @@ -46,23 +48,27 @@ public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoad try { persistent = dataStore.get(key); LOG.info("Loaded data bean from persistent datastore on key {}.", key.toString()); - } catch (CacheLoaderException ex) { - throw ex; + } catch (GoraException ex) { + throw new CacheLoaderException(ex); } return persistent; } @Override public Map<K, T> loadAll(Iterable<? extends K> keys) throws CacheLoaderException { - Map<K, T> loaded = new HashMap<K, T>(); - for (K key : keys) { - T persistent = dataStore.get(key); - LOG.info("Loaded data bean from persistent datastore on key {}.", key.toString()); - if (persistent != null) { - loaded.put(key, persistent); + try { + Map<K, T> loaded = new HashMap<K, T>(); + for (K key : keys) { + T persistent = dataStore.get(key); + LOG.info("Loaded data bean from persistent datastore on key {}.", key.toString()); + if (persistent != null) { + loaded.put(key, persistent); + } } + return loaded; + } catch (GoraException e) { + throw new CacheLoaderException(e); } - return loaded; } } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java index e1e5ae8..625b750 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java @@ -17,16 +17,18 @@ package org.apache.gora.jcache.store; -import org.apache.gora.persistency.impl.PersistentBase; -import org.apache.gora.store.DataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.Iterator; import javax.cache.Cache; import javax.cache.integration.CacheWriter; import javax.cache.integration.CacheWriterException; -import java.util.Collection; -import java.util.Iterator; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link org.apache.gora.jcache.store.JCacheCacheWriter} is the primary class @@ -44,8 +46,12 @@ public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWrit @Override public void write(Cache.Entry<? extends K, ? extends T> entry) throws CacheWriterException { - dataStore.put(entry.getKey(), entry.getValue()); - LOG.info("Written data bean to persistent datastore on key {}.", entry.getKey().toString()); + try { + dataStore.put(entry.getKey(), entry.getValue()); + LOG.info("Written data bean to persistent datastore on key {}.", entry.getKey().toString()); + } catch (GoraException e) { + throw new CacheWriterException(e); + } } @Override @@ -60,8 +66,12 @@ public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWrit @Override public void delete(Object key) throws CacheWriterException { - dataStore.delete((K) key); - LOG.info("Deleted data bean from persistent datastore on key {}.", key.toString()); + try { + dataStore.delete((K) key); + LOG.info("Deleted data bean from persistent datastore on key {}.", key.toString()); + } catch (GoraException e) { + throw new CacheWriterException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java index e6b8feb..b07bf69 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java @@ -22,7 +22,10 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.NavigableMap; +import java.util.NavigableSet; import java.util.Properties; import java.util.ArrayList; import java.util.concurrent.ConcurrentSkipListSet; @@ -144,7 +147,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T } @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { super.initialize(keyClass, persistentClass, properties); CachingProvider cachingProvider = Caching.getCachingProvider (properties.getProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY)); @@ -156,6 +159,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T new Configuration()); } catch (GoraException ex) { LOG.error("Couldn't initialize persistent DataStore.", ex); + throw ex; } if (properties.getProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY) .contains(HAZELCAST_SERVER_CACHE_PROVIDER_IDENTIFIER)) { @@ -168,6 +172,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T hazelcastInstance = HazelcastClient.newHazelcastClient(config); } catch (IOException ex) { LOG.error("Couldn't locate the client side cache provider configuration.", ex); + throw new GoraException (ex); } } Properties providerProperties = new Properties(); @@ -273,61 +278,100 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T } @Override - public void createSchema() { - if (manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) == null) { - cacheEntryList.clear(); - cache = manager.createCache(persistentClass.getSimpleName(), - cacheConfig).unwrap(ICache.class); + public void createSchema() throws GoraException { + try { + if (manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) == null) { + cacheEntryList.clear(); + cache = manager.createCache(persistentClass.getSimpleName(), + cacheConfig).unwrap(ICache.class); + } + cache.registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>( + JCacheCacheFactoryBuilder + .factoryOfEntryListener(new JCacheCacheEntryListener<K, T>(cacheEntryList)), + null, true, true)); + persistentDataStore.createSchema(); + LOG.info("Created schema on persistent store and initialized cache for persistent bean {}." + , super.getPersistentClass().getSimpleName()); + } catch (GoraException e) { + throw e; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - cache.registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>( - JCacheCacheFactoryBuilder - .factoryOfEntryListener(new JCacheCacheEntryListener<K, T>(cacheEntryList)), - null, true, true)); - persistentDataStore.createSchema(); - LOG.info("Created schema on persistent store and initialized cache for persistent bean {}." - , super.getPersistentClass().getSimpleName()); } @Override - public void deleteSchema() { - cache.removeAll(); - manager.destroyCache(super.getPersistentClass().getSimpleName()); - persistentDataStore.deleteSchema(); - LOG.info("Deleted schema on persistent store and destroyed cache for persistent bean {}." - , super.getPersistentClass().getSimpleName()); + public void deleteSchema() throws GoraException { + try { + cache.removeAll(); + manager.destroyCache(super.getPersistentClass().getSimpleName()); + persistentDataStore.deleteSchema(); + LOG.info("Deleted schema on persistent store and destroyed cache for persistent bean {}." + , super.getPersistentClass().getSimpleName()); + } catch (GoraException e) { + throw e; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public boolean schemaExists() { - return (manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) != null); + public boolean schemaExists() throws GoraException { + try { + return (manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) != null); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public T get(K key, String[] fields) { - T persitent = (T) cache.get(key); - if (persitent == null) { - return null; + public T get(K key, String[] fields) throws GoraException { + try { + T persitent = (T) cache.get(key); + if (persitent == null) { + return null; + } + return getPersistent(persitent, fields); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return getPersistent(persitent, fields); } @Override - public T get(K key) { - return cache.get(key); + public T get(K key) throws GoraException { + try { + return cache.get(key); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public void put(K key, T val) { - cache.put(key, val); + public void put(K key, T val) throws GoraException { + try { + cache.put(key, val); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public boolean delete(K key) { - return cache.remove(key); + public boolean delete(K key) throws GoraException { + try { + return cache.remove(key); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public long deleteByQuery(Query<K, T> query) { + public long deleteByQuery(Query<K, T> query) throws GoraException { try { long deletedRows = 0; Result<K, T> result = query.execute(); @@ -355,14 +399,16 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T } LOG.info("JCache Gora datastore deleled {} rows from Persistent datastore.", deletedRows); return deletedRows; + } catch (GoraException e) { + throw e; } catch (Exception e) { - LOG.error("Exception occurred while deleting entries from JCache Gora datastore. Hence returning 0.", e); - return 0; - } + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public Result<K, T> execute(Query<K, T> query) { + public Result<K, T> execute(Query<K, T> query) throws GoraException { K startKey = query.getStartKey(); K endKey = query.getEndKey(); if (startKey == null) { @@ -376,13 +422,20 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T } } query.setFields(getFieldsToQuery(query.getFields())); - ConcurrentSkipListSet<K> cacheEntrySubList = null; - try { - cacheEntrySubList = (ConcurrentSkipListSet<K>) cacheEntryList.subSet(startKey, true, endKey, true); - } catch (NullPointerException npe) { - LOG.error("NPE occurred while executing the query for JCacheStore. Hence returning empty entry set.", npe); - return new JCacheResult<>(this, query, new ConcurrentSkipListSet<K>()); + + NavigableSet<K> cacheEntrySubList = null; + if (startKey != null && endKey != null) { + try { + cacheEntrySubList = cacheEntryList.subSet(startKey, true, endKey, true); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } + } else { + // Empty + cacheEntrySubList = Collections.emptyNavigableSet() ; } + return new JCacheResult<>(this, query, cacheEntrySubList); } @@ -413,23 +466,33 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T partition.setConf(this.getConf()); partitions.add(partition); } - } catch (java.lang.Exception ex) { + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { LOG.error("Exception occurred while partitioning the query based on Hazelcast partitions.", ex); - return null; + throw new IOException(ex.getMessage(), ex) ; } LOG.info("Query is partitioned to {} number of partitions.", partitions.size()); return partitions; } @Override - public void flush() { + public void flush() throws GoraException { persistentDataStore.flush(); LOG.info("JCache Gora datastore flushed successfully."); } @Override public void close() { - flush(); + try{ + flush(); + } catch (GoraException e) { + LOG.error(e.getMessage(), e); + if (e.getCause() != null) { + LOG.error(e.getCause().getMessage()); + } + // At this point, the exception is ignored... + } cacheEntryList.clear(); if (!cache.isDestroyed() && !manager.isClosed()) { cache.close(); http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java index 3b0d6a3..5532cb5 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java @@ -17,14 +17,27 @@ */ package org.apache.gora.mongodb.store; -import static com.mongodb.AuthenticationMechanism.*; -import static org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType; +import static com.mongodb.AuthenticationMechanism.GSSAPI; +import static com.mongodb.AuthenticationMechanism.MONGODB_CR; +import static com.mongodb.AuthenticationMechanism.MONGODB_X509; +import static com.mongodb.AuthenticationMechanism.PLAIN; +import static com.mongodb.AuthenticationMechanism.SCRAM_SHA_1; import java.io.IOException; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; +import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import javax.xml.bind.DatatypeConverter; @@ -37,6 +50,7 @@ import org.apache.avro.util.Utf8; import org.apache.gora.mongodb.filters.MongoFilterUtil; import org.apache.gora.mongodb.query.MongoDBQuery; import org.apache.gora.mongodb.query.MongoDBResult; +import org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType; import org.apache.gora.mongodb.utils.BSONDecorator; import org.apache.gora.mongodb.utils.GoraDBEncoder; import org.apache.gora.persistency.impl.BeanFactoryImpl; @@ -50,12 +64,27 @@ import org.apache.gora.query.impl.PartitionQueryImpl; import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.util.AvroUtils; import org.apache.gora.util.ClassLoadingUtils; +import org.apache.gora.util.GoraException; import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Splitter; -import com.mongodb.*; +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.Bytes; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.Mongo; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; /** * Implementation of a MongoDB data store to be used by gora. @@ -103,7 +132,7 @@ DataStoreBase<K, T> { * properties up and reading the mapping file. */ public void initialize(final Class<K> keyClass, - final Class<T> pPersistentClass, final Properties properties) { + final Class<T> pPersistentClass, final Properties properties) throws GoraException { try { LOG.debug("Initializing MongoDB store"); MongoStoreParameters parameters = MongoStoreParameters.load(properties, getConf()); @@ -125,10 +154,11 @@ DataStoreBase<K, T> { LOG.info("Initialized Mongo store for database {} of {}.", new Object[] { parameters.getDbname(), parameters.getServers() }); + } catch (GoraException e) { + throw e; } catch (IOException e) { - LOG.error("Error while initializing MongoDB store: {}", - new Object[] { e.getMessage() }); - throw new RuntimeException(e); + LOG.error("Error while initializing MongoDB store", e); + throw new GoraException(e); } } @@ -242,58 +272,79 @@ DataStoreBase<K, T> { * Create a new collection in MongoDB if necessary. */ @Override - public void createSchema() { + public void createSchema() throws GoraException { if (mongoClientDB == null) - throw new IllegalStateException( + throw new GoraException( "Impossible to create the schema as no database has been selected."); if (schemaExists()) { return; } - - // If initialized create the collection - mongoClientColl = mongoClientDB.createCollection( - mapping.getCollectionName(), new BasicDBObject()); // send a DBObject to - // force creation - // otherwise creation is deferred - mongoClientColl.setDBEncoderFactory(GoraDBEncoder.FACTORY); - - LOG.debug("Collection {} has been created for Mongo instance {}.", - new Object[] { mapping.getCollectionName(), mongoClientDB.getMongo() }); + + try { + // If initialized create the collection + mongoClientColl = mongoClientDB.createCollection( + mapping.getCollectionName(), new BasicDBObject()); // send a DBObject to + // force creation + // otherwise creation is deferred + mongoClientColl.setDBEncoderFactory(GoraDBEncoder.FACTORY); + + LOG.debug("Collection {} has been created for Mongo instance {}.", + new Object[] { mapping.getCollectionName(), mongoClientDB.getMongo() }); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** * Drop the collection. */ @Override - public void deleteSchema() { + public void deleteSchema() throws GoraException { if (mongoClientColl == null) - throw new IllegalStateException( + throw new GoraException( "Impossible to delete the schema as no schema is selected."); - // If initialized, simply drop the collection - mongoClientColl.drop(); - - LOG.debug( - "Collection {} has been dropped for Mongo instance {}.", - new Object[] { mongoClientColl.getFullName(), mongoClientDB.getMongo() }); + + try { + // If initialized, simply drop the collection + mongoClientColl.drop(); + + LOG.debug( + "Collection {} has been dropped for Mongo instance {}.", + new Object[] { mongoClientColl.getFullName(), mongoClientDB.getMongo() }); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** * Check if the collection already exists or should be created. */ @Override - public boolean schemaExists() { - return mongoClientDB.collectionExists(mapping.getCollectionName()); + public boolean schemaExists() throws GoraException { + try { + return mongoClientDB.collectionExists(mapping.getCollectionName()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } /** * Ensure the data is synced to disk. */ @Override - public void flush() { - for (MongoClient client : mapsOfClients.values()) { - client.fsync(false); - LOG.debug("Forced synced of database for Mongo instance {}.", - new Object[] { client }); + public void flush() throws GoraException { + try { + for (MongoClient client : mapsOfClients.values()) { + client.fsync(false); + LOG.debug("Forced synced of database for Mongo instance {}.", + new Object[] { client }); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -313,21 +364,26 @@ DataStoreBase<K, T> { * list of fields to be loaded from the database */ @Override - public T get(final K key, final String[] fields) { - String[] dbFields = getFieldsToQuery(fields); - // Prepare the MongoDB query - BasicDBObject q = new BasicDBObject("_id", key); - BasicDBObject proj = new BasicDBObject(); - for (String field : dbFields) { - String docf = mapping.getDocumentField(field); - if (docf != null) { - proj.put(docf, true); + public T get(final K key, final String[] fields) throws GoraException { + try { + String[] dbFields = getFieldsToQuery(fields); + // Prepare the MongoDB query + BasicDBObject q = new BasicDBObject("_id", key); + BasicDBObject proj = new BasicDBObject(); + for (String field : dbFields) { + String docf = mapping.getDocumentField(field); + if (docf != null) { + proj.put(docf, true); + } } + // Execute the query + DBObject res = mongoClientColl.findOne(q, proj); + // Build the corresponding persistent + return newInstance(res, dbFields); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - // Execute the query - DBObject res = mongoClientColl.findOne(q, proj); - // Build the corresponding persistent - return newInstance(res, dbFields); } /** @@ -339,13 +395,18 @@ DataStoreBase<K, T> { * the object to be inserted */ @Override - public void put(final K key, final T obj) { - // Save the object in the database - if (obj.isDirty()) { - performPut(key, obj); - } else { - LOG.info("Ignored putting object {} in the store as it is neither " - + "new, neither dirty.", new Object[] { obj }); + public void put(final K key, final T obj) throws GoraException { + try { + // Save the object in the database + if (obj.isDirty()) { + performPut(key, obj); + } else { + LOG.info("Ignored putting object {} in the store as it is neither " + + "new, neither dirty.", new Object[] { obj }); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -385,54 +446,68 @@ DataStoreBase<K, T> { } @Override - public boolean delete(final K key) { - DBObject removeKey = new BasicDBObject("_id", key); - WriteResult writeResult = mongoClientColl.remove(removeKey); - return writeResult != null && writeResult.getN() > 0; + public boolean delete(final K key) throws GoraException { + try { + DBObject removeKey = new BasicDBObject("_id", key); + WriteResult writeResult = mongoClientColl.remove(removeKey); + return writeResult != null && writeResult.getN() > 0; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); + } } @Override - public long deleteByQuery(final Query<K, T> query) { - // Build the actual MongoDB query - DBObject q = MongoDBQuery.toDBQuery(query); - WriteResult writeResult = mongoClientColl.remove(q); - if (writeResult != null) { - return writeResult.getN(); + public long deleteByQuery(final Query<K, T> query) throws GoraException { + try { + // Build the actual MongoDB query + DBObject q = MongoDBQuery.toDBQuery(query); + WriteResult writeResult = mongoClientColl.remove(q); + if (writeResult != null) { + return writeResult.getN(); + } + return 0; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return 0; } /** * Execute the query and return the result. */ @Override - public Result<K, T> execute(final Query<K, T> query) { - - String[] fields = getFieldsToQuery(query.getFields()); - // Build the actual MongoDB query - DBObject q = MongoDBQuery.toDBQuery(query); - DBObject p = MongoDBQuery.toProjection(fields, mapping); - - if (query.getFilter() != null) { - boolean succeeded = filterUtil.setFilter(q, query.getFilter(), this); - if (succeeded) { - // don't need local filter - query.setLocalFilterEnabled(false); + public Result<K, T> execute(final Query<K, T> query) throws GoraException { + try { + String[] fields = getFieldsToQuery(query.getFields()); + // Build the actual MongoDB query + DBObject q = MongoDBQuery.toDBQuery(query); + DBObject p = MongoDBQuery.toProjection(fields, mapping); + + if (query.getFilter() != null) { + boolean succeeded = filterUtil.setFilter(q, query.getFilter(), this); + if (succeeded) { + // don't need local filter + query.setLocalFilterEnabled(false); + } } + + // Execute the query on the collection + DBCursor cursor = mongoClientColl.find(q, p); + if (query.getLimit() > 0) + cursor = cursor.limit((int) query.getLimit()); + cursor.batchSize(100); + cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT); + + // Build the result + MongoDBResult<K, T> mongoResult = new MongoDBResult<>(this, query); + mongoResult.setCursor(cursor); + + return mongoResult; + } catch(Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } - - // Execute the query on the collection - DBCursor cursor = mongoClientColl.find(q, p); - if (query.getLimit() > 0) - cursor = cursor.limit((int) query.getLimit()); - cursor.batchSize(100); - cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT); - - // Build the result - MongoDBResult<K, T> mongoResult = new MongoDBResult<>(this, query); - mongoResult.setCursor(cursor); - - return mongoResult; } /** @@ -474,8 +549,9 @@ DataStoreBase<K, T> { * the list of fields to be mapped to the persistence class instance * @return a persistence class instance which content was deserialized from * the {@link DBObject} + * @throws GoraException */ - public T newInstance(final DBObject obj, final String[] fields) { + public T newInstance(final DBObject obj, final String[] fields) throws GoraException { if (obj == null) return null; BSONDecorator easybson = new BSONDecorator(obj); @@ -507,7 +583,7 @@ DataStoreBase<K, T> { private Object fromDBObject(final Schema fieldSchema, final DocumentFieldType storeType, final Field field, final String docf, - final BSONDecorator easybson) { + final BSONDecorator easybson) throws GoraException { Object result = null; switch (fieldSchema.getType()) { case MAP: @@ -565,7 +641,7 @@ DataStoreBase<K, T> { private Object fromMongoUnion(final Schema fieldSchema, final DocumentFieldType storeType, final Field field, final String docf, - final BSONDecorator easybson) { + final BSONDecorator easybson) throws GoraException { Object result;// schema [type0, type1] Type type0 = fieldSchema.getTypes().get(0).getType(); Type type1 = fieldSchema.getTypes().get(1).getType(); @@ -589,7 +665,7 @@ DataStoreBase<K, T> { @SuppressWarnings({ "unchecked", "rawtypes" }) private Object fromMongoRecord(final Schema fieldSchema, final String docf, - final DBObject rec) { + final DBObject rec) throws GoraException { Object result; BSONDecorator innerBson = new BSONDecorator(rec); Class<?> clazz = null; @@ -619,7 +695,7 @@ DataStoreBase<K, T> { } /* pp */ Object fromMongoList(final String docf, final Schema fieldSchema, - final BSONDecorator easybson, final Field f) { + final BSONDecorator easybson, final Field f) throws GoraException { List<Object> list = easybson.getDBList(docf); List<Object> rlist = new ArrayList<>(); if (list == null) { @@ -637,7 +713,7 @@ DataStoreBase<K, T> { } /* pp */ Object fromMongoMap(final String docf, final Schema fieldSchema, - final BSONDecorator easybson, final Field f) { + final BSONDecorator easybson, final Field f) throws GoraException { BasicDBObject map = easybson.getDBObject(docf); Map<Utf8, Object> rmap = new HashMap<>(); if (map == null) { http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java index 62e19f9..9deb906 100644 --- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java +++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java @@ -28,6 +28,7 @@ import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreTestBase; +import org.apache.gora.util.GoraException; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -158,7 +159,7 @@ public abstract class TestMongoStore extends DataStoreTestBase { r.close(); } - private void addWebPage() { + private void addWebPage() throws GoraException { String key = String.valueOf(keySequence++); WebPage p1 = webPageStore.newPersistent(); p1.setUrl(new Utf8(key)); http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java ---------------------------------------------------------------------- diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java index 000f1fa..faf0607 100644 --- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java +++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java @@ -64,6 +64,7 @@ import org.apache.gora.query.impl.PartitionQueryImpl; import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.util.AvroUtils; import org.apache.gora.util.ClassLoadingUtils; +import org.apache.gora.util.GoraException; import javax.xml.bind.DatatypeConverter; @@ -95,7 +96,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * @param properties OrientDB dataStore properties EG:- OrientDB client credentials. */ @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { super.initialize(keyClass, persistentClass, properties); try { orientDbStoreParams = OrientDBStoreParameters.load(properties); @@ -153,14 +154,13 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * Create a new class of OrientDB documents if necessary. Enforce specified schema over the document class. * */ @Override - public void createSchema() { + public void createSchema() throws GoraException { if (schemaExists()) { return; } - ODatabaseDocumentTx schemaTx = connectionPool.acquire(); - schemaTx.activateOnCurrentThread(); - try { + try (ODatabaseDocumentTx schemaTx = connectionPool.acquire()) { + schemaTx.activateOnCurrentThread(); OClass documentClass = schemaTx.getMetadata().getSchema().createClass(orientDBMapping.getDocumentClass()); documentClass.createProperty("_id", @@ -170,8 +170,9 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, OType.valueOf(orientDBMapping.getDocumentFieldType(docField).name())); } schemaTx.getMetadata().getSchema().reload(); - } finally { - schemaTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -180,13 +181,13 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * Deletes enforced schema over OrientDB Document class. */ @Override - public void deleteSchema() { - ODatabaseDocumentTx schemaTx = connectionPool.acquire(); - schemaTx.activateOnCurrentThread(); - try { + public void deleteSchema() throws GoraException { + try (ODatabaseDocumentTx schemaTx = connectionPool.acquire()) { + schemaTx.activateOnCurrentThread(); schemaTx.getMetadata().getSchema().dropClass(orientDBMapping.getDocumentClass()); - } finally { - schemaTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -195,14 +196,14 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * Check whether there exist a schema enforced over OrientDB document class. */ @Override - public boolean schemaExists() { - ODatabaseDocumentTx schemaTx = connectionPool.acquire(); - schemaTx.activateOnCurrentThread(); - try { + public boolean schemaExists() throws GoraException { + try (ODatabaseDocumentTx schemaTx = connectionPool.acquire()) { + schemaTx.activateOnCurrentThread(); return schemaTx.getMetadata().getSchema() .existsClass(orientDBMapping.getDocumentClass()); - } finally { - schemaTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -210,7 +211,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * {@inheritDoc} */ @Override - public T get(K key, String[] fields) { + public T get(K key, String[] fields) throws GoraException { String[] dbFields = getFieldsToQuery(fields); com.github.raymanrt.orientqb.query.Query selectQuery = new com.github.raymanrt.orientqb.query.Query(); for (String k : dbFields) { @@ -224,17 +225,18 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, Map<String, Object> params = new HashMap<String, Object>(); params.put("key", key); OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(selectQuery.toString()); - ODatabaseDocumentTx selectTx = connectionPool.acquire(); - selectTx.activateOnCurrentThread(); - try { + + try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) { + selectTx.activateOnCurrentThread(); List<ODocument> result = selectTx.command(query).execute(params); if (result.size() == 1) { return convertOrientDocToAvroBean(result.get(0), dbFields); } else { return null; } - } finally { - selectTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -242,16 +244,15 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * {@inheritDoc} */ @Override - public void put(K key, T val) { + public void put(K key, T val) throws GoraException { if (val.isDirty()) { OrientDBQuery<K, T> dataStoreQuery = new OrientDBQuery<>(this); dataStoreQuery.setStartKey(key); dataStoreQuery.setEndKey(key); dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); - ODatabaseDocumentTx selectTx = connectionPool.acquire(); - selectTx.activateOnCurrentThread(); - try { + try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) { + selectTx.activateOnCurrentThread(); // TODO : further optimize for queries to separate cases update / insert == get rid of select all query // TODO : for update List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) @@ -263,8 +264,9 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, ODocument document = convertAvroBeanToOrientDoc(key, val); docBatch.add(document); } - } finally { - selectTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } else { if (LOG.isDebugEnabled()) { @@ -278,24 +280,24 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * {@inheritDoc} */ @Override - public boolean delete(K key) { + public boolean delete(K key) throws GoraException { Delete delete = new Delete(); delete.from(orientDBMapping.getDocumentClass()) .where(projection("_id").eq(Parameter.parameter("key"))); Map<String, Object> params = new HashMap<String, Object>(); params.put("key", key); OCommandSQL query = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); - ODatabaseDocumentTx deleteTx = connectionPool.acquire(); - deleteTx.activateOnCurrentThread(); - try { + try (ODatabaseDocumentTx deleteTx = connectionPool.acquire()) { + deleteTx.activateOnCurrentThread(); int deleteCount = deleteTx.command(query).execute(params); if (deleteCount == 1) { return true; } else { return false; } - } finally { - deleteTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -303,7 +305,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * {@inheritDoc} */ @Override - public long deleteByQuery(Query<K, T> query) { + public long deleteByQuery(Query<K, T> query) throws GoraException { Delete delete = new Delete(); delete.from(orientDBMapping.getDocumentClass()); Map<String, Object> params = new HashMap<String, Object>(); @@ -318,9 +320,8 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, } OCommandSQL dbQuery = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM")); - ODatabaseDocumentTx deleteTx = connectionPool.acquire(); - deleteTx.activateOnCurrentThread(); - try { + try (ODatabaseDocumentTx deleteTx = connectionPool.acquire()) { + deleteTx.activateOnCurrentThread(); int deleteCount; if (params.isEmpty()) { deleteCount = deleteTx.command(dbQuery).execute(); @@ -332,8 +333,9 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, } else { return 0; } - } finally { - deleteTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } else { @@ -342,9 +344,8 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, dataStoreQuery.setEndKey(query.getEndKey()); dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields()); - ODatabaseDocumentTx selectTx = connectionPool.acquire(); - selectTx.activateOnCurrentThread(); - try { + try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) { + selectTx.activateOnCurrentThread(); List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) .execute(dataStoreQuery.getParams()); if (result != null && result.isEmpty()) { @@ -360,8 +361,9 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, } return result.size(); } - } finally { - selectTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } } @@ -370,7 +372,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * {@inheritDoc} */ @Override - public Result<K, T> execute(Query<K, T> query) { + public Result<K, T> execute(Query<K, T> query) throws GoraException { String[] fields = getFieldsToQuery(query.getFields()); OrientDBQuery dataStoreQuery; if (query instanceof OrientDBQuery) { @@ -379,15 +381,15 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, dataStoreQuery = (OrientDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery(); } dataStoreQuery.populateOrientDBQuery(orientDBMapping, fields, getFields()); - ODatabaseDocumentTx selectTx = connectionPool.acquire(); - selectTx.activateOnCurrentThread(); - try { + try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) { + selectTx.activateOnCurrentThread(); OConcurrentResultSet<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery()) .execute(dataStoreQuery.getParams()); result.setLimit((int) query.getLimit()); return new OrientDBResult<K, T>(this, query, result); - } finally { - selectTx.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @@ -420,16 +422,17 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, * Flushes locally cached to content in memory to remote OrientDB server. */ @Override - public void flush() { - ODatabaseDocumentTx updateTx = connectionPool.acquire(); - updateTx.activateOnCurrentThread(); - try { + public void flush() throws GoraException { + try (ODatabaseDocumentTx updateTx = connectionPool.acquire()) { + updateTx.activateOnCurrentThread(); flushLock.lock(); for (ODocument document : docBatch) { updateTx.save(document); } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } finally { - updateTx.close(); docBatch.clear(); flushLock.unlock(); } @@ -455,7 +458,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, return connectionPool; } - public T convertOrientDocToAvroBean(final ODocument obj, final String[] fields) { + public T convertOrientDocToAvroBean(final ODocument obj, final String[] fields) throws GoraException { T persistent = newPersistent(); String[] dbFields = getFieldsToQuery(fields); for (String f : dbFields) { @@ -480,7 +483,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, final OrientDBMapping.DocumentFieldType storeType, final Schema.Field field, final String docf, - final ODocument obj) { + final ODocument obj) throws GoraException { Object result = null; switch (fieldSchema.getType()) { case MAP: @@ -543,7 +546,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, final Schema fieldSchema, final ODocument doc, final Schema.Field f, - final OrientDBMapping.DocumentFieldType storeType) { + final OrientDBMapping.DocumentFieldType storeType) throws GoraException { if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDSET) { OTrackedSet<Object> set = doc.field(docf); List<Object> rlist = new ArrayList<>(); @@ -605,7 +608,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, private Object convertDocFieldToAvroMap(final String docf, final Schema fieldSchema, final ODocument doc, final Schema.Field f, - final OrientDBMapping.DocumentFieldType storeType) { + final OrientDBMapping.DocumentFieldType storeType) throws GoraException { if (storeType == OrientDBMapping.DocumentFieldType.EMBEDDEDMAP) { OTrackedMap<Object> map = doc.field(docf); Map<Utf8, Object> rmap = new HashMap<>(); @@ -682,13 +685,14 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, } private Object convertAvroBeanToOrientDoc(final Schema fieldSchema, - final ODocument doc) { + final ODocument doc) throws GoraException { Object result; Class<?> clazz = null; try { clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName()); - } catch (ClassNotFoundException e) { - //Ignore + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); for (Schema.Field recField : fieldSchema.getFields()) { @@ -727,7 +731,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, final OrientDBMapping.DocumentFieldType storeType, final Schema.Field field, final String docf, - final ODocument doc) { + final ODocument doc) throws GoraException { Object result; Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); @@ -746,7 +750,7 @@ public class OrientDBStore<K, T extends PersistentBase> extends DataStoreBase<K, result = convertDocFieldToAvroField(innerSchema, storeType, field, docf, doc); } else { - throw new IllegalStateException("OrientDBStore only supports Union of two types field."); + throw new GoraException("OrientDBStore only supports Union of two types field."); } return result; } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-solr/src/main/java/org/apache/gora/solr/query/SolrResult.java ---------------------------------------------------------------------- diff --git a/gora-solr/src/main/java/org/apache/gora/solr/query/SolrResult.java b/gora-solr/src/main/java/org/apache/gora/solr/query/SolrResult.java index d3ef88a..1f2cfe1 100644 --- a/gora-solr/src/main/java/org/apache/gora/solr/query/SolrResult.java +++ b/gora-solr/src/main/java/org/apache/gora/solr/query/SolrResult.java @@ -27,6 +27,7 @@ import org.apache.gora.query.impl.PartitionQueryImpl; import org.apache.gora.query.impl.ResultBase; import org.apache.gora.solr.store.SolrStore; import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.response.QueryResponse; @@ -34,6 +35,8 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SolrResult specific implementation of the {@link org.apache.gora.query.Result} @@ -41,6 +44,8 @@ import org.apache.solr.common.params.ModifiableSolrParams; */ public class SolrResult<K, T extends PersistentBase> extends ResultBase<K, T> { + private static final Logger LOG = LoggerFactory.getLogger(SolrResult.class); + SolrDocumentList list = null; SolrStore<K, T> store; String[] fields; @@ -55,7 +60,7 @@ public class SolrResult<K, T extends PersistentBase> extends ResultBase<K, T> { * @param resultsSize The number of rows to be returned */ public SolrResult(DataStore<K, T> dataStore, Query<K, T> query, - SolrClient server, int resultsSize) throws IOException { + SolrClient server, int resultsSize) throws GoraException { super(dataStore, query); store = (SolrStore<K, T>)dataStore; ModifiableSolrParams params = new ModifiableSolrParams(); @@ -82,8 +87,9 @@ public class SolrResult<K, T extends PersistentBase> extends ResultBase<K, T> { try { QueryResponse rsp = server.query(params); list = rsp.getResults(); - } catch (SolrServerException e) { - throw new IOException(e); + } catch (SolrServerException | IOException e) { + LOG.error(e.getMessage(), e); + throw new GoraException(e); } } http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java ---------------------------------------------------------------------- diff --git a/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java b/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java index ac44a06..ae85a66 100644 --- a/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java +++ b/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java @@ -17,11 +17,11 @@ package org.apache.gora.solr.store; import java.io.IOException; import java.net.MalformedURLException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +43,7 @@ import org.apache.gora.solr.query.SolrResult; import org.apache.gora.store.DataStoreFactory; import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.GoraException; import org.apache.gora.util.IOUtils; import org.apache.hadoop.util.StringUtils; import org.apache.http.impl.client.DefaultHttpClient; @@ -201,7 +202,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> */ @Override public void initialize(Class<K> keyClass, Class<T> persistentClass, - Properties properties) { + Properties properties) throws GoraException { super.initialize(keyClass, persistentClass, properties); try { String mappingFile = DataStoreFactory.getMappingFile(properties, this, @@ -209,6 +210,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> mapping = readMapping(mappingFile); } catch (IOException e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } SolrClientUrl = DataStoreFactory.findProperty(properties, this, @@ -371,51 +373,49 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> } @Override - public void createSchema() { + public void createSchema() throws GoraException { try { if (!schemaExists()) CoreAdminRequest.createCore(mapping.getCoreName(), mapping.getCoreName(), adminServer, solrConfig, solrSchema); } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @Override /** Default implementation deletes and recreates the schema*/ - public void truncateSchema() { + public void truncateSchema() throws GoraException { try { server.deleteByQuery("*:*"); server.commit(); } catch (Exception e) { - // ignore? LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @Override - public void deleteSchema() { + public void deleteSchema() throws GoraException { // XXX should this be only in truncateSchema ??? try { server.deleteByQuery("*:*"); server.commit(); - } catch (Exception e) { - // ignore? - // LOG.error(e.getMessage(), e); - } - try { + CoreAdminRequest.unloadCore(mapping.getCoreName(), adminServer); } catch (Exception e) { if (e.getMessage().contains("No such core")) { return; // it's ok, the core is not there } else { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } } @Override - public boolean schemaExists() { + public boolean schemaExists() throws GoraException { boolean exists = false; try { CoreAdminResponse rsp = CoreAdminRequest.getStatus(mapping.getCoreName(), @@ -423,6 +423,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> exists = rsp.getUptime(mapping.getCoreName()) != null; } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } return exists; } @@ -460,7 +461,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> } @Override - public T get(K key, String[] fields) { + public T get(K key, String[] fields) throws GoraException { ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CommonParams.QT, "/get"); params.set(CommonParams.FL, toDelimitedString(fields, ",")); @@ -474,8 +475,8 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> return newInstance((SolrDocument) o, fields); } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return null; } public T newInstance(SolrDocument doc, String[] fields) throws IOException { @@ -589,7 +590,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> } @Override - public void put(K key, T persistent) { + public void put(K key, T persistent) throws GoraException { Schema schema = persistent.getSchema(); if (!persistent.isDirty()) { // nothing to do @@ -624,6 +625,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> batch.clear(); } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } } @@ -727,7 +729,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> } @Override - public boolean delete(K key) { + public boolean delete(K key) throws GoraException { String keyField = mapping.getPrimaryKey(); try { UpdateResponse rsp = server.deleteByQuery(keyField + ":" @@ -737,12 +739,12 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> return true; } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } - return false; } @Override - public long deleteByQuery(Query<K, T> query) { + public long deleteByQuery(Query<K, T> query) throws GoraException { UpdateResponse rsp; try { /* @@ -778,20 +780,18 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> server.commit(); LOG.info(rsp.toString()); } + } catch (GoraException e) { + throw e; } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } return 0; } @Override - public Result<K, T> execute(Query<K, T> query) { - try { - return new SolrResult<>(this, query, server, resultsSize); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - return null; + public Result<K, T> execute(Query<K, T> query) throws GoraException{ + return new SolrResult<>(this, query, server, resultsSize); } @Override @@ -813,7 +813,7 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> } @Override - public void flush() { + public void flush() throws GoraException { try { if (batch.size() > 0) { add(batch, commitWithin); @@ -821,12 +821,18 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> } } catch (Exception e) { LOG.error(e.getMessage(), e); + throw new GoraException(e); } } @Override public void close() { - flush(); + try { + flush(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + // Ignore the exception. Just nothing more to do if does not got close + } } private void add(ArrayList<SolrInputDocument> batch, int commitWithin) http://git-wip-us.apache.org/repos/asf/gora/blob/b06da5f3/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java index 02f588a..a7efba4 100644 --- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java @@ -18,16 +18,6 @@ package org.apache.gora.tutorial.log; -import org.apache.avro.util.Utf8; -import org.apache.gora.query.Query; -import org.apache.gora.query.Result; -import org.apache.gora.store.DataStore; -import org.apache.gora.store.DataStoreFactory; -import org.apache.gora.tutorial.log.generated.Pageview; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; @@ -40,6 +30,17 @@ import java.util.Collections; import java.util.Locale; import java.util.StringTokenizer; +import org.apache.avro.util.Utf8; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.tutorial.log.generated.Pageview; +import org.apache.gora.util.GoraException; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * DistributedLogManager {@link org.apache.gora.tutorial.log.DistributedLogManager} is the tutorial class to @@ -399,7 +400,7 @@ public class DistributedLogManager { } } - private void deleteSchema() { + private void deleteSchema() throws GoraException { cacheStore.deleteSchema(); log.info("Deleted schema on dataStore"); }
