IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local listener".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/395f4738 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/395f4738 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/395f4738 Branch: refs/heads/ignite-2788 Commit: 395f47388ef515301e7e49cea7a444063712b9f9 Parents: 24b24bf Author: nikolay_tikhonov <[email protected]> Authored: Fri Apr 22 18:41:58 2016 +0300 Committer: shtykh_roman <[email protected]> Committed: Fri May 13 16:11:14 2016 +0900 ---------------------------------------------------------------------- .../CacheContinuousAsyncQueryExample.java | 138 +++ .../datagrid/CacheContinuousQueryExample.java | 13 +- .../ignite/cache/query/ContinuousQuery.java | 27 + .../configuration/IgniteConfiguration.java | 32 + .../ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 12 + .../apache/ignite/internal/IgniteKernal.java | 3 + .../org/apache/ignite/internal/IgnitionEx.java | 16 +- .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 49 +- .../dht/atomic/GridDhtAtomicCache.java | 98 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 121 +-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 33 +- .../distributed/near/GridNearAtomicCache.java | 3 +- .../continuous/CacheContinuousQueryEvent.java | 7 + .../continuous/CacheContinuousQueryHandler.java | 446 +++++++-- .../CacheContinuousQueryListener.java | 6 +- .../continuous/CacheContinuousQueryManager.java | 62 +- .../apache/ignite/lang/IgniteAsyncCallback.java | 111 +++ .../thread/IgniteStripedThreadPoolExecutor.java | 164 +-- .../processors/cache/GridCacheTestEntryEx.java | 4 +- ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 50 + ...sQueryAsyncFailoverTxReplicatedSelfTest.java | 37 + ...eContinuousQueryAsyncFailoverTxSelfTest.java | 44 + ...eContinuousQueryAsyncFilterListenerTest.java | 986 +++++++++++++++++++ 
...ryFactoryAsyncFilterRandomOperationTest.java | 131 +++ ...usQueryFactoryFilterRandomOperationTest.java | 725 ++++++++++++++ .../CacheContinuousQueryFactoryFilterTest.java | 714 -------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 63 +- .../CacheContinuousQueryLostPartitionTest.java | 14 + ...ontinuousQueryOperationFromCallbackTest.java | 627 ++++++++++++ .../CacheContinuousQueryOrderingEventTest.java | 722 ++++++++++++++ ...acheContinuousQueryRandomOperationsTest.java | 23 + .../junits/GridTestKernalContext.java | 1 + .../IgniteBinaryCacheQueryTestSuite.java | 1 - .../IgniteCacheQuerySelfTestSuite3.java | 14 +- .../IgniteCacheQuerySelfTestSuite4.java | 7 + .../cache/CacheEntryEventAsyncProbe.java | 61 ++ .../yardstick/cache/CacheEntryEventProbe.java | 33 +- 39 files changed, 4408 insertions(+), 1203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java new file mode 100644 index 0000000..4ac7ecb --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid; + +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * This example demonstrates the asynchronous continuous query API. + * <p> + * Remote nodes should always be started with a special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start a node with {@code examples/config/example-ignite.xml} configuration. + */ +public class CacheContinuousAsyncQueryExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName(); + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed.
+ */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache continuous query example started."); + + // Auto-close cache at the end of the example. + try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) { + int keyCnt = 20; + + // These entries will be queried by initial predicate. + for (int i = 0; i < keyCnt; i++) + cache.put(i, Integer.toString(i)); + + // Create new continuous query. + ContinuousQuery<Integer, String> qry = new ContinuousQuery<>(); + + qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() { + @Override public boolean apply(Integer key, String val) { + return key > 10; + } + })); + + // Callback that is called locally when update notifications are received. + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) + System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + } + }); + + // This filter will be evaluated remotely on all nodes. + // Entry that pass this filter will be sent to the caller. + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() { + @Override public CacheEntryEventFilter<Integer, String> create() { + return new CacheEntryFilter(); + } + }); + + // Execute query. + try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) { + // Iterate through existing data. + for (Cache.Entry<Integer, String> e : cur) + System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + + // Add a few more keys and watch more query notifications. 
+ for (int i = 0; i < keyCnt; i++) + cache.put(i, Integer.toString(i)); + + // Wait for a while until the callback is notified about the remaining puts. + Thread.sleep(2000); + } + + // Iterate through entries which were updated from the filter. + for (int i = 0; i < 10; i++) + System.out.println("Entry updated from filter [key=" + i + ", val=" + cache.get(i) + ']'); + } + finally { + // Distributed cache could be removed from cluster only by #destroyCache() call. + ignite.destroyCache(CACHE_NAME); + } + } + } + + /** + * Filter returns {@code true} for entries which have a key greater than 10. + */ + @IgniteAsyncCallback + private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) + throws CacheEntryListenerException { + // This cache operation is safe because the filter has the IgniteAsyncCallback annotation.
+ if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue())) + ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10"); + + return e.getKey() > 10; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java index 59759af..aad5b5d 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java @@ -18,12 +18,13 @@ package org.apache.ignite.examples.datagrid; import javax.cache.Cache; +import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; @@ -81,9 +82,13 @@ public class CacheContinuousQueryExample { // This filter will be evaluated remotely on all nodes. // Entry that pass this filter will be sent to the caller. - qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>() { - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? 
extends String> e) { - return e.getKey() > 10; + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() { + @Override public CacheEntryEventFilter<Integer, String> create() { + return new CacheEntryEventFilter<Integer, String>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { + return e.getKey() > 10; + } + }; } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 3ea8f93..bbfe8cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -23,6 +23,8 @@ import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteAsyncCallback; /** * API for configuring continuous cache queries. @@ -92,6 +94,16 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; * Note that this works even if you didn't provide initial query. Cursor will * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()} * is called. + * <p> + * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter} + * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener} + * (see {@link #setLocalListener(CacheEntryUpdatedListener)}). 
+ * If the filter and/or listener are annotated with {@link IgniteAsyncCallback} then the annotated callback + * is executed in the async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) + * and notification order is kept the same as update order for a given cache key. + * + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { /** */ @@ -173,9 +185,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g., * synchronization or transactional cache operations), should be executed asynchronously without * blocking the thread that called the callback. Otherwise, you can get deadlocks. + * <p> + * If the local listener is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback pool + * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. * * @param locLsnr Local callback. * @return {@code this} for chaining. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { this.locLsnr = locLsnr; @@ -198,11 +215,16 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * <p> + * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback + * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. * * @param rmtFilter Key-value filter.
* @return {@code this} for chaining. * * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ @Deprecated public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) { @@ -227,9 +249,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * <p> + * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback + * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. * * @param rmtFilterFactory Key-value filter factory. * @return {@code this} for chaining. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ public ContinuousQuery<K, V> setRemoteFilterFactory( Factory<?
extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index ca112c2..c7ebbb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -43,6 +43,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lifecycle.LifecycleBean; @@ -223,6 +224,9 @@ public class IgniteConfiguration { /** Public pool size. */ private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** Async Callback pool size. */ + private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** System pool size. */ private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -723,6 +727,20 @@ public class IgniteConfiguration { } /** + * Size of thread pool that is in charge of processing asynchronous callbacks. + * <p> + * This pool is used for callbacks annotated with {@link IgniteAsyncCallback}. + * <p> + * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}. + * + * @return Thread pool size to be used. + * @see IgniteAsyncCallback + */ + public int getAsyncCallbackPoolSize() { + return callbackPoolSize; + } + + /** * Size of thread pool that is in charge of processing internal and Visor * {@link ComputeJob GridJobs}. 
* <p> @@ -831,6 +849,20 @@ public class IgniteConfiguration { } /** + * Sets async callback thread pool size to use within grid. + * + * @param poolSize Thread pool size to use within grid. + * @return {@code this} for chaining. + * @see IgniteConfiguration#getAsyncCallbackPoolSize() + * @see IgniteAsyncCallback + */ + public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) { + this.callbackPoolSize = poolSize; + + return this; + } + + /** * Sets management thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 77c8794..f51727d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; /** * @@ -298,6 +299,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService marshallerCachePool(); /** + * Gets async callback pool. + * + * @return Async callback pool. + */ + public IgniteStripedThreadPoolExecutor asyncCallbackPool(); + + /** * Gets cache object processor. * * @return Cache object processor. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 753dbe8..79d67df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -89,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON; @@ -305,6 +306,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected IgniteStripedThreadPoolExecutor callbackExecSvc; + + /** */ + @GridToStringExclude private Map<String, Object> attrs = new HashMap<>(); /** */ @@ -379,6 +384,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + IgniteStripedThreadPoolExecutor callbackExecSvc, List<PluginProvider> plugins) throws IgniteCheckedException { assert grid != null; assert cfg != null; @@ -395,6 +401,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; this.restExecSvc = restExecSvc; + this.callbackExecSvc = callbackExecSvc; marshCtx = new MarshallerContextImpl(plugins); @@ -746,6 +753,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() { + return callbackExecSvc; + } + + /** {@inheritDoc} */ @Override public IgniteCacheObjectProcessor cacheObjects() { return cacheObjProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 20795fc..d6655d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -167,6 +167,7 @@ import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiVersionCheckException; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL; @@ -667,6 +668,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + IgniteStripedThreadPoolExecutor callbackExecSvc, GridAbsClosure errHnd) throws IgniteCheckedException { @@ -771,6 +773,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { mgmtExecSvc, igfsExecSvc, restExecSvc, + callbackExecSvc, plugins); cfg.getMarshaller().setContext(ctx.marshallerContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index c46a05c..7776687 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -95,6 +95,7 @@ import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi; import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi; import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi; import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; @@ -1476,6 +1477,9 @@ public class IgnitionEx { /** Marshaller cache executor service. */ private ExecutorService marshCacheExecSvc; + /** Continuous query executor service. */ + private IgniteStripedThreadPoolExecutor callbackExecSvc; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1688,6 +1692,12 @@ public class IgnitionEx { 0, new LinkedBlockingQueue<Runnable>()); + // Note that we do not pre-start threads here as this pool may not be needed. 
+ callbackExecSvc = new IgniteStripedThreadPoolExecutor( + cfg.getAsyncCallbackPoolSize(), + cfg.getGridName(), + "callback"); + if (myCfg.getConnectorConfiguration() != null) { restExecSvc = new IgniteThreadPoolExecutor( "rest", @@ -1727,7 +1737,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, + igfsExecSvc, restExecSvc, callbackExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2335,6 +2345,10 @@ public class IgnitionEx { U.shutdownNow(getClass(), marshCacheExecSvc, log); marshCacheExecSvc = null; + + U.shutdownNow(getClass(), callbackExecSvc, log); + + callbackExecSvc = null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index d6d7335..e679dfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -462,6 +463,7 @@ public interface GridCacheEntryEx { * @param subjId Subject ID initiated this update. * @param taskName Task name. * @param updateCntr Update counter. + * @param fut Dht atomic future. * @return Tuple where first value is flag showing whether operation succeeded, * second value is old entry value if return value is requested, third is updated entry value, * fourth is the version to enqueue for deferred delete the fifth is DR conflict context @@ -497,7 +499,8 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 735e20a..75d96d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; @@ -1248,6 +1249,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme tx.local(), false, updateCntr0, + null, topVer); } @@ -1445,6 +1447,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme tx.local(), false, updateCntr0, + null, topVer); } @@ -1821,6 +1824,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, false, updateCntr, + null, AffinityTopologyVersion.NONE); } @@ -1870,7 +1874,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable final UUID subjId, final String taskName, @Nullable final CacheObject prevVal, - @Nullable final Long updateCntr + @Nullable final Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -1899,7 +1904,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Long updateCntr0 = null; synchronized (this) { - boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); + boolean internal = isInternal() || !context().userCache(); + + Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false); + + boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM + || !F.isEmptyOrNulls(filter); checkObsolete(); @@ -2093,6 +2103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme primary, false, updateCntr0, + null, topVer); } @@ -2501,13 +2512,42 @@ public 
abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); + // Continuous query filter should be perform under lock. + if (lsnrs != null) { + CacheObject evtVal = updated; + CacheObject evtOldVal = oldVal; + + if (isOffHeapValuesOnly()) { + evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal)); + + evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal)); + } + + cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, + partition(), primary, false, updateCntr0, fut, topVer); + } + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); if (intercept) { if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, updateCntr0)); + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( + cctx, + key, + key0, + updated, + updated0, + keepBinary, + updateCntr0)); else - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary, updateCntr0)); + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( + cctx, + key, + key0, + oldVal, + old0, + keepBinary, + updateCntr0)); if (interceptRes != null) oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); @@ -3302,6 +3342,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, preload, updateCntr, + null, topVer); cctx.dataStructures().onEntryUpdated(key, false, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 013184b..d28aaaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -2141,10 +2140,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - boolean internal = false; - // Avoid iterator creation. 
for (int i = 0; i < keys.size(); i++) { KeyCacheObject k = keys.get(i); @@ -2159,14 +2154,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry == null) continue; - if (!initLsnrs) { - internal = entry.isInternal() || !context().userCache(); - - lsnrs = ctx.continuousQueries().updateListeners(internal, false); - - initLsnrs = true; - } - GridCacheVersion newConflictVer = req.conflictVersion(i); long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); @@ -2195,7 +2182,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - lsnrs != null || sndPrevVal || req.returnValue(), + sndPrevVal || req.returnValue(), req.keepBinary(), expiry, true, @@ -2213,7 +2200,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, null, - null); + null, + dhtFut); if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -2222,8 +2210,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - dhtFut.listeners(lsnrs); - if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. 
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -2260,19 +2246,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } - else if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - internal, - entry.partition(), - primary, - false, - updRes.updateCounter(), - topVer); - } if (hasNear) { if (primary && updRes.sendToDht()) { @@ -2446,9 +2419,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -2482,14 +2452,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } - if (!initLsnrs) { - lsnrs = ctx.continuousQueries().updateListeners( - entry.isInternal() || !context().userCache(), - false); - - initLsnrs = true; - } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), @@ -2499,7 +2461,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, /*write-through*/false, /*read-through*/false, - /*retval*/sndPrevVal || lsnrs != null, + /*retval*/sndPrevVal, req.keepBinary(), expiry, /*event*/true, @@ -2517,7 +2479,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, null, - null); + null, + dhtFut); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2548,12 +2511,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, 
V> { } if (dhtFut != null) { - dhtFut.listeners(lsnrs); - EntryProcessor<Object, Object, Object> entryProcessor = entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); - if (!batchRes.readersOnly()) + if (!batchRes.readersOnly()) { dhtFut.addWriteEntry(entry, writeVal, entryProcessor, @@ -2563,6 +2524,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { sndPrevVal, updRes.oldValue(), updRes.updateCounter()); + } if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, @@ -2572,19 +2534,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } - else if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - entry.isInternal() || !context().userCache(), - entry.partition(), - primary, - false, - updRes.updateCounter(), - topVer); - } if (hasNear) { if (primary) { @@ -2965,10 +2914,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - boolean internal = false; - for (int i = 0; i < req.size(); i++) { KeyCacheObject key = req.key(i); @@ -2991,14 +2936,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long ttl = req.ttl(i); long expireTime = req.conflictExpireTime(i); - if (!initLsnrs) { - internal = entry.isInternal() || !context().userCache(); - - lsnrs = ctx.continuousQueries().updateListeners(internal, false); - - initLsnrs = true; - } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, @@ -3008,7 +2945,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op == TRANSFORM ? 
req.invokeArguments() : null, /*write-through*/false, /*read-through*/false, - /*retval*/lsnrs != null, + /*retval*/false, req.keepBinary(), /*expiry policy*/null, /*event*/true, @@ -3026,25 +2963,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, prevVal, - updateIdx); + updateIdx, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); - if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - internal, - entry.partition(), - false, - false, - updRes.updateCounter(), - req.topologyVersion()); - } - entry.onUnlock(); break; // While. http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4721d6e..5760596 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -37,11 +37,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -97,15 +97,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private final Collection<KeyCacheObject> keys; + /** Continuous query closures. */ + private Collection<CI1<Boolean>> cntQryClsrs; + /** */ private final boolean waitForExchange; /** Response count. */ private volatile int resCnt; - /** */ - private Map<UUID, CacheContinuousQueryListener> lsnrs; - /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -138,13 +138,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); } - /** - * @param lsnrs Continuous query listeners. - */ - void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) { - this.lsnrs = lsnrs; - } - /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futVer.asGridUuid(); @@ -276,27 +269,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> addPrevVal, entry.partition(), prevVal, - updateCntr, - lsnrs != null); - } - else if (lsnrs != null && dhtNodes.size() == 1) { - try { - cctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - val, - prevVal, - entry.key().internal() || !cctx.userCache(), - entry.partition(), - true, - false, - updateCntr, - updateReq.topologyVersion()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. 
[key=" + entry.key() + ", newVal=" - + val + ", err=" + e + "]"); - } + updateCntr); } } } @@ -361,77 +334,33 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } + /** + * @param clsr Continuous query closure. + */ + public void addContinuousQueryClosure(CI1<Boolean> clsr){ + assert !isDone() : this; + + if (cntQryClsrs == null) + cntQryClsrs = new ArrayList<>(10); + + cntQryClsrs.add(clsr); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { cctx.mvcc().removeAtomicFuture(version()); - if (err != null) { - if (!mappings.isEmpty() && lsnrs != null) { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); - - if (!hndKeys.contains(key)) { - updateRes.addFailedKey(key, err); + boolean suc = err == null; - cctx.continuousQueries().skipUpdateEvent( - lsnrs, - key, - req.partitionId(i), - req.updateCounter(i), - updateReq.topologyVersion()); - - hndKeys.add(key); - - if (hndKeys.size() == keys.size()) - break exit; - } - } - } - } - else - for (KeyCacheObject key : keys) - updateRes.addFailedKey(key, err); + if (!suc) { + for (KeyCacheObject key : keys) + updateRes.addFailedKey(key, err); } - else { - if (lsnrs != null) { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); - - if (!hndKeys.contains(key)) { - try { - cctx.continuousQueries().onEntryUpdated( - lsnrs, - key, - req.value(i), - req.localPreviousValue(i), - key.internal() || !cctx.userCache(), - req.partitionId(i), - true, - false, - req.updateCounter(i), - updateReq.topologyVersion()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. 
[key=" + key + - ", newVal=" + req.value(i) + - ", err=" + e + "]"); - } - - hndKeys.add(key); - - if (hndKeys.size() == keys.size()) - break exit; - } - } - } - } + + if (cntQryClsrs != null) { + for (CI1<Boolean> clsr : cntQryClsrs) + clsr.apply(suc); } if (updateReq.writeSynchronizationMode() == FULL_SYNC) http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index c8e33c2..b5e2835 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectTransient private List<Integer> partIds; - /** */ - @GridDirectTransient - private List<CacheObject> locPrevVals; - /** Keep binary flag. */ private boolean keepBinary; @@ -242,7 +238,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param partId Partition. * @param prevVal Previous value. * @param updateCntr Update counter. - * @param storeLocPrevVal If {@code true} stores previous value. 
*/ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -253,19 +248,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateCntr, - boolean storeLocPrevVal) { + @Nullable Long updateCntr + ) { keys.add(key); partIds.add(partId); - if (storeLocPrevVal) { - if (locPrevVals == null) - locPrevVals = new ArrayList<>(); - - locPrevVals.add(prevVal); - } - if (forceTransformBackups) { assert entryProcessor != null; @@ -526,16 +514,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject localPreviousValue(int idx) { - assert locPrevVals != null; - - return locPrevVals.get(idx); - } - - /** - * @param idx Key index. * @return Entry processor. */ @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { @@ -1069,13 +1047,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid private void cleanup() { nearVals = null; prevVals = null; - - // Do not keep values if they are not needed for continuous query notification. 
- if (locPrevVals == null) { - keys = null; - vals = null; - locPrevVals = null; - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a7481d3..3e0e392 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -271,6 +270,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { subjId, taskName, null, + null, null); if (updRes.removeVersion() != null) @@ -372,6 +372,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { req.subjectId(), taskName, null, + null, null); if (updRes.removeVersion() != null) 
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index eab5dbd..db70e2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -56,6 +56,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { return e; } + /** + * @return Partition ID. + */ + public int partitionId() { + return e.partition(); + } + /** {@inheritDoc} */ @Override public K getKey() { return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 16513b0..9ae2972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -48,6 +48,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent; 
import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -58,7 +59,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; @@ -66,13 +70,14 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import 
org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -159,6 +164,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private transient boolean ignoreClsNotFound; + /** */ + private transient boolean asyncCallback; + + /** */ + private transient UUID nodeId; + + /** */ + private transient UUID routineId; + + /** */ + private transient GridKernalContext ctx; + + /** */ + private transient IgniteLogger log; + /** * Required by {@link Externalizable}. */ @@ -283,13 +303,36 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert routineId != null; assert ctx != null; - if (locLsnr != null) - ctx.resource().injectGeneric(locLsnr); + if (locLsnr != null) { + if (locLsnr instanceof JCacheQueryLocalListener) { + ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl); + + asyncCallback = ((JCacheQueryLocalListener)locLsnr).async(); + } + else { + ctx.resource().injectGeneric(locLsnr); + + asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class); + } + } final CacheEntryEventFilter filter = getEventFilter(); - if (filter != null) - ctx.resource().injectGeneric(filter); + if (filter != null) { + if (filter instanceof JCacheQueryRemoteFilter) { + if (((JCacheQueryRemoteFilter)filter).impl != null) + ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl); + + if (!asyncCallback) + asyncCallback = ((JCacheQueryRemoteFilter)filter).async(); + } + else { + ctx.resource().injectGeneric(filter); + + if (!asyncCallback) + asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class); + } + } entryBufs = new ConcurrentHashMap<>(); @@ 
-299,10 +342,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler rcvs = new ConcurrentHashMap<>(); + this.nodeId = nodeId; + + this.routineId = routineId; + + this.ctx = ctx; + final boolean loc = nodeId.equals(ctx.localNodeId()); assert !skipPrimaryCheck || loc; + log = ctx.log(CacheContinuousQueryHandler.class); + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -324,15 +375,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - /** {@inheritDoc} */ @Override public boolean keepBinary() { return keepBinary; } - @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, - boolean recordIgniteEvt) { + @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt, + boolean primary, + final boolean recordIgniteEvt, + GridDhtAtomicUpdateFuture fut) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) - return; + return ; final GridCacheContext<K, V> cctx = cacheContext(ctx); @@ -343,93 +395,33 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler // skipPrimaryCheck is set only when listen locally for replicated cache events. 
assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); - boolean notify = !evt.entry().isFiltered(); + if (asyncCallback) { + ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure( + primary, + evt, + recordIgniteEvt, + fut); - if (notify && filter != null) { - try { - notify = filter.evaluate(evt); - } - catch (Exception e) { - U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e); - } + ctx.asyncCallbackPool().execute(clsr, evt.partitionId()); } - - try { - final CacheContinuousQueryEntry entry = evt.entry(); - - if (!notify) - entry.markFiltered(); + else { + final boolean notify = filter(evt, primary); if (primary || skipPrimaryCheck) { - if (loc) { - if (!locCache) { - Collection<CacheEntryEvent<? extends K, ? extends V>> entries = handleEvent(ctx, entry); - - if (!entries.isEmpty()) { - locLsnr.onUpdated(entries); - - if (!internal && !skipPrimaryCheck) - sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); - } - } - else { - if (!entry.isFiltered()) - locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); - } - } + if (fut == null) + onEntryUpdate(evt, notify, loc, recordIgniteEvt); else { - if (!entry.isFiltered()) - prepareEntry(cctx, nodeId, entry); - - CacheContinuousQueryEntry e = handleEntry(entry); - - if (e != null) - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); - } - } - else { - if (!internal) { - // Skip init query and expire entries. 
- if (entry.updateCounter() != -1L) { - entry.markBackup(); + fut.addContinuousQueryClosure(new CI1<Boolean>() { + @Override public void apply(Boolean suc) { + if (!suc) + evt.entry().markFiltered(); - backupQueue.add(entry); - } + onEntryUpdate(evt, notify, loc, recordIgniteEvt); + } + }); } } } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); - - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); - } - - if (recordIgniteEvt && notify) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - filter instanceof CacheEntryEventSerializableFilter ? - (CacheEntryEventSerializableFilter)filter : null, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); - } } @Override public void onUnregister() { @@ -475,15 +467,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); } - @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, - boolean primary) { + @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, + AffinityTopologyVersion topVer, boolean primary) { assert evt != null; CacheContinuousQueryEntry e = evt.entry(); e.markFiltered(); - onEntryUpdated(evt, primary, false); + onEntryUpdated(evt, primary, false, null); } @Override public void onPartitionEvicted(int part) { @@ -580,17 +572,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @SuppressWarnings("unchecked") - 
@Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + @Override public void notifyCallback(final UUID nodeId, + final UUID routineId, + Collection<?> objs, + final GridKernalContext ctx) { assert nodeId != null; assert routineId != null; assert objs != null; assert ctx != null; - Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; + final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs; + + if (entries.isEmpty()) + return; + + if (asyncCallback) { + IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool(); + + int threadId = asyncPool.threadId(entries.get(0).partition()); + + int startIdx = 0; + + if (entries.size() != 1) { + for (int i = 1; i < entries.size(); i++) { + int curThreadId = asyncPool.threadId(entries.get(i).partition()); + + // If all entries from one partition avoid creation new collections. + if (curThreadId == threadId) + continue; + + final int i0 = i; + final int startIdx0 = startIdx; + + asyncPool.execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0)); + } + }, threadId); + + startIdx = i0; + threadId = curThreadId; + } + } + + final int startIdx0 = startIdx; + + asyncPool.execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, + startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size())); + } + }, threadId); + } + else + notifyCallback0(nodeId, ctx, entries); + } + /** + * @param nodeId Node id. + * @param ctx Kernal context. + * @param entries Entries. + */ + private void notifyCallback0(UUID nodeId, + final GridKernalContext ctx, + Collection<CacheContinuousQueryEntry> entries) { final GridCacheContext cctx = cacheContext(ctx); - Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(); + final Collection<CacheEntryEvent<? extends K, ? 
extends V>> entries0 = new ArrayList<>(entries.size()); for (CacheContinuousQueryEntry e : entries) { GridCacheDeploymentManager depMgr = cctx.deploy(); @@ -609,7 +657,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler try { e.unmarshal(cctx, ldr); - entries0.addAll(handleEvent(ctx, e)); + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e); + + if (evts != null && !evts.isEmpty()) + entries0.addAll(evts); } catch (IgniteCheckedException ex) { if (ignoreClsNotFound) @@ -640,8 +691,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (e.isFiltered()) return Collections.emptyList(); else - return F.<CacheEntryEvent<? extends K, ? extends V>>asList( - new CacheContinuousQueryEvent<K, V>(cache, cctx, e)); + return F.<CacheEntryEvent<? extends K, ? extends V>> + asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, e)); } // Initial query entry or evicted entry. These events should be fired immediately. @@ -653,7 +704,117 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); - return rec.collectEntries(cctx, cache, e); + return rec.collectEntries(e, cctx, cache); + } + + /** + * @param primary Primary. + * @param evt Query event. + * @return {@code True} if event passed filter otherwise {@code false}. 
+ */ + public boolean filter(CacheContinuousQueryEvent evt, boolean primary) { + CacheContinuousQueryEntry entry = evt.entry(); + + boolean notify = !entry.isFiltered(); + + try { + if (notify && getEventFilter() != null) + notify = getEventFilter().evaluate(evt); + } + catch (Exception e) { + U.error(log, "CacheEntryEventFilter failed: " + e); + } + + if (!notify) + entry.markFiltered(); + + if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) { + entry.markBackup(); + + backupQueue.add(entry); + } + + return notify; + } + + /** + * @param evt Continuous query event. + * @param notify Notify flag. + * @param loc Listener deployed on this node. + * @param recordIgniteEvt Record ignite event. + */ + private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) { + try { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx == null) + return; + + final CacheContinuousQueryEntry entry = evt.entry(); + + if (loc) { + if (!locCache) { + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry); + + if (!evts.isEmpty()) { + locLsnr.onUpdated(evts); + + if (!internal && !skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { + if (!entry.isFiltered()) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt)); + } + } + else { + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); + + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + } + } + catch (ClusterTopologyCheckedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + + if (recordIgniteEvt && notify) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + getEventFilter() instanceof CacheEntryEventSerializableFilter ? + (CacheEntryEventSerializableFilter)getEventFilter() : null, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + + /** + * @return Task name. + */ + private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } /** @@ -781,9 +942,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param entry Cache continuous query entry. * @return Collection entries which will be fired. This collection should contains only non-filtered events. */ - public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(GridCacheContext cctx, - IgniteCache cache, - CacheContinuousQueryEntry entry) { + <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries( + CacheContinuousQueryEntry entry, + GridCacheContext cctx, + IgniteCache cache + ) { assert entry != null; if (entry.topologyVersion() == null) { // Possible if entry is sent from old node. 
@@ -1241,6 +1404,87 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * + */ + private class ContinuousQueryAsyncClosure implements Runnable { + /** */ + private final CacheContinuousQueryEvent<K, V> evt; + + /** */ + private final boolean primary; + + /** */ + private final boolean recordIgniteEvt; + + /** */ + private final IgniteInternalFuture<?> fut; + + /** + * @param primary Primary flag. + * @param evt Event. + * @param recordIgniteEvt Fired event. + * @param fut Dht future. + */ + ContinuousQueryAsyncClosure( + boolean primary, + CacheContinuousQueryEvent<K, V> evt, + boolean recordIgniteEvt, + IgniteInternalFuture<?> fut) { + this.primary = primary; + this.evt = evt; + this.recordIgniteEvt = recordIgniteEvt; + this.fut = fut; + } + + /** {@inheritDoc} */ + @Override public void run() { + final boolean notify = filter(evt, primary); + + if (!primary()) + return; + + if (fut == null) { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + + return; + } + + if (fut.isDone()) { + if (fut.error() != null) + evt.entry().markFiltered(); + + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + else { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + if (f.error() != null) + evt.entry().markFiltered(); + + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + } + }, evt.entry().partition()); + } + }); + } + } + + /** + * @return {@code True} if event fired on this node. + */ + private boolean primary() { + return primary || skipPrimaryCheck; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(ContinuousQueryAsyncClosure.class, this); + } + } + + /** * Deployable object. 
*/ protected static class DeployableObject implements Externalizable { http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 83ff32c..8eca81c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; +import org.jetbrains.annotations.Nullable; /** * Continuous query listener. @@ -36,8 +38,10 @@ public interface CacheContinuousQueryListener<K, V> { * @param evt Event * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. + * @param fut Dht atomic future. */ - public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt); + public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, + boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut); /** * Listener unregistered callback.
