Repository: gora Updated Branches: refs/heads/master f4debf25a -> ed8081081
adding review fixes plus tests for GORA-409 Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ed808108 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ed808108 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ed808108 Branch: refs/heads/master Commit: ed80810818c47d67668f26f3469e90ac10c97bb2 Parents: f4debf2 Author: Kevin <[email protected]> Authored: Fri Aug 12 01:00:18 2016 +0530 Committer: Kevin <[email protected]> Committed: Fri Aug 12 01:00:18 2016 +0530 ---------------------------------------------------------------------- .../gora/persistency/impl/PersistentBase.java | 2 + .../org/apache/gora/store/DataStoreFactory.java | 12 +-- gora-jcache/pom.xml | 7 ++ .../apache/gora/jcache/query/JCacheQuery.java | 4 + .../apache/gora/jcache/query/JCacheResult.java | 9 +++ .../jcache/store/JCacheCacheEntryListener.java | 13 +++- .../store/JCacheCacheEntryListenerFactory.java | 8 +- .../jcache/store/JCacheCacheFactoryBuilder.java | 4 + .../gora/jcache/store/JCacheCacheLoader.java | 9 ++- .../jcache/store/JCacheCacheLoaderFactory.java | 5 ++ .../gora/jcache/store/JCacheCacheWriter.java | 8 +- .../jcache/store/JCacheCacheWriterFactory.java | 5 ++ .../apache/gora/jcache/store/JCacheStore.java | 81 +++++++++++++++----- .../gora/jcache/GoraHazelcastTestDriver.java | 73 ++++++++++++++++++ .../mapreduce/JCacheStoreMapReduceTest.java | 63 +++++++++++++++ .../jcache/store/JCacheGoraDataStoreTest.java | 22 ++++-- gora-jcache/src/test/resources/gora.properties | 3 +- .../src/test/resources/hazelcast-client.xml | 31 ++++++++ gora-jcache/src/test/resources/hazelcast.xml | 5 +- pom.xml | 2 +- 20 files changed, 319 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java 
---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java b/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java index 8c1a30c..3635c69 100644 --- a/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java +++ b/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java @@ -193,6 +193,8 @@ public abstract class PersistentBase extends SpecificRecordBase implements * on velocity template record.vm. * <p> * Note {@link java.nio.ByteBuffer} is not itself not in serializable form. + * + * @return __g__dirty dirty bytes */ public ByteBuffer getDirtyBytes() { return __g__dirty; http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java ---------------------------------------------------------------------- diff --git a/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java b/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java index eca57bf..de5da36 100644 --- a/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java +++ b/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java @@ -41,7 +41,7 @@ public class DataStoreFactory{ public static final String GORA_DEFAULT_PROPERTIES_FILE = "gora.properties"; public static final String GORA_DEFAULT_DATASTORE_KEY = "gora.datastore.default"; - + /*This selects the default caching dataStore which wraps any GORA persistency dataStore*/ public static final String GORA_DEFAULT_CACHE_DATASTORE_KEY = "gora.cache.datastore.default"; public static final String GORA = "gora"; @@ -278,8 +278,8 @@ public class DataStoreFactory{ /** - * Instantiate <i>the default</i> {@link DataStore} wrapped over JCache datastore which provides caching - * abstraction over any GORA persistence dataStore. 
+ * Instantiate <i>the default</i> {@link DataStore} wrapped over caching dataStore which provides caching + * abstraction over the GORA persistence dataStore. * Uses default properties. Uses 'null' schema. * * Note: @@ -287,10 +287,10 @@ public class DataStoreFactory{ * * @param keyClass The key class. * @param persistent The value class. - * @param conf {@link Configuration} to be used be the store. - * @param isCacheEnabled caching enable + * @param conf {@link Configuration} To be used be the store. + * @param isCacheEnabled Caching enable or not. * @return A new store instance. - * @throws GoraException + * @throws GoraException If cache or persistency dataStore initialization interrupted. */ @SuppressWarnings("unchecked") public static <K, T extends Persistent> DataStore<K, T> getDataStore( http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/pom.xml ---------------------------------------------------------------------- diff --git a/gora-jcache/pom.xml b/gora-jcache/pom.xml index a56fce5..58ba61b 100644 --- a/gora-jcache/pom.xml +++ b/gora-jcache/pom.xml @@ -133,6 +133,7 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency> + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> @@ -150,6 +151,12 @@ <artifactId>junit</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheQuery.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheQuery.java b/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheQuery.java index c3d9c0c..5fb9ed9 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheQuery.java +++ 
b/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheQuery.java @@ -21,6 +21,10 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.impl.QueryBase; import org.apache.gora.store.DataStore; +/** + * {@link org.apache.gora.jcache.query.JCacheQuery} is the primary class + * responsible for representing a cache manipulation query. + */ public class JCacheQuery<K, T extends PersistentBase> extends QueryBase<K, T> { public JCacheQuery() { http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheResult.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheResult.java b/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheResult.java index 235f6ac..dfe8083 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheResult.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/query/JCacheResult.java @@ -26,9 +26,17 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.impl.ResultBase; import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * {@link org.apache.gora.jcache.query.JCacheResult} is the primary class + * responsible for representing result set of a cache manipulation query + * {@link org.apache.gora.jcache.query.JCacheQuery} + */ public class JCacheResult<K, T extends PersistentBase> extends ResultBase<K, T> { + private static final Logger LOG = LoggerFactory.getLogger(JCacheResult.class); private NavigableSet<K> cacheKeySet; private Iterator<K> iterator; private int current; @@ -68,6 +76,7 @@ public class JCacheResult<K, T extends PersistentBase> extends ResultBase<K, T> return false; } key = iterator.next(); + LOG.info("Results set pointer is now moved to key {}.", key); persistent = dataStore.get(key); 
this.current++; return true; http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java index f7c9d36..1f960f5 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java @@ -29,6 +29,11 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import java.util.concurrent.ConcurrentSkipListSet; +/** + * {@link org.apache.gora.jcache.store.JCacheCacheEntryListener} is the primary class + * responsible for listening on {@link javax.cache.event.CacheEntryEvent} cache entry events + * EG:- Creation, Removal, Expiry etc of entries on caches and trigger actions as specified. + */ public class JCacheCacheEntryListener<K, T extends PersistentBase> implements CacheEntryCreatedListener<K, T>, CacheEntryRemovedListener<K, T>, CacheEntryUpdatedListener<K, T>, CacheEntryExpiredListener<K, T> { @@ -45,7 +50,7 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> throws CacheEntryListenerException { for (CacheEntryEvent<? extends K, ? extends T> event : cacheEntryEvents) { cacheEntryList.add(event.getKey()); - LOG.info("Cache entry added on key " + event.getKey().toString()); + LOG.info("Cache entry added on key {}.", event.getKey().toString()); } } @@ -54,7 +59,7 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> throws CacheEntryListenerException { for (CacheEntryEvent<? extends K, ? 
extends T> event : cacheEntryEvents) { cacheEntryList.remove(event.getKey()); - LOG.info("Cache entry removed on key " + event.getKey().toString()); + LOG.info("Cache entry removed on key {}.", event.getKey().toString()); } } @@ -62,7 +67,7 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends T>> cacheEntryEvents) throws CacheEntryListenerException { for (CacheEntryEvent<? extends K, ? extends T> event : cacheEntryEvents) { - LOG.info("Cache entry updated set on key " + event.getKey().toString()); + LOG.info("Cache entry updated set on key {}.", event.getKey().toString()); } } @@ -70,7 +75,7 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends T>> cacheEntryEvents) throws CacheEntryListenerException { for (CacheEntryEvent<? extends K, ? extends T> event : cacheEntryEvents) { - LOG.warn("Cache entry expired on key " + event.getKey().toString()); + LOG.warn("Cache entry expired on key {}.", event.getKey().toString()); } } http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java index 42b3c93..bbd30c0 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java @@ -23,6 +23,11 @@ import org.slf4j.LoggerFactory; import javax.cache.configuration.Factory; +/** + * {@link org.apache.gora.jcache.store.JCacheCacheEntryListenerFactory} is the primary class + * responsible for creating cache entry listeners 
which listens on {@link javax.cache.event.CacheEntryEvent} + * cache entry events EG:- Creation, Removal, etc of keys on caches and trigger actions as specified. + */ public class JCacheCacheEntryListenerFactory <K,T extends PersistentBase> implements Factory<JCacheCacheEntryListener<K, T>> { @@ -50,7 +55,4 @@ public class JCacheCacheEntryListenerFactory <K,T extends PersistentBase> } } - public int hashCode() { - return this.instance.hashCode(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java index 7e1bb72..253d345 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java @@ -21,6 +21,10 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.store.DataStore; import javax.cache.configuration.Factory; +/** + * {@link org.apache.gora.jcache.store.JCacheCacheFactoryBuilder} is a Generic Factory + * builder that creates Factory instances which extends {@link javax.cache.configuration.Factory} + */ public class JCacheCacheFactoryBuilder { public static <K, T extends PersistentBase> Factory<JCacheCacheLoader<K,T>> http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java index 74e910a..daf0409 100644 --- 
a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java @@ -27,7 +27,10 @@ import javax.cache.integration.CacheLoaderException; import java.util.HashMap; import java.util.Map; - +/** + * {@link org.apache.gora.jcache.store.JCacheCacheLoader} is the primary class + * responsible for loading data beans from persistency dataStore to in memory cache. + */ public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoader<K, T> { private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheLoader.class); @@ -42,7 +45,7 @@ public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoad T persistent = null; try { persistent = dataStore.get(key); - LOG.info("Loaded data bean from persistent datastore on key " + key.toString()); + LOG.info("Loaded data bean from persistent datastore on key {}.", key.toString()); } catch (CacheLoaderException ex) { throw ex; } @@ -54,7 +57,7 @@ public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoad Map<K, T> loaded = new HashMap<K, T>(); for (K key : keys) { T persistent = dataStore.get(key); - LOG.info("Loaded data bean from persistent datastore on key " + key.toString()); + LOG.info("Loaded data bean from persistent datastore on key {}.", key.toString()); if (persistent != null) { loaded.put(key, persistent); } http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java index b1f59d2..0a86ebe 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java +++ 
b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java @@ -23,6 +23,11 @@ import org.slf4j.LoggerFactory; import javax.cache.configuration.Factory; +/** + * {@link org.apache.gora.jcache.store.JCacheCacheLoaderFactory} is the primary class + * responsible for creating cache loader {@link javax.cache.integration.CacheLoader} instances which itself + * loads data beans from persistency dataStore to in memory cache. + */ public class JCacheCacheLoaderFactory<K, T extends PersistentBase> implements Factory<JCacheCacheLoader<K,T>> { http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java index e9c4373..e1e5ae8 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java @@ -28,6 +28,10 @@ import javax.cache.integration.CacheWriterException; import java.util.Collection; import java.util.Iterator; +/** + * {@link org.apache.gora.jcache.store.JCacheCacheWriter} is the primary class + * responsible for writing data beans to persistency dataStore from in memory cache. + */ public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWriter<K, T> { private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheWriter.class); @@ -41,7 +45,7 @@ public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWrit public void write(Cache.Entry<? extends K, ? 
extends T> entry) throws CacheWriterException { dataStore.put(entry.getKey(), entry.getValue()); - LOG.info("Written data bean to persistent datastore on key " + entry.getKey().toString()); + LOG.info("Written data bean to persistent datastore on key {}.", entry.getKey().toString()); } @Override @@ -57,7 +61,7 @@ public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWrit @Override public void delete(Object key) throws CacheWriterException { dataStore.delete((K) key); - LOG.info("Deleted data bean from persistent datastore on key " + key.toString()); + LOG.info("Deleted data bean from persistent datastore on key {}.", key.toString()); } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java index 7d883d7..ef537a7 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java @@ -23,6 +23,11 @@ import org.slf4j.LoggerFactory; import javax.cache.configuration.Factory; +/** + * {@link org.apache.gora.jcache.store.JCacheCacheWriterFactory} is the primary class + * responsible for creating cache writer {@link javax.cache.integration.CacheWriter} instances which itself + * writes data beans to persistency dataStore from in memory cache. 
+ */ public class JCacheCacheWriterFactory<K, T extends PersistentBase> implements Factory<JCacheCacheWriter<K,T>> { private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheWriterFactory.class); http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java index 7d3622b..6e58e55 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java @@ -20,18 +20,24 @@ package org.apache.gora.jcache.store; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.Properties; import java.util.List; import java.util.Arrays; import java.util.ArrayList; -import java.util.Properties; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import com.hazelcast.cache.HazelcastCachingProvider; import com.hazelcast.cache.ICache; +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.client.config.XmlClientConfigBuilder; import com.hazelcast.config.CacheConfig; -import com.hazelcast.config.EvictionConfig; +import com.hazelcast.config.Config; +import com.hazelcast.config.ClasspathXmlConfig; import com.hazelcast.config.EvictionPolicy; +import com.hazelcast.config.EvictionConfig; import com.hazelcast.config.InMemoryFormat; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; @@ -54,6 +60,7 @@ import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import 
javax.cache.configuration.FactoryBuilder; @@ -65,10 +72,19 @@ import javax.cache.expiry.TouchedExpiryPolicy; import javax.cache.expiry.Duration; import javax.cache.spi.CachingProvider; +/** + * {@link org.apache.gora.jcache.store.JCacheStore} is the primary class + * responsible for GORA CRUD operations on Hazelcast Caches. This class can be think + * of as caching layer that can is wrapped over any persistency dataStore implementations + * which extends {@link org.apache.gora.store.DataStore}. This class delegates + * most operations to it s persistency dataStore. Hazelcast cache implementation is based on + * JCache JSR 107 specification. + */ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { private static final String GORA_DEFAULT_JCACHE_NAMESPACE = "gora.jcache.namespace"; private static final String GORA_DEFAULT_JCACHE_PROVIDER_KEY = "gora.datastore.jcache.provider"; + private static final String GORA_DEFAULT_JCACHE_HAZELCAST_CONFIG_KEY = "gora.datastore.jcache.hazelcast.config"; private static final String JCACHE_READ_THROUGH_PROPERTY_KEY = "jcache.read.through.enable"; private static final String JCACHE_WRITE_THROUGH_PROPERTY_KEY = "jcache.write.through.enable"; private static final String JCACHE_STORE_BY_VALUE_PROPERTY_KEY = "jcache.store.by.value.enable"; @@ -89,6 +105,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T private static final String HAZELCAST_CACHE_OBJECT_IN_MEMORY_FORMAT_IDENTIFIER = "OBJECT"; private static final String HAZELCAST_CACHE_NATIVE_IN_MEMORY_FORMAT_IDENTIFIER = "NATIVE"; private static final String JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY = "jcache.auto.create.cache"; + private static final String HAZELCAST_SERVER_CACHE_PROVIDER_IDENTIFIER = "Server"; private static final Logger LOG = LoggerFactory.getLogger(JCacheStore.class); private ICache<K, T> cache; private CacheManager manager; @@ -138,16 +155,28 @@ public class JCacheStore<K, T extends PersistentBase> 
extends DataStoreBase<K, T this.persistentDataStore = DataStoreFactory.getDataStore(keyClass, persistentClass, new Configuration()); } catch (GoraException ex) { - LOG.error("Couldn't initialize persistent DataStore"); + LOG.error("Couldn't initialize persistent DataStore.", ex); + } + if (properties.getProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY) + .contains(HAZELCAST_SERVER_CACHE_PROVIDER_IDENTIFIER)) { + Config config = new ClasspathXmlConfig(properties.getProperty(GORA_DEFAULT_JCACHE_HAZELCAST_CONFIG_KEY)); + hazelcastInstance = Hazelcast.newHazelcastInstance(config); + } else { + try { + ClientConfig config = + new XmlClientConfigBuilder(properties.getProperty(GORA_DEFAULT_JCACHE_HAZELCAST_CONFIG_KEY)).build(); + hazelcastInstance = HazelcastClient.newHazelcastClient(config); + } catch (IOException ex) { + LOG.error("Couldn't locate the client side cache provider configuration.", ex); + } } - hazelcastInstance = Hazelcast.newHazelcastInstance(); Properties providerProperties = new Properties(); providerProperties.setProperty(HazelcastCachingProvider.HAZELCAST_INSTANCE_NAME, hazelcastInstance.getName()); try { manager = cachingProvider.getCacheManager(new URI(goraCacheNamespace), null, providerProperties); } catch (URISyntaxException ex) { - LOG.error("Couldn't initialize cache manager to bounded hazelcast instance"); + LOG.error("Couldn't initialize cache manager to bounded hazelcast instance.", ex); manager = cachingProvider.getCacheManager(); } cacheEntryList = new ConcurrentSkipListSet<>(); @@ -225,9 +254,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T new MutableCacheEntryListenerConfiguration<>( JCacheCacheFactoryBuilder .factoryOfEntryListener(new JCacheCacheEntryListener<K, T>(cacheEntryList)), - null, true, true - ) - ); + null, true, true)); if (properties.getProperty(JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY) != null) { Boolean createCache = 
Boolean.valueOf(properties.getProperty(JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY)); if (createCache) { @@ -235,8 +262,14 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T cacheConfig).unwrap(ICache.class); } } else { - cache = manager.createCache(persistentClass.getSimpleName(), - cacheConfig).unwrap(ICache.class); + if (manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) == null) { + cache = manager.createCache(persistentClass.getSimpleName(), + cacheConfig).unwrap(ICache.class); + } else { + cache = manager.getCache(super.getPersistentClass().getSimpleName(), + keyClass, persistentClass).unwrap(ICache.class); + this.populateLocalCacheEntrySet(cache.iterator()); + } } LOG.info("JCache Gora datastore initialized successfully."); } @@ -254,8 +287,8 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T cacheConfig).unwrap(ICache.class); } persistentDataStore.createSchema(); - LOG.info("Created schema on persistent store and initialized cache for persistent bean " - + super.getPersistentClass().getSimpleName()); + LOG.info("Created schema on persistent store and initialized cache for persistent bean {}." + , super.getPersistentClass().getSimpleName()); } @Override @@ -263,8 +296,8 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T cacheEntryList.clear(); manager.destroyCache(super.getPersistentClass().getSimpleName()); persistentDataStore.deleteSchema(); - LOG.info("Deleted schema on persistent store and destroyed cache for persistent bean " - + super.getPersistentClass().getSimpleName()); + LOG.info("Deleted schema on persistent store and destroyed cache for persistent bean {}." 
+ , super.getPersistentClass().getSimpleName()); } @Override @@ -323,10 +356,10 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T } } } - LOG.info("JCache Gora datastore deleled " + deletedRows + " rows from Persistent datastore"); + LOG.info("JCache Gora datastore deleled {} rows from Persistent datastore.", deletedRows); return deletedRows; } catch (Exception e) { - LOG.error("Exception occured while deleting entries from JCache Gora datastore. Hence returning 0"); + LOG.error("Exception occurred while deleting entries from JCache Gora datastore. Hence returning 0.", e); return 0; } } @@ -350,7 +383,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T try { cacheEntrySubList = (ConcurrentSkipListSet<K>) cacheEntryList.subSet(startKey, true, endKey, true); } catch (NullPointerException npe) { - LOG.error("NPE occurred while executing the query for JCacheStore. Hence returning empty entry set."); + LOG.error("NPE occurred while executing the query for JCacheStore. 
Hence returning empty entry set.", npe); return new JCacheResult<>(this, query, new ConcurrentSkipListSet<K>()); } return new JCacheResult<>(this, query, cacheEntrySubList); @@ -383,9 +416,10 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T partitions.add(partition); } } catch (java.lang.Exception ex) { - LOG.error("Exception occurred while partitioning the query based on Hazelcast partitions."); + LOG.error("Exception occurred while partitioning the query based on Hazelcast partitions.", ex); return null; } + LOG.info("Query is partitioned to {} number of partitions.", partitions.size()); return partitions; } @@ -400,13 +434,22 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T flush(); cacheEntryList.clear(); if (!cache.isDestroyed() && !manager.isClosed()) { - cache.destroy(); + cache.close(); } if (!manager.isClosed()) { manager.close(); } + hazelcastInstance.shutdown(); persistentDataStore.close(); LOG.info("JCache Gora datastore destroyed successfully."); } + private void populateLocalCacheEntrySet(Iterator<Cache.Entry<K, T>> cacheEntryIterator) { + cacheEntryList.clear(); + while (cacheEntryIterator.hasNext()) { + cacheEntryList.add(cacheEntryIterator.next().getKey()); + } + LOG.info("Populated local cache entry set with respect to remote cache provider."); + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java b/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java new file mode 100644 index 0000000..7918406 --- /dev/null +++ b/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.jcache; + +import org.apache.gora.GoraTestDriver; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.jcache.store.JCacheStore; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class GoraHazelcastTestDriver extends GoraTestDriver { + + private static Logger log = LoggerFactory.getLogger(GoraHazelcastTestDriver.class); + private JCacheStore<String, WebPage> serverCacheProvider; + private static final String GORA_DEFAULT_JCACHE_PROVIDER_KEY = "gora.datastore.jcache.provider"; + private static final String PROVIDER = "com.hazelcast.cache.impl.HazelcastServerCachingProvider"; + private static final String GORA_DEFAULT_JCACHE_HAZELCAST_CONFIG_KEY = "gora.datastore.jcache.hazelcast.config"; + private static final String CONFIG = "hazelcast.xml"; + public static final String GORA_DEFAULT_DATASTORE_KEY = "gora.datastore.default"; + public static final String MEMSTORE = "org.apache.gora.memory.store.MemStore"; + + public GoraHazelcastTestDriver() { + super(JCacheStore.class); + } + + @Override + public void setUpClass() throws Exception { 
+ super.setUpClass(); + log.info("Starting Hazelcast server side cache provider."); + Properties properties = new Properties(); + properties.setProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY, PROVIDER); + properties.setProperty(GORA_DEFAULT_JCACHE_HAZELCAST_CONFIG_KEY, CONFIG); + properties.setProperty(GORA_DEFAULT_DATASTORE_KEY, MEMSTORE); + serverCacheProvider = new JCacheStore(); + serverCacheProvider.initialize(String.class, WebPage.class, properties); + } + + @Override + public void tearDownClass() throws Exception { + super.tearDownClass(); + log.info("Stopping Hazelcast server side cache provider."); + serverCacheProvider.close(); + } + + @Override + public <K, T extends Persistent> DataStore<K, T> + createDataStore(Class<K> keyClass, Class<T> persistentClass) throws GoraException { + JCacheStore store = (JCacheStore) super.createDataStore(keyClass, persistentClass); + return store; + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/JCacheStoreMapReduceTest.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/JCacheStoreMapReduceTest.java b/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/JCacheStoreMapReduceTest.java new file mode 100644 index 0000000..2e9edaf --- /dev/null +++ b/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/JCacheStoreMapReduceTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.jcache.mapreduce;
+
+import org.apache.gora.jcache.GoraHazelcastTestDriver;
+import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Runs the shared map-reduce data store tests against the JCache data store.
+ *
+ * <p>A {@link GoraHazelcastTestDriver} is set up before and torn down after every
+ * test method.
+ */
+public class JCacheStoreMapReduceTest extends DataStoreMapReduceTestBase {
+
+  private final GoraHazelcastTestDriver driver;
+
+  public JCacheStoreMapReduceTest() throws IOException {
+    super();
+    driver = new GoraHazelcastTestDriver();
+  }
+
+  /** Sets up the driver first, then runs the base class set-up. */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    driver.setUpClass();
+    super.setUp();
+  }
+
+  /**
+   * Runs the base class tear-down, then tears down the driver.
+   *
+   * <p>The driver is torn down in a {@code finally} block so that a failing base
+   * tear-down cannot leave the driver's resources open for the next test method.
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      driver.tearDownClass();
+    }
+  }
+
+  /**
+   * Creates the {@code WebPage} store used by the base class tests.
+   *
+   * @return a data store obtained from {@link DataStoreFactory}
+   * @throws RuntimeException wrapping any exception raised while creating the store
+   */
+  @Override
+  protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+    try {
+      return DataStoreFactory.getDataStore(String.class, WebPage.class, new Configuration(), true);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/test/java/org/apache/gora/jcache/store/JCacheGoraDataStoreTest.java
----------------------------------------------------------------------
diff --git a/gora-jcache/src/test/java/org/apache/gora/jcache/store/JCacheGoraDataStoreTest.java
b/gora-jcache/src/test/java/org/apache/gora/jcache/store/JCacheGoraDataStoreTest.java index 51eb086..dc81ee5 100644 --- a/gora-jcache/src/test/java/org/apache/gora/jcache/store/JCacheGoraDataStoreTest.java +++ b/gora-jcache/src/test/java/org/apache/gora/jcache/store/JCacheGoraDataStoreTest.java @@ -21,6 +21,7 @@ package org.apache.gora.jcache.store; import org.apache.gora.examples.WebPageDataCreator; import org.apache.gora.examples.generated.Employee; import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.jcache.GoraHazelcastTestDriver; import org.apache.gora.persistency.BeanFactory; import org.apache.gora.persistency.impl.BeanFactoryImpl; import org.apache.gora.query.Query; @@ -29,8 +30,10 @@ import org.apache.gora.store.DataStoreFactory; import org.apache.gora.store.DataStoreTestBase; import org.apache.gora.store.DataStoreTestUtil; import org.apache.hadoop.conf.Configuration; -import org.junit.After; +import org.junit.BeforeClass; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.After; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +54,17 @@ public class JCacheGoraDataStoreTest extends DataStoreTestBase { private static final int NUM_KEYS = 4; private Configuration conf = new Configuration(); + @BeforeClass + public static void setUpClass() throws Exception { + setTestDriver(new GoraHazelcastTestDriver()); + DataStoreTestBase.setUpClass(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + DataStoreTestBase.tearDownClass(); + } + @Before public void setUp() throws Exception { super.setUp(); @@ -59,7 +73,7 @@ public class JCacheGoraDataStoreTest extends DataStoreTestBase { @After public void tearDown() throws Exception { super.tearDown(); - //mandatory to clean up hazelcast instances + //clean up for client side cache provider instances //this is not handled at super class level super.employeeStore.close(); super.webPageStore.close(); @@ -107,7 +121,6 
@@ public class JCacheGoraDataStoreTest extends DataStoreTestBase { @Test public void testGetWithFields() throws Exception { - DataStore<String, WebPage> store = super.webPageStore; BeanFactory<String, WebPage> beanFactory = new BeanFactoryImpl<>(String.class, WebPage.class); store.setBeanFactory(beanFactory); @@ -128,7 +141,6 @@ public class JCacheGoraDataStoreTest extends DataStoreTestBase { @Test public void testDeleteByQueryFields() throws Exception { - DataStore<String, WebPage> store = super.webPageStore; BeanFactory<String, WebPage> beanFactory = new BeanFactoryImpl<>(String.class, WebPage.class); store.setBeanFactory(beanFactory); @@ -189,6 +201,6 @@ public class JCacheGoraDataStoreTest extends DataStoreTestBase { assertTrue(page.getParsedContent().size() > 0); } } - } + } http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/test/resources/gora.properties ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/resources/gora.properties b/gora-jcache/src/test/resources/gora.properties index 929115a..b740466 100644 --- a/gora-jcache/src/test/resources/gora.properties +++ b/gora-jcache/src/test/resources/gora.properties @@ -14,5 +14,6 @@ # limitations under the License. 
gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore -gora.datastore.jcache.provider=com.hazelcast.cache.impl.HazelcastServerCachingProvider +gora.datastore.jcache.provider=com.hazelcast.client.cache.impl.HazelcastClientCachingProvider +gora.datastore.jcache.hazelcast.config=hazelcast-client.xml gora.datastore.default=org.apache.gora.memory.store.MemStore http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/test/resources/hazelcast-client.xml ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/resources/hazelcast-client.xml b/gora-jcache/src/test/resources/hazelcast-client.xml new file mode 100644 index 0000000..93f21e2 --- /dev/null +++ b/gora-jcache/src/test/resources/hazelcast-client.xml @@ -0,0 +1,31 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Hazelcast client side cache provider configuration. 
+--> + +<hazelcast-client xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.hazelcast.com/schema/client-config + http://www.hazelcast.com/schema/client-config/hazelcast-client-config-3.6.xsd" + xmlns="http://www.hazelcast.com/schema/client-config"> + <network> + <cluster-members> + <address>127.0.0.1</address> + </cluster-members> + </network> +</hazelcast-client> http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/gora-jcache/src/test/resources/hazelcast.xml ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/resources/hazelcast.xml b/gora-jcache/src/test/resources/hazelcast.xml index dcf138e..9838710 100755 --- a/gora-jcache/src/test/resources/hazelcast.xml +++ b/gora-jcache/src/test/resources/hazelcast.xml @@ -16,18 +16,17 @@ --> <!-- - Hazelcast instance configuration hazelcast.xml. + Hazelcast server side cache provider configuration. --> <hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.hazelcast.com/schema/config http://www.hazelcast.com/schema/config/hazelcast-config-3.6.xsd" xmlns="http://www.hazelcast.com/schema/config"> - <network> <join> <multicast enabled="false"/> - <tcp-ip enabled="false"> + <tcp-ip enabled="true"> <member>127.0.0.1</member> </tcp-ip> </join> http://git-wip-us.apache.org/repos/asf/gora/blob/ed808108/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ebb54b4..6522ac0 100644 --- a/pom.xml +++ b/pom.xml @@ -717,7 +717,7 @@ <!-- JCache Dependencies --> <jsr107.api.version>1.0.0</jsr107.api.version> - <hazelcast.version>3.6.3</hazelcast.version> + <hazelcast.version>3.6.4</hazelcast.version> <!-- Testing Dependencies --> <junit.version>4.10</junit.version>
