IGNITE-3470 - Support EXPIRED events in continuous queries
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aedfde69 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aedfde69 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aedfde69 Branch: refs/heads/ignite-2649 Commit: aedfde69af6e91277616052ab60fa0037693c2c6 Parents: b81dbbf Author: Valentin Kulichenko <valentin.luliche...@gmail.com> Authored: Tue Jul 19 16:01:32 2016 -0700 Committer: Valentin Kulichenko <valentin.luliche...@gmail.com> Committed: Tue Jul 19 16:01:32 2016 -0700 ---------------------------------------------------------------------- .../ignite/cache/query/ContinuousQuery.java | 47 +++++++++++++++---- .../processors/cache/IgniteCacheProxy.java | 3 +- .../continuous/CacheContinuousQueryManager.java | 9 ++-- ...ridCacheContinuousQueryAbstractSelfTest.java | 48 +++++++++++++++++++- 4 files changed, 91 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index bbfe8cc..49d471e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -21,6 +21,7 @@ import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.event.EventType; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.configuration.IgniteConfiguration; @@ -142,6 +143,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { /** Automatic unsubscription flag. */ private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; + /** Whether to notify about {@link EventType#EXPIRED} events. */ + private boolean includeExpired; + /** * Creates new continuous query. */ @@ -324,6 +328,38 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { return this; } + /** + * Gets automatic unsubscription flag value. + * + * @return Automatic unsubscription flag. + */ + public boolean isAutoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * Sets the flag value defining whether to notify about {@link EventType#EXPIRED} events. + * If {@code true}, then the remote listener will get notifications about entries + * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link EventType#UPDATED} + * and {@link EventType#REMOVED} events will be fired in the remote listener. + * <p> + * This flag is {@code false} by default, so {@link EventType#EXPIRED} events are disabled. + * + * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events. + */ + public void setIncludeExpired(boolean includeExpired) { + this.includeExpired = includeExpired; + } + + /** + * Gets the flag value defining whether to notify about {@link EventType#EXPIRED} events. + * + * @return Whether to notify about {@link EventType#EXPIRED} events. + */ + public boolean isIncludeExpired() { + return includeExpired; + } + /** {@inheritDoc} */ @Override public ContinuousQuery<K, V> setPageSize(int pageSize) { return (ContinuousQuery<K, V>)super.setPageSize(pageSize); @@ -333,13 +369,4 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { @Override public ContinuousQuery<K, V> setLocal(boolean loc) { return (ContinuousQuery<K, V>)super.setLocal(loc); } - - /** - * Gets automatic unsubscription flag value. - * - * @return Automatic unsubscription flag. - */ - public boolean isAutoUnsubscribe() { - return autoUnsubscribe; - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 92e59db..249cfae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -598,7 +598,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V qry.getTimeInterval(), qry.isAutoUnsubscribe(), loc, - keepBinary); + keepBinary, + qry.isIncludeExpired()); final QueryCursor<Cache.Entry<K, V>> cur = qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 195f3ae..a8e5a6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -421,7 +421,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { long timeInterval, boolean autoUnsubscribe, boolean loc, - final boolean keepBinary) throws IgniteCheckedException + final boolean keepBinary, + final boolean includeExpired) throws IgniteCheckedException { IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr; @@ -438,7 +439,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { rmtFilterFactory, true, false, - true, + !includeExpired, false, null); else { @@ -456,7 +457,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { (CacheEntryEventSerializableFilter)fltr, true, false, - true, + !includeExpired, false); } @@ -473,7 +474,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { rmtFilter, true, false, - true, + !includeExpired, false); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/aedfde69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 3d238af..08acc42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -34,6 +34,9 @@ import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.event.EventType; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -71,7 +74,6 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -1094,6 +1096,50 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } /** + * @throws Exception If failed. + */ + public void testExpired() throws Exception { + IgniteCache<Object, Object> cache = grid(0).cache(null). + withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000))); + + final Map<Object, Object> map = new ConcurrentHashMap8<>(); + final CountDownLatch latch = new CountDownLatch(2); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setIncludeExpired(true); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> e : evts) { + if (e.getEventType() == EventType.EXPIRED) { + assertNull(e.getValue()); + + map.put(e.getKey(), e.getOldValue()); + + latch.countDown(); + } + } + } + }); + + try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) { + cache.put(1, 1); + cache.put(2, 2); + + // Wait for expiration. + Thread.sleep(2000); + + assert latch.await(LATCH_TIMEOUT, MILLISECONDS); + + assertEquals(2, map.size()); + + assertEquals(1, (int)map.get(1)); + assertEquals(2, (int)map.get(2)); + } + } + + /** * */ private static class StoreFactory implements Factory<CacheStore> {