Repository: gora Updated Branches: refs/heads/master ed8081081 -> 61c25fefb
GORA-409 tutorial Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/a02c5dd9 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/a02c5dd9 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/a02c5dd9 Branch: refs/heads/master Commit: a02c5dd9ff307cafdd89ff57c1fdd7f02cd06867 Parents: ed80810 Author: Kevin <[email protected]> Authored: Mon Aug 15 01:11:56 2016 +0530 Committer: Kevin <[email protected]> Committed: Mon Aug 15 01:11:56 2016 +0530 ---------------------------------------------------------------------- .../apache/gora/jcache/query/package-info.java | 21 + .../jcache/store/JCacheCacheEntryListener.java | 25 +- .../store/JCacheCacheEntryListenerFactory.java | 12 +- .../jcache/store/JCacheCacheFactoryBuilder.java | 7 +- .../gora/jcache/store/JCacheCacheLoader.java | 10 +- .../jcache/store/JCacheCacheLoaderFactory.java | 6 +- .../gora/jcache/store/JCacheCacheWriter.java | 8 +- .../jcache/store/JCacheCacheWriterFactory.java | 12 +- .../apache/gora/jcache/store/JCacheStore.java | 207 +++++---- .../apache/gora/jcache/store/package-info.java | 29 ++ .../gora/jcache/GoraHazelcastTestDriver.java | 5 + .../gora/jcache/mapreduce/package-info.java | 20 + .../apache/gora/jcache/store/package-info.java | 20 + gora-jcache/src/test/resources/gora.properties | 3 + gora-tutorial/conf/gora.properties | 6 + gora-tutorial/conf/hazelcast-client.xml | 31 ++ gora-tutorial/conf/hazelcast.xml | 34 ++ gora-tutorial/pom.xml | 5 + .../tutorial/log/DistributedLogManager.java | 461 +++++++++++++++++++ .../src/main/resources/log4j.properties | 42 ++ pom.xml | 12 + 21 files changed, 856 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/query/package-info.java ---------------------------------------------------------------------- diff --git 
a/gora-jcache/src/main/java/org/apache/gora/jcache/query/package-info.java b/gora-jcache/src/main/java/org/apache/gora/jcache/query/package-info.java new file mode 100644 index 0000000..6d6cd1d --- /dev/null +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/query/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains the JCache store query representation classes as well as the result set class + * that represents the results when a query is executed over the JCache dataStore. 
+ */ +package org.apache.gora.jcache.query; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java index 1f960f5..5891048 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListener.java @@ -36,10 +36,11 @@ import java.util.concurrent.ConcurrentSkipListSet; */ public class JCacheCacheEntryListener<K, T extends PersistentBase> implements CacheEntryCreatedListener<K, T>, - CacheEntryRemovedListener<K, T>, CacheEntryUpdatedListener<K, T>, CacheEntryExpiredListener<K, T> { + CacheEntryRemovedListener<K, T>, CacheEntryUpdatedListener<K, T>, + CacheEntryExpiredListener<K, T>, java.io.Serializable { private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheEntryListener.class); - private ConcurrentSkipListSet<K> cacheEntryList; + private transient ConcurrentSkipListSet<K> cacheEntryList; public JCacheCacheEntryListener(ConcurrentSkipListSet cacheEntryList) { this.cacheEntryList = cacheEntryList; @@ -48,6 +49,10 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends T>> cacheEntryEvents) throws CacheEntryListenerException { + //get rid execution of listener chain/executing only one initialized + if (cacheEntryList == null) { + return; + } for (CacheEntryEvent<? extends K, ? 
extends T> event : cacheEntryEvents) { cacheEntryList.add(event.getKey()); LOG.info("Cache entry added on key {}.", event.getKey().toString()); @@ -57,6 +62,10 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends T>> cacheEntryEvents) throws CacheEntryListenerException { + //get rid execution of listener chain/executing only one initialized + if (cacheEntryList == null) { + return; + } for (CacheEntryEvent<? extends K, ? extends T> event : cacheEntryEvents) { cacheEntryList.remove(event.getKey()); LOG.info("Cache entry removed on key {}.", event.getKey().toString()); @@ -66,6 +75,10 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends T>> cacheEntryEvents) throws CacheEntryListenerException { + //get rid execution of listener chain/executing only one initialized + if (cacheEntryList == null) { + return; + } for (CacheEntryEvent<? extends K, ? extends T> event : cacheEntryEvents) { LOG.info("Cache entry updated set on key {}.", event.getKey().toString()); } @@ -74,9 +87,17 @@ public class JCacheCacheEntryListener<K, T extends PersistentBase> @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends T>> cacheEntryEvents) throws CacheEntryListenerException { + //get rid execution of listener chain/executing only one initialized + if (cacheEntryList == null) { + return; + } for (CacheEntryEvent<? extends K, ? 
extends T> event : cacheEntryEvents) { LOG.warn("Cache entry expired on key {}.", event.getKey().toString()); } } + public void setCacheEntryList(ConcurrentSkipListSet<K> cacheEntryList) { + this.cacheEntryList = cacheEntryList; + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java index bbd30c0..abc283d 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheEntryListenerFactory.java @@ -28,12 +28,12 @@ import javax.cache.configuration.Factory; * responsible for creating cache entry listeners which listens on {@link javax.cache.event.CacheEntryEvent} * cache entry events EG:- Creation, Removal, etc of keys on caches and trigger actions as specified. 
*/ -public class JCacheCacheEntryListenerFactory <K,T extends PersistentBase> +public class JCacheCacheEntryListenerFactory<K, T extends PersistentBase> implements Factory<JCacheCacheEntryListener<K, T>> { - private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheEntryListenerFactory.class); public static final long serialVersionUID = 201305101634L; - private transient JCacheCacheEntryListener<K, T> instance; + private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheEntryListenerFactory.class); + private JCacheCacheEntryListener<K, T> instance; public JCacheCacheEntryListenerFactory(JCacheCacheEntryListener<K, T> instance) { LOG.info("JCache cache entry listener factory initialized successfully."); @@ -45,10 +45,10 @@ public class JCacheCacheEntryListenerFactory <K,T extends PersistentBase> } public boolean equals(Object other) { - if(this == other) { + if (this == other) { return true; - } else if(other != null && this.getClass() == other.getClass()) { - JCacheCacheEntryListenerFactory that = (JCacheCacheEntryListenerFactory)other; + } else if (other != null && this.getClass() == other.getClass()) { + JCacheCacheEntryListenerFactory that = (JCacheCacheEntryListenerFactory) other; return this.instance.equals(that.instance); } else { return false; http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java index 253d345..75c3f36 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheFactoryBuilder.java @@ -19,6 +19,7 @@ package org.apache.gora.jcache.store; import 
org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.store.DataStore; + import javax.cache.configuration.Factory; /** @@ -27,17 +28,17 @@ import javax.cache.configuration.Factory; */ public class JCacheCacheFactoryBuilder { - public static <K, T extends PersistentBase> Factory<JCacheCacheLoader<K,T>> + public static <K, T extends PersistentBase> Factory<JCacheCacheLoader<K, T>> factoryOfCacheLoader(DataStore<K, T> dataStore) { return new JCacheCacheLoaderFactory<>(new JCacheCacheLoader<>(dataStore)); } - public static <K, T extends PersistentBase> Factory<JCacheCacheWriter<K,T>> + public static <K, T extends PersistentBase> Factory<JCacheCacheWriter<K, T>> factoryOfCacheWriter(DataStore<K, T> dataStore) { return new JCacheCacheWriterFactory<>(new JCacheCacheWriter<>(dataStore)); } - public static <K,T extends PersistentBase> Factory<JCacheCacheEntryListener<K, T>> + public static <K, T extends PersistentBase> Factory<JCacheCacheEntryListener<K, T>> factoryOfEntryListener(JCacheCacheEntryListener<K, T> instance) { return new JCacheCacheEntryListenerFactory<>(instance); } http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java index daf0409..fed2ba2 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoader.java @@ -31,13 +31,13 @@ import java.util.Map; * {@link org.apache.gora.jcache.store.JCacheCacheLoader} is the primary class * responsible for loading data beans from persistency dataStore to in memory cache. 
*/ -public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoader<K, T> { +public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoader<K, T>, java.io.Serializable { private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheLoader.class); - private DataStore<K, T> dataStore; + private transient DataStore<K, T> dataStore; public JCacheCacheLoader(DataStore<K, T> dataStore) { - this.dataStore = dataStore; + this.dataStore = dataStore; } @Override @@ -65,4 +65,8 @@ public class JCacheCacheLoader<K, T extends PersistentBase> implements CacheLoad return loaded; } + public void setDataStore(DataStore<K, T> dataStore) { + this.dataStore = dataStore; + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java index 0a86ebe..2b82f1f 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheLoaderFactory.java @@ -29,11 +29,11 @@ import javax.cache.configuration.Factory; * loads data beans from persistency dataStore to in memory cache. 
*/ public class JCacheCacheLoaderFactory<K, T extends PersistentBase> - implements Factory<JCacheCacheLoader<K,T>> { + implements Factory<JCacheCacheLoader<K, T>> { - private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheLoaderFactory.class); public static final long serialVersionUID = 201305101626L; - private transient JCacheCacheLoader<K, T> instance; + private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheLoaderFactory.class); + private JCacheCacheLoader<K, T> instance; public JCacheCacheLoaderFactory(JCacheCacheLoader<K, T> instance) { LOG.info("JCache cache entry loader factory initialized successfully."); http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java index e1e5ae8..b7a95c3 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriter.java @@ -32,10 +32,10 @@ import java.util.Iterator; * {@link org.apache.gora.jcache.store.JCacheCacheWriter} is the primary class * responsible for writing data beans to persistency dataStore from in memory cache. 
*/ -public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWriter<K, T> { +public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWriter<K, T>, java.io.Serializable { private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheWriter.class); - private DataStore<K, T> dataStore; + private transient DataStore<K, T> dataStore; public JCacheCacheWriter(DataStore<K, T> dataStore) { this.dataStore = dataStore; @@ -73,4 +73,8 @@ public class JCacheCacheWriter<K, T extends PersistentBase> implements CacheWrit } } + public void setDataStore(DataStore<K, T> dataStore) { + this.dataStore = dataStore; + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java index ef537a7..33c2fbb 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheCacheWriterFactory.java @@ -28,19 +28,19 @@ import javax.cache.configuration.Factory; * responsible for creating cache writer {@link javax.cache.integration.CacheWriter} instances which itself * writes data beans to persistency dataStore from in memory cache. 
*/ -public class JCacheCacheWriterFactory<K, T extends PersistentBase> implements Factory<JCacheCacheWriter<K,T>> { +public class JCacheCacheWriterFactory<K, T extends PersistentBase> implements Factory<JCacheCacheWriter<K, T>> { - private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheWriterFactory.class); public static final long serialVersionUID = 201205101621L; - private transient JCacheCacheWriter<K,T> instance; + private static final Logger LOG = LoggerFactory.getLogger(JCacheCacheWriterFactory.class); + private JCacheCacheWriter<K, T> instance; - public JCacheCacheWriterFactory(JCacheCacheWriter<K,T> instance) { + public JCacheCacheWriterFactory(JCacheCacheWriter<K, T> instance) { LOG.info("JCache cache writer factory initialized successfully."); this.instance = instance; } - public JCacheCacheWriter<K,T> create() { - return (JCacheCacheWriter<K,T>)this.instance; + public JCacheCacheWriter<K, T> create() { + return (JCacheCacheWriter<K, T>) this.instance; } public boolean equals(Object other) { http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java index 6e58e55..f0c9c27 100644 --- a/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/JCacheStore.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; -import java.util.Properties; -import java.util.List; import java.util.Arrays; +import java.util.List; +import java.util.Properties; import java.util.ArrayList; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; @@ -63,6 +63,7 @@ import 
org.slf4j.LoggerFactory; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; +import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.FactoryBuilder; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.expiry.AccessedExpiryPolicy; @@ -145,9 +146,8 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T @Override public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { super.initialize(keyClass, persistentClass, properties); - CachingProvider cachingProvider = Caching.getCachingProvider( - properties.getProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY) - ); + CachingProvider cachingProvider = Caching.getCachingProvider + (properties.getProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY)); if (properties.getProperty(JCACHE_CACHE_NAMESPACE_PROPERTY_KEY) != null) { goraCacheNamespace = properties.getProperty(JCACHE_CACHE_NAMESPACE_PROPERTY_KEY); } @@ -179,97 +179,92 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T LOG.error("Couldn't initialize cache manager to bounded hazelcast instance.", ex); manager = cachingProvider.getCacheManager(); } - cacheEntryList = new ConcurrentSkipListSet<>(); - cacheConfig = new CacheConfig<K, T>(); - cacheConfig.setTypes(keyClass, persistentClass); - if (properties.getProperty(JCACHE_READ_THROUGH_PROPERTY_KEY) != null) { - cacheConfig.setReadThrough(Boolean.valueOf(properties.getProperty(JCACHE_READ_THROUGH_PROPERTY_KEY))); - } else { - cacheConfig.setReadThrough(true); - } - if (properties.getProperty(JCACHE_WRITE_THROUGH_PROPERTY_KEY) != null) { - cacheConfig.setWriteThrough(Boolean.valueOf(properties.getProperty(JCACHE_WRITE_THROUGH_PROPERTY_KEY))); - } else { - cacheConfig.setWriteThrough(true); - } - if (properties.getProperty(JCACHE_STORE_BY_VALUE_PROPERTY_KEY) != null) { - 
cacheConfig.setStoreByValue(Boolean.valueOf(properties.getProperty(JCACHE_STORE_BY_VALUE_PROPERTY_KEY))); - } - if (properties.getProperty(JCACHE_STATISTICS_PROPERTY_KEY) != null) { - cacheConfig.setStatisticsEnabled(Boolean.valueOf(properties.getProperty(JCACHE_STATISTICS_PROPERTY_KEY))); - } - if (properties.getProperty(JCACHE_MANAGEMENT_PROPERTY_KEY) != null) { - cacheConfig.setStatisticsEnabled(Boolean.valueOf(properties.getProperty(JCACHE_MANAGEMENT_PROPERTY_KEY))); - } - if (properties.getProperty(JCACHE_EVICTION_POLICY_PROPERTY_KEY) != null) { - cacheConfig.getEvictionConfig() - .setEvictionPolicy(EvictionPolicy.valueOf(properties.getProperty(JCACHE_EVICTION_POLICY_PROPERTY_KEY))); - } - if (properties.getProperty(JCACHE_EVICTION_MAX_SIZE_POLICY_PROPERTY_KEY) != null) { - cacheConfig.getEvictionConfig() - .setMaximumSizePolicy(EvictionConfig.MaxSizePolicy - .valueOf(properties.getProperty(JCACHE_EVICTION_MAX_SIZE_POLICY_PROPERTY_KEY))); - } - if (properties.getProperty(JCACHE_EVICTION_SIZE_PROPERTY_KEY) != null) { - cacheConfig.getEvictionConfig() - .setSize(Integer.valueOf(properties.getProperty(JCACHE_EVICTION_SIZE_PROPERTY_KEY))); - } - if (properties.getProperty(JCACHE_EXPIRE_POLICY_PROPERTY_KEY) != null) { - String expiryPolicyIdentifier = properties.getProperty(JCACHE_EXPIRE_POLICY_PROPERTY_KEY); - if (expiryPolicyIdentifier.equals(JCACHE_ACCESSED_EXPIRY_IDENTIFIER)) { - cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( - new AccessedExpiryPolicy(new Duration(TimeUnit.SECONDS, - Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) - )); - } else if (expiryPolicyIdentifier.equals(JCACHE_CREATED_EXPIRY_IDENTIFIER)) { - cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( - new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, - Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) - )); - } else if (expiryPolicyIdentifier.equals(JCACHE_MODIFIED_EXPIRY_IDENTIFIER)) { - 
cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( - new ModifiedExpiryPolicy(new Duration(TimeUnit.SECONDS, - Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) - )); - } else if (expiryPolicyIdentifier.equals(JCACHE_TOUCHED_EXPIRY_IDENTIFIER)) { - cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( - new TouchedExpiryPolicy(new Duration(TimeUnit.SECONDS, - Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) - )); + if (((properties.getProperty(JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY) != null) && + Boolean.valueOf(properties.getProperty(JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY))) + || ((manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) == null))) { + cacheEntryList = new ConcurrentSkipListSet<>(); + cacheConfig = new CacheConfig<K, T>(); + cacheConfig.setTypes(keyClass, persistentClass); + if (properties.getProperty(JCACHE_READ_THROUGH_PROPERTY_KEY) != null) { + cacheConfig.setReadThrough(Boolean.valueOf(properties.getProperty(JCACHE_READ_THROUGH_PROPERTY_KEY))); + } else { + cacheConfig.setReadThrough(true); } - } - if (properties.getProperty(HAZELCAST_CACHE_IN_MEMORY_FORMAT_PROPERTY_KEY) != null) { - String inMemoryFormat = properties.getProperty(HAZELCAST_CACHE_IN_MEMORY_FORMAT_PROPERTY_KEY); - if (inMemoryFormat.equals(HAZELCAST_CACHE_BINARY_IN_MEMORY_FORMAT_IDENTIFIER) || - inMemoryFormat.equals(HAZELCAST_CACHE_OBJECT_IN_MEMORY_FORMAT_IDENTIFIER) || - inMemoryFormat.equals(HAZELCAST_CACHE_NATIVE_IN_MEMORY_FORMAT_IDENTIFIER)) { - cacheConfig.setInMemoryFormat(InMemoryFormat.valueOf(inMemoryFormat)); + if (properties.getProperty(JCACHE_WRITE_THROUGH_PROPERTY_KEY) != null) { + cacheConfig.setWriteThrough(Boolean.valueOf(properties.getProperty(JCACHE_WRITE_THROUGH_PROPERTY_KEY))); + } else { + cacheConfig.setWriteThrough(true); } - } - cacheConfig.setCacheLoaderFactory(JCacheCacheFactoryBuilder - 
.factoryOfCacheLoader(this.persistentDataStore)); - cacheConfig.setCacheWriterFactory(JCacheCacheFactoryBuilder - .factoryOfCacheWriter(this.persistentDataStore)); - cacheConfig.addCacheEntryListenerConfiguration( - new MutableCacheEntryListenerConfiguration<>( - JCacheCacheFactoryBuilder - .factoryOfEntryListener(new JCacheCacheEntryListener<K, T>(cacheEntryList)), - null, true, true)); - if (properties.getProperty(JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY) != null) { - Boolean createCache = Boolean.valueOf(properties.getProperty(JCACHE_AUTO_CREATE_CACHE_PROPERTY_KEY)); - if (createCache) { - cache = manager.createCache(persistentClass.getSimpleName(), - cacheConfig).unwrap(ICache.class); + if (properties.getProperty(JCACHE_STORE_BY_VALUE_PROPERTY_KEY) != null) { + cacheConfig.setStoreByValue(Boolean.valueOf(properties.getProperty(JCACHE_STORE_BY_VALUE_PROPERTY_KEY))); } - } else { - if (manager.getCache(super.getPersistentClass().getSimpleName(), keyClass, persistentClass) == null) { - cache = manager.createCache(persistentClass.getSimpleName(), - cacheConfig).unwrap(ICache.class); - } else { - cache = manager.getCache(super.getPersistentClass().getSimpleName(), - keyClass, persistentClass).unwrap(ICache.class); - this.populateLocalCacheEntrySet(cache.iterator()); + if (properties.getProperty(JCACHE_STATISTICS_PROPERTY_KEY) != null) { + cacheConfig.setStatisticsEnabled(Boolean.valueOf(properties.getProperty(JCACHE_STATISTICS_PROPERTY_KEY))); + } + if (properties.getProperty(JCACHE_MANAGEMENT_PROPERTY_KEY) != null) { + cacheConfig.setStatisticsEnabled(Boolean.valueOf(properties.getProperty(JCACHE_MANAGEMENT_PROPERTY_KEY))); + } + if (properties.getProperty(JCACHE_EVICTION_POLICY_PROPERTY_KEY) != null) { + cacheConfig.getEvictionConfig() + .setEvictionPolicy(EvictionPolicy.valueOf(properties.getProperty(JCACHE_EVICTION_POLICY_PROPERTY_KEY))); + } + if (properties.getProperty(JCACHE_EVICTION_MAX_SIZE_POLICY_PROPERTY_KEY) != null) { + cacheConfig.getEvictionConfig() + 
.setMaximumSizePolicy(EvictionConfig.MaxSizePolicy + .valueOf(properties.getProperty(JCACHE_EVICTION_MAX_SIZE_POLICY_PROPERTY_KEY))); + } + if (properties.getProperty(JCACHE_EVICTION_SIZE_PROPERTY_KEY) != null) { + cacheConfig.getEvictionConfig() + .setSize(Integer.valueOf(properties.getProperty(JCACHE_EVICTION_SIZE_PROPERTY_KEY))); + } + if (properties.getProperty(JCACHE_EXPIRE_POLICY_PROPERTY_KEY) != null) { + String expiryPolicyIdentifier = properties.getProperty(JCACHE_EXPIRE_POLICY_PROPERTY_KEY); + if (expiryPolicyIdentifier.equals(JCACHE_ACCESSED_EXPIRY_IDENTIFIER)) { + cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( + new AccessedExpiryPolicy(new Duration(TimeUnit.SECONDS, + Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) + )); + } else if (expiryPolicyIdentifier.equals(JCACHE_CREATED_EXPIRY_IDENTIFIER)) { + cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( + new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, + Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) + )); + } else if (expiryPolicyIdentifier.equals(JCACHE_MODIFIED_EXPIRY_IDENTIFIER)) { + cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( + new ModifiedExpiryPolicy(new Duration(TimeUnit.SECONDS, + Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) + )); + } else if (expiryPolicyIdentifier.equals(JCACHE_TOUCHED_EXPIRY_IDENTIFIER)) { + cacheConfig.setExpiryPolicyFactory(FactoryBuilder.factoryOf( + new TouchedExpiryPolicy(new Duration(TimeUnit.SECONDS, + Integer.valueOf(properties.getProperty(JCACHE_EXPIRE_POLICY_DURATION_PROPERTY_KEY)))) + )); + } + } + if (properties.getProperty(HAZELCAST_CACHE_IN_MEMORY_FORMAT_PROPERTY_KEY) != null) { + String inMemoryFormat = properties.getProperty(HAZELCAST_CACHE_IN_MEMORY_FORMAT_PROPERTY_KEY); + if (inMemoryFormat.equals(HAZELCAST_CACHE_BINARY_IN_MEMORY_FORMAT_IDENTIFIER) || + 
inMemoryFormat.equals(HAZELCAST_CACHE_OBJECT_IN_MEMORY_FORMAT_IDENTIFIER) || + inMemoryFormat.equals(HAZELCAST_CACHE_NATIVE_IN_MEMORY_FORMAT_IDENTIFIER)) { + cacheConfig.setInMemoryFormat(InMemoryFormat.valueOf(inMemoryFormat)); + } } + cacheConfig.setCacheLoaderFactory(JCacheCacheFactoryBuilder + .factoryOfCacheLoader(this.persistentDataStore)); + cacheConfig.setCacheWriterFactory(JCacheCacheFactoryBuilder + .factoryOfCacheWriter(this.persistentDataStore)); + cacheConfig.addCacheEntryListenerConfiguration( + new MutableCacheEntryListenerConfiguration<>( + JCacheCacheFactoryBuilder + .factoryOfEntryListener(new JCacheCacheEntryListener<K, T>(cacheEntryList)), + null, true, true)); + cache = manager.createCache(persistentClass.getSimpleName(), + cacheConfig).unwrap(ICache.class); + } else { + cache = manager.getCache(super.getPersistentClass().getSimpleName(), + keyClass, persistentClass).unwrap(ICache.class); + this.populateLocalCacheEntrySet(cache); + this.populateLocalCacheConfig(cache); } LOG.info("JCache Gora datastore initialized successfully."); } @@ -293,7 +288,7 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T @Override public void deleteSchema() { - cacheEntryList.clear(); + cache.removeAll(); manager.destroyCache(super.getPersistentClass().getSimpleName()); persistentDataStore.deleteSchema(); LOG.info("Deleted schema on persistent store and destroyed cache for persistent bean {}." 
@@ -444,12 +439,34 @@ public class JCacheStore<K, T extends PersistentBase> extends DataStoreBase<K, T LOG.info("JCache Gora datastore destroyed successfully."); } - private void populateLocalCacheEntrySet(Iterator<Cache.Entry<K, T>> cacheEntryIterator) { - cacheEntryList.clear(); + private void populateLocalCacheEntrySet(ICache<K, T> cache) { + cacheEntryList = new ConcurrentSkipListSet<>(); + Iterator<Cache.Entry<K, T>> cacheEntryIterator = cache.iterator(); while (cacheEntryIterator.hasNext()) { cacheEntryList.add(cacheEntryIterator.next().getKey()); } + cacheConfig = cache.getConfiguration(CacheConfig.class); + Iterator<CacheEntryListenerConfiguration<K, T>> itr = + cacheConfig.getCacheEntryListenerConfigurations().iterator(); + while (itr.hasNext()) { + JCacheCacheEntryListenerFactory<K, T> listenerFac = (JCacheCacheEntryListenerFactory<K, T>) + ((MutableCacheEntryListenerConfiguration) itr.next()).getCacheEntryListenerFactory(); + //populate transient field in Cache Entry Listener + listenerFac.create().setCacheEntryList(cacheEntryList); + //register exactly one listener on each local node either client/server + break; + } LOG.info("Populated local cache entry set with respect to remote cache provider."); } + private void populateLocalCacheConfig(ICache<K, T> cache) { + cacheConfig = cache.getConfiguration(CacheConfig.class); + //populate transient fields in Cache Loader/Cache Listener + ((JCacheCacheLoaderFactory) cacheConfig.getCacheLoaderFactory()) + .create().setDataStore(this.persistentDataStore); + ((JCacheCacheWriterFactory) cacheConfig.getCacheWriterFactory()) + .create().setDataStore(this.persistentDataStore); + LOG.info("Populated transient cache loader/writer in local cache configuration."); + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/main/java/org/apache/gora/jcache/store/package-info.java ---------------------------------------------------------------------- diff --git 
a/gora-jcache/src/main/java/org/apache/gora/jcache/store/package-info.java b/gora-jcache/src/main/java/org/apache/gora/jcache/store/package-info.java new file mode 100644 index 0000000..abc7ca6 --- /dev/null +++ b/gora-jcache/src/main/java/org/apache/gora/jcache/store/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the JCache store related classes which involve manipulating Hazelcast + * caches. {@link org.apache.gora.jcache.store.JCacheCacheWriter} and + * {@link org.apache.gora.jcache.store.JCacheCacheLoader} handles the read/write operations + * from/to caches and persistent backend. {@link org.apache.gora.jcache.store.JCacheCacheLoaderFactory} and + * {@link org.apache.gora.jcache.store.JCacheCacheWriterFactory} provides factory implementation that handles + * singleton instance creation of writer/loader. {@link org.apache.gora.jcache.store.JCacheCacheEntryListener} + * is the class which manages local cache entry set and + * {@link org.apache.gora.jcache.store.JCacheCacheEntryListenerFactory} takes care of singleton instance creation + * for entry listener. 
{@link org.apache.gora.jcache.store.JCacheCacheFactoryBuilder} is generic factory builder + * for above mentioned factory classes. + */ +package org.apache.gora.jcache.store; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java b/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java index 7918406..bace8eb 100644 --- a/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java +++ b/gora-jcache/src/test/java/org/apache/gora/jcache/GoraHazelcastTestDriver.java @@ -39,6 +39,9 @@ public class GoraHazelcastTestDriver extends GoraTestDriver { private static final String CONFIG = "hazelcast.xml"; public static final String GORA_DEFAULT_DATASTORE_KEY = "gora.datastore.default"; public static final String MEMSTORE = "org.apache.gora.memory.store.MemStore"; + private static final String JCACHE_READ_THROUGH_PROPERTY_KEY = "jcache.read.through.enable"; + private static final String JCACHE_WRITE_THROUGH_PROPERTY_KEY = "jcache.write.through.enable"; + private static final String FALSE = "false"; public GoraHazelcastTestDriver() { super(JCacheStore.class); @@ -52,6 +55,8 @@ public class GoraHazelcastTestDriver extends GoraTestDriver { properties.setProperty(GORA_DEFAULT_JCACHE_PROVIDER_KEY, PROVIDER); properties.setProperty(GORA_DEFAULT_JCACHE_HAZELCAST_CONFIG_KEY, CONFIG); properties.setProperty(GORA_DEFAULT_DATASTORE_KEY, MEMSTORE); + properties.setProperty(JCACHE_READ_THROUGH_PROPERTY_KEY,FALSE); + properties.setProperty(JCACHE_WRITE_THROUGH_PROPERTY_KEY,FALSE); serverCacheProvider = new JCacheStore(); serverCacheProvider.initialize(String.class, WebPage.class, properties); } 
http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/package-info.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/package-info.java b/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/package-info.java new file mode 100644 index 0000000..371629b --- /dev/null +++ b/gora-jcache/src/test/java/org/apache/gora/jcache/mapreduce/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains unit test for MR jobs execution over JCache dataStore. 
+ */ +package org.apache.gora.jcache.mapreduce; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/test/java/org/apache/gora/jcache/store/package-info.java ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/java/org/apache/gora/jcache/store/package-info.java b/gora-jcache/src/test/java/org/apache/gora/jcache/store/package-info.java new file mode 100644 index 0000000..6bb67ba --- /dev/null +++ b/gora-jcache/src/test/java/org/apache/gora/jcache/store/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains all the unit tests for basic CRUD operations functionality of the JCache dataStore. 
+ */ +package org.apache.gora.jcache.store; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-jcache/src/test/resources/gora.properties ---------------------------------------------------------------------- diff --git a/gora-jcache/src/test/resources/gora.properties b/gora-jcache/src/test/resources/gora.properties index b740466..625faab 100644 --- a/gora-jcache/src/test/resources/gora.properties +++ b/gora-jcache/src/test/resources/gora.properties @@ -17,3 +17,6 @@ gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore gora.datastore.jcache.provider=com.hazelcast.client.cache.impl.HazelcastClientCachingProvider gora.datastore.jcache.hazelcast.config=hazelcast-client.xml gora.datastore.default=org.apache.gora.memory.store.MemStore +jcache.read.through.enable=false +jcache.write.through.enable=false + http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-tutorial/conf/gora.properties ---------------------------------------------------------------------- diff --git a/gora-tutorial/conf/gora.properties b/gora-tutorial/conf/gora.properties index 2e5c113..5084216 100644 --- a/gora-tutorial/conf/gora.properties +++ b/gora-tutorial/conf/gora.properties @@ -52,3 +52,9 @@ gora.solrstore.solr.batchsize=100 # cloud, concurrent, http, loadbalance gora.solrstore.solr.solrjserver=http +#JCache dataStore properties +gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore +gora.datastore.jcache.provider=com.hazelcast.cache.impl.HazelcastServerCachingProvider +#gora.datastore.jcache.provider=com.hazelcast.client.cache.impl.HazelcastClientCachingProvider +#gora.datastore.jcache.hazelcast.config=hazelcast-client.xml +gora.datastore.jcache.hazelcast.config=hazelcast.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-tutorial/conf/hazelcast-client.xml ---------------------------------------------------------------------- diff --git 
a/gora-tutorial/conf/hazelcast-client.xml b/gora-tutorial/conf/hazelcast-client.xml new file mode 100644 index 0000000..93f21e2 --- /dev/null +++ b/gora-tutorial/conf/hazelcast-client.xml @@ -0,0 +1,31 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Hazelcast client side cache provider configuration. +--> + +<hazelcast-client xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.hazelcast.com/schema/client-config + http://www.hazelcast.com/schema/client-config/hazelcast-client-config-3.6.xsd" + xmlns="http://www.hazelcast.com/schema/client-config"> + <network> + <cluster-members> + <address>127.0.0.1</address> + </cluster-members> + </network> +</hazelcast-client> http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-tutorial/conf/hazelcast.xml ---------------------------------------------------------------------- diff --git a/gora-tutorial/conf/hazelcast.xml b/gora-tutorial/conf/hazelcast.xml new file mode 100755 index 0000000..9838710 --- /dev/null +++ b/gora-tutorial/conf/hazelcast.xml @@ -0,0 +1,34 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Hazelcast server side cache provider configuration. +--> + +<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.hazelcast.com/schema/config + http://www.hazelcast.com/schema/config/hazelcast-config-3.6.xsd" + xmlns="http://www.hazelcast.com/schema/config"> + <network> + <join> + <multicast enabled="false"/> + <tcp-ip enabled="true"> + <member>127.0.0.1</member> + </tcp-ip> + </join> + </network> +</hazelcast> http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-tutorial/pom.xml ---------------------------------------------------------------------- diff --git a/gora-tutorial/pom.xml b/gora-tutorial/pom.xml index d73b4df..ab6892f 100644 --- a/gora-tutorial/pom.xml +++ b/gora-tutorial/pom.xml @@ -105,6 +105,11 @@ <dependency> <groupId>org.apache.gora</groupId> + <artifactId>gora-jcache</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> <artifactId>gora-cassandra</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java 
b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java new file mode 100644 index 0000000..44e9c45 --- /dev/null +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/DistributedLogManager.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.tutorial.log; + +import org.apache.avro.util.Utf8; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.tutorial.log.generated.Pageview; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Locale; +import java.util.StringTokenizer; + + +/** + * DistributedLogManager {@link org.apache.gora.tutorial.log.DistributedLogManager} is the tutorial class that + * illustrates the basic distributed features that can be gained when a persistent dataStore is used together with + * a cache dataStore such as {@link org.apache.gora.jcache.store.JCacheStore}. Since Hazelcast follows a + * cache-as-a-service approach, Apache Gora data stores can now be exposed as a data SERVICE when the persistent data store is + * exposed over the JCache store. + * + * The JCache data store has two modes. + * + * 1. Server mode - Participates in the Hazelcast cluster ( data grid ) as a member and communicates directly + * with the persistent store to fulfill cache read/write operations. + * + * Add the following properties to the gora.properties file to start the JCache store in server mode. + * + * gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore + * gora.datastore.jcache.provider=com.hazelcast.cache.impl.HazelcastServerCachingProvider + * gora.datastore.jcache.hazelcast.config=hazelcast.xml + * + * For cluster member network configuration use hazelcast.xml.
+ * <p>See Network Configuration on + * <a href="http://docs.hazelcast.org/docs/3.5/manual/html/networkconfiguration.html"> + * web site</a> for more information.</p> + * + * 2. Client mode - Does NOT participate in the Hazelcast cluster ( data grid ) as a member. For cache + * read/write operations the client forwards the requests to Hazelcast cluster members which run in SERVER mode. + * + * Add the following properties to the gora.properties file to start the JCache store in client mode. + * + * gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore + * gora.datastore.jcache.provider=com.hazelcast.client.cache.impl.HazelcastClientCachingProvider + * gora.datastore.jcache.hazelcast.config=hazelcast-client.xml + * + * For Hazelcast client configuration use hazelcast-client.xml. + * <p>See Java Client Configuration on + * <a href="http://docs.hazelcast.org/docs/3.5/manual/html/javaclientconfiguration.html#java-client-configuration"> + * web site</a> for more information.</p> + * + * Sample + * ------ + * 1. Start two or more instances of DistributedLogManager in SERVER mode ( separate JVMs ). + * Notice that the Hazelcast cluster is well formed from the following Hazelcast logs. + * Members [2] { + * Member [127.0.0.1]:5701 + * Member [127.0.0.1]:5702 this + * } + * + * 2. Start one instance of DistributedLogManager in CLIENT mode. + * Notice that the client is correctly connected to the cluster from the following Hazelcast logs. + * Members [2] { + * Member [127.0.0.1]:5701 + * Member [127.0.0.1]:5702 + * } + * INFO: HazelcastClient[hz.client_0_dev][3.6.4] is CLIENT_CONNECTED + * + * 3. Now use the CLIENT's command line console to forward cache queries to the cluster. + * + * (a) -parse cache <input_log_file> - This will parse logs from the log file and put Pageview data beans into + * the persistent store via the cache. + * Notice the following log + * INFO 19:46:34,833 Written data bean to persistent datastore on key 45. + * on a SERVER instance of DistributedLogManager.
Notice that the persistent data bean writes are LOAD BALANCED + * among the SERVER instances. + * (b) -parse persistent <input_log_file> - This will write parsed log data beans directly to the persistent store, + * NOT via the cache. + * (c) Executing (a) will create a cache entry for each data bean key on every SERVER and CLIENT instance. Since + * each data bean ( key/value ) is now loaded into the Hazelcast DATA GRID, the created data beans + * are now available to all the SERVER and CLIENT instances. Data beans which were loaded into the Hazelcast + * DATA GRID can be retrieved from the cache, so that the latency is reduced compared to when a data bean is + * retrieved directly from the persistent data store. + * (d) Executing (b) will not create cache entries for the keys, since the data beans were put directly + * into the persistent store. + * When the following command is executed + * -get <lineNum> + * the data will first be loaded from the persistent store into the cache by one of the SERVER instances. Then a cache + * entry for the given key will be created on all SERVER/CLIENT instances. + * Notice the persistent data bean load on a SINGLE SERVER instance. Only one SERVER instance will handle this work. + * INFO 17:13:22,652 Loaded data bean from persistent datastore on key 4. + * Notice the cache entry creation on ALL SERVER/CLIENT instances + * INFO 17:13:22,656 Cache entry added on key 4. + * Once the cache entry is created, the data bean is available to be retrieved from the cache without reaching the + * persistent store. + * Execute the above command several times consecutively. + * -get <lineNum> + * Notice there will be NO log entry similar to the one below + * INFO 17:13:22,652 Loaded data bean from persistent datastore on key 4. + * since the data bean is no longer loaded from the persistent data store but from the + * cache. + * (e) DistributedLogManager has two Apache Gora data store instances. + * dataStore - which calls the underlying persistent data store directly.
+ * cacheStore - which calls the same persistent data store via the caching layer. + * For simple benchmarking purposes use + * -benchmark <startLineNum> <endLineNum> <iterations> + * to compare data bean reads for two cases ( cache layer present and not present ) when executing + * consecutive data reads for the same data items in nearby intervals. + * It generates LOG entries similar to those below, which indicate the time spent for the two cases in milliseconds + * INFO 17:13:22,652 Direct Backend took 1973 ms + * INFO 17:18:49,252 Via Cache took 1923 ms + * + * <p>In the data model, keys are the line numbers in the log file, + * and the values are Pageview objects, generated from + * <code>gora-tutorial/src/main/avro/pageview.json</code>. + * + * <p>See the tutorial.html file in docs or go to the + * <a href="http://gora.apache.org/docs/current/tutorial.html"> + * web site</a> for more information.</p> + */ + +public class DistributedLogManager { + + private static final Logger log = LoggerFactory.getLogger(DistributedLogManager.class); + private static final SimpleDateFormat dateFormat + = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.getDefault()); + private static final String USAGE = + " -parse cache|persistent <input_log_file>\n" + + " -benchmark <startLineNum> <endLineNum> <iterations>\n" + + " -get <lineNum>\n" + + " -get <lineNum> <fieldList>\n" + + " -query <startLineNum> <endLineNum>\n" + + " -delete <lineNum>\n" + + " -deleteByQuery <startLineNum> <endLineNum>\n" + + " -deleteSchema\n"+ + " -exit\n"; + private DataStore<Long, Pageview> dataStore; + private DataStore<Long, Pageview> cacheStore; + + public DistributedLogManager() { + try { + init(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + public static void main(String[] args) throws Exception { + DistributedLogManager manager = new DistributedLogManager(); + + BufferedReader input = new BufferedReader(new InputStreamReader(System.in, + Charset.defaultCharset())); + + for (; ; ) { +
// read next command from commandline + String command = input.readLine(); + if ("".equals(command) || command == null) { + // ignore and show new cmd line + continue; + } + + args = command.split("\\s+"); + if ("-parse".equals(args[0])) { + if (args.length == 3) { + if (args[1].contains("cache")) { + manager.parse(args[2], true); + } else if (args[1].contains("persistent")) { + manager.parse(args[2], false); + } + } + } else if ("-benchmark".equals(args[0])) { + if (args.length == 4) + manager.benchmark(Integer.parseInt(args[1]), Integer.parseInt(args[2]), Integer.parseInt(args[3]), manager); + } else if ("-get".equals(args[0])) { + if (args.length == 2) { + manager.get(Long.parseLong(args[1]), true); + } else { + //field array should be input as comma ',' separated + String[] fields = args[2].split(","); + manager.get(Long.parseLong(args[1]), fields); + } + } else if ("-query".equals(args[0])) { + if (args.length == 3) + manager.query(Long.parseLong(args[1]), Long.parseLong(args[2])); + } else if ("-delete".equals(args[0])) { + manager.delete(Long.parseLong(args[1])); + } else if ("-deleteByQuery".equalsIgnoreCase(args[0])) { + manager.deleteByQuery(Long.parseLong(args[1]), Long.parseLong(args[2])); + } else if ("-deleteSchema".equals(args[0])) { + manager.deleteSchema(); + continue; + } else if ("-exit".equalsIgnoreCase(args[0])) { + input.close(); + manager.close(); + System.exit(1); + } else { + log.info(USAGE); + } + } + } + + private void init() throws IOException { + //Data store objects are created from a factory. It is necessary to + //provide the key and value class. 
The datastore class is optional, + //and if not specified it will be read from the properties file + + //this dataStore talks directly to persistent store + dataStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, + new Configuration()); + //this dataStore talks to persistent store via the cache + cacheStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration(), true); + + } + + /** + * Parses a log file and store the contents at the data store. + * + * @param input the input file location + */ + private void parse(String input, boolean isCacheEnabled) throws Exception { + log.info("Parsing file: {}", input); + long lineCount = 0; + try (BufferedReader reader = new BufferedReader(new InputStreamReader( + new FileInputStream(input), Charset.defaultCharset()))) { + String line = reader.readLine(); + do { + Pageview pageview = parseLine(line); + + if (pageview != null) { + //store the pageview + storePageview(lineCount++, pageview, isCacheEnabled); + } + + line = reader.readLine(); + } while (line != null); + + } + log.info("finished parsing file. 
Total number of log lines: {}", lineCount); + } + + /** + * Parses a single log line in combined log format using StringTokenizers + */ + private Pageview parseLine(String line) throws ParseException { + StringTokenizer matcher = new StringTokenizer(line); + //parse the log line + String ip = matcher.nextToken(); + matcher.nextToken(); //discard + matcher.nextToken(); + long timestamp = dateFormat.parse(matcher.nextToken("]").substring(2)).getTime(); + matcher.nextToken("\""); + String request = matcher.nextToken("\""); + String[] requestParts = request.split(" "); + String httpMethod = requestParts[0]; + String url = requestParts[1]; + matcher.nextToken(" "); + int httpStatusCode = Integer.parseInt(matcher.nextToken()); + int responseSize = Integer.parseInt(matcher.nextToken()); + matcher.nextToken("\""); + String referrer = matcher.nextToken("\""); + matcher.nextToken("\""); + String userAgent = matcher.nextToken("\""); + + //construct and return pageview object + Pageview pageview = new Pageview(); + pageview.setIp(new Utf8(ip)); + pageview.setTimestamp(timestamp); + pageview.setHttpMethod(new Utf8(httpMethod)); + pageview.setUrl(new Utf8(url)); + pageview.setHttpStatusCode(httpStatusCode); + pageview.setResponseSize(responseSize); + pageview.setReferrer(new Utf8(referrer)); + pageview.setUserAgent(new Utf8(userAgent)); + + return pageview; + } + + /** + * Stores the pageview object with the given key + */ + private void storePageview(long key, Pageview pageview, boolean isCacheEnabled) throws Exception { + if (!isCacheEnabled) { + log.info("Storing Pageview in: " + dataStore.toString()); + dataStore.put(key, pageview); + } else { + log.info("Storing Pageview in: " + dataStore.toString()); + cacheStore.put(key, pageview); + } + } + + /** + * Fetches a single pageview object and prints it + */ + private void get(long key, boolean isCacheEnabled) throws Exception { + if (!isCacheEnabled) { + Pageview pageview = dataStore.get(key); + printPageview(pageview); + } 
else { + Pageview pageview = cacheStore.get(key); + printPageview(pageview); + } + } + + /** + * Fetches a single pageview object with required fields and prints it + */ + private void get(long key, String[] fields) throws Exception { + Pageview pageview = cacheStore.get(key, fields); + printPageview(pageview); + } + + /** + * Queries and prints pageview object that have keys between startKey and endKey + */ + private void query(long startKey, long endKey) throws Exception { + Query<Long, Pageview> query = cacheStore.newQuery(); + //set the properties of query + query.setStartKey(startKey); + query.setEndKey(endKey); + + Result<Long, Pageview> result = query.execute(); + + printResult(result); + } + + /** + * Deletes the pageview with the given line number + */ + private void delete(long lineNum) throws Exception { + cacheStore.delete(lineNum); + cacheStore.flush(); //write changes may need to be flushed before + //they are committed + log.info("pageview with key: {} deleted", lineNum); + } + + /** + * This method illustrates delete by query call + */ + private void deleteByQuery(long startKey, long endKey) throws Exception { + //Constructs a query from the dataStore. 
The matching rows to this query will be deleted + Query<Long, Pageview> query = cacheStore.newQuery(); + //set the properties of query + query.setStartKey(startKey); + query.setEndKey(endKey); + + cacheStore.deleteByQuery(query); + log.info("pageviews with keys between {} and {} are deleted.", startKey, endKey); + } + + private void printResult(Result<Long, Pageview> result) throws Exception { + + while (result.next()) { //advances the Result object and breaks if at end + long resultKey = result.getKey(); //obtain current key + Pageview resultPageview = result.get(); //obtain current value object + + log.info("{} :", resultKey); + printPageview(resultPageview); + } + + log.info("Number of pageviews from the query: {}", result.getOffset()); + } + + /** + * Pretty prints the pageview object to stdout + */ + private void printPageview(Pageview pageview) { + if (pageview == null) { + log.info("No result to show"); + } else { + log.info(pageview.toString()); + } + } + + private void deleteSchema() { + cacheStore.deleteSchema(); + log.info("Deleted schema on dataStore"); + + } + + private void close() throws Exception { + //It is very important to close the datastore properly, otherwise + //some data loss might occur. + if (dataStore != null) + dataStore.close(); + if (cacheStore != null) + cacheStore.close(); + } + + /** + * Simple benchmarking for comparison between when cache layer is present and not present. Purpose is to + * exploit temporal locality of consecutive data bean reads. 
+ */ + private boolean benchmark(int start, + int end, + int iterations, + DistributedLogManager manager) throws Exception { + if (!(start < 10000) && (end < 10000)) { + return false; + } + long startTime; + long finishTime; + ArrayList<Integer> entryset= getShuffledEntrySet(start,end); + startTime = System.currentTimeMillis(); + for (int itr = 0; itr < iterations; itr++) { + for (int entry : entryset){ + manager.get(entry, false); + } + } + finishTime = System.currentTimeMillis(); + long directDifference = (finishTime - startTime); + startTime = System.currentTimeMillis(); + for (int itr = 0; itr < iterations; itr++) { + for (int entry : entryset){ + manager.get(entry, true); + } + } + finishTime = System.currentTimeMillis(); + log.info("Direct Backend took {} ms", directDifference); + log.info("Via Cache took {} ms", (finishTime - startTime)); + return true; + } + + private ArrayList<Integer> getShuffledEntrySet(int start, int end) { + ArrayList<Integer> entrySet = new ArrayList<>(); + for (int counter = start; counter <= end; counter++) { + entrySet.add(counter); + } + Collections.shuffle(entrySet); + return entrySet; + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/gora-tutorial/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/gora-tutorial/src/main/resources/log4j.properties b/gora-tutorial/src/main/resources/log4j.properties new file mode 100644 index 0000000..79062bf --- /dev/null +++ b/gora-tutorial/src/main/resources/log4j.properties @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set pattern to %c instead of %l. +# (%l is slower.) + +# output messages to stdout only; the rolling file appender R below is defined but not attached to the root logger +log4j.rootLogger=INFO,stdout + +# stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n + +# rolling log file +log4j.appender.R=org.apache.log4j.RollingFileAppender +log4j.appender.R.maxFileSize=20MB +log4j.appender.R.maxBackupIndex=50 +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +# To use it, add R to log4j.rootLogger and set log4j.appender.R.File to a path in your logs directory + +# Application logging options +log4j.logger.org.apache=ERROR + +log4j.logger.com.hazelcast=INFO +log4j.logger.org.apache.gora.jcache=DEBUG +log4j.logger.org.apache.gora.hbase=DEBUG +log4j.logger.org.apache.gora.tutorial.log.DistributedLogManager=DEBUG http://git-wip-us.apache.org/repos/asf/gora/blob/a02c5dd9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6522ac0..5221fd1 100644 --- a/pom.xml +++ b/pom.xml @@ -830,6 +830,18 @@ <dependency> <groupId>org.apache.gora</groupId> + <artifactId>gora-jcache</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-jcache</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + 
<groupId>org.apache.gora</groupId> <artifactId>gora-tutorial</artifactId> <version>${project.version}</version> </dependency>
