IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local listener".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9df46fc0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9df46fc0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9df46fc0 Branch: refs/heads/ignite-3163 Commit: 9df46fc0792a375c91e216a670edb3069e2f9b40 Parents: c88182c Author: nikolay_tikhonov <[email protected]> Authored: Fri Apr 22 19:02:26 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Apr 22 19:02:26 2016 +0300 ---------------------------------------------------------------------- .../CacheContinuousAsyncQueryExample.java | 138 +++ .../datagrid/CacheContinuousQueryExample.java | 13 +- .../ignite/cache/query/ContinuousQuery.java | 27 + .../configuration/IgniteConfiguration.java | 32 + .../ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 12 + .../apache/ignite/internal/IgniteKernal.java | 3 + .../org/apache/ignite/internal/IgnitionEx.java | 16 +- .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 31 +- .../dht/atomic/GridDhtAtomicCache.java | 98 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 121 +-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 33 +- .../distributed/near/GridNearAtomicCache.java | 2 + .../continuous/CacheContinuousQueryEvent.java | 7 + .../continuous/CacheContinuousQueryHandler.java | 519 +++++++--- .../CacheContinuousQueryListener.java | 6 +- .../continuous/CacheContinuousQueryManager.java | 62 +- .../apache/ignite/lang/IgniteAsyncCallback.java | 111 +++ .../thread/IgniteStripedThreadPoolExecutor.java | 164 +-- .../processors/cache/GridCacheTestEntryEx.java | 4 +- ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 50 + ...sQueryAsyncFailoverTxReplicatedSelfTest.java | 37 + ...eContinuousQueryAsyncFailoverTxSelfTest.java | 44 + ...eContinuousQueryAsyncFilterListenerTest.java | 986 +++++++++++++++++++ 
...ryFactoryAsyncFilterRandomOperationTest.java | 131 +++ ...usQueryFactoryFilterRandomOperationTest.java | 725 ++++++++++++++ .../CacheContinuousQueryFactoryFilterTest.java | 714 -------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 63 +- .../CacheContinuousQueryLostPartitionTest.java | 14 + ...ontinuousQueryOperationFromCallbackTest.java | 627 ++++++++++++ .../CacheContinuousQueryOrderingEventTest.java | 722 ++++++++++++++ ...acheContinuousQueryRandomOperationsTest.java | 23 + .../junits/GridTestKernalContext.java | 25 +- .../IgniteBinaryCacheQueryTestSuite.java | 1 - .../IgniteCacheQuerySelfTestSuite3.java | 14 +- .../IgniteCacheQuerySelfTestSuite4.java | 7 + .../cache/CacheEntryEventAsyncProbe.java | 61 ++ .../yardstick/cache/CacheEntryEventProbe.java | 33 +- 39 files changed, 4432 insertions(+), 1257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java new file mode 100644 index 0000000..4ac7ecb --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid; + +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * This example demonstrates the asynchronous continuous query API. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. + */ +public class CacheContinuousAsyncQueryExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName(); + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed.
+ */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache continuous query example started."); + + // Auto-close cache at the end of the example. + try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) { + int keyCnt = 20; + + // These entries will be queried by initial predicate. + for (int i = 0; i < keyCnt; i++) + cache.put(i, Integer.toString(i)); + + // Create new continuous query. + ContinuousQuery<Integer, String> qry = new ContinuousQuery<>(); + + qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() { + @Override public boolean apply(Integer key, String val) { + return key > 10; + } + })); + + // Callback that is called locally when update notifications are received. + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) + System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + } + }); + + // This filter will be evaluated remotely on all nodes. + // Entry that pass this filter will be sent to the caller. + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() { + @Override public CacheEntryEventFilter<Integer, String> create() { + return new CacheEntryFilter(); + } + }); + + // Execute query. + try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) { + // Iterate through existing data. + for (Cache.Entry<Integer, String> e : cur) + System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + + // Add a few more keys and watch more query notifications. 
+ for (int i = 0; i < keyCnt; i++) + cache.put(i, Integer.toString(i)); + + // Wait for a while so that the callback is notified about the remaining puts. + Thread.sleep(2000); + } + + // Iterate through entries which were updated from the filter. + for (int i = 0; i < 10; i++) + System.out.println("Entry updated from filter [key=" + i + ", val=" + cache.get(i) + ']'); + } + finally { + // Distributed cache could be removed from cluster only by #destroyCache() call. + ignite.destroyCache(CACHE_NAME); + } + } + } + + /** + * Filter returns {@code true} for entries which have key bigger than 10. + */ + @IgniteAsyncCallback + private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) + throws CacheEntryListenerException { + // This cache operation is safe because filter has Ignite AsyncCallback annotation.
+ if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue())) + ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10"); + + return e.getKey() > 10; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java index 0ce6e89..fdfbc47 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java @@ -18,12 +18,13 @@ package org.apache.ignite.examples.datagrid; import javax.cache.Cache; +import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; @@ -80,9 +81,13 @@ public class CacheContinuousQueryExample { // This filter will be evaluated remotely on all nodes. // Entry that pass this filter will be sent to the caller. - qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>() { - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? 
extends String> e) { - return e.getKey() > 10; + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() { + @Override public CacheEntryEventFilter<Integer, String> create() { + return new CacheEntryEventFilter<Integer, String>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { + return e.getKey() > 10; + } + }; } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 3ea8f93..bbfe8cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -23,6 +23,8 @@ import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteAsyncCallback; /** * API for configuring continuous cache queries. @@ -92,6 +94,16 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; * Note that this works even if you didn't provide initial query. Cursor will * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()} * is called. + * <p> + * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter} + * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener} + * (see {@link #setLocalListener(CacheEntryUpdatedListener)}). 
+ * If filter and/or listener are annotated with {@link IgniteAsyncCallback} then the annotated callback + * is executed in the async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) + * and the notification order is kept the same as the update order for a given cache key. + * + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { /** */ @@ -173,9 +185,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g., * synchronization or transactional cache operations), should be executed asynchronously without * blocking the thread that called the callback. Otherwise, you can get deadlocks. + * <p> + * If the local listener is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback pool + * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. * * @param locLsnr Local callback. * @return {@code this} for chaining. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { this.locLsnr = locLsnr; @@ -198,11 +215,16 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * <p> + * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback + * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. * * @param rmtFilter Key-value filter.
* @return {@code this} for chaining. * * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ @Deprecated public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) { @@ -227,9 +249,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. + * <p> + * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback + * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations. * * @param rmtFilterFactory Key-value filter factory. * @return {@code this} for chaining. + * @see IgniteAsyncCallback + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ public ContinuousQuery<K, V> setRemoteFilterFactory( Factory<?
extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index f705638..0aac562 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -41,6 +41,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lifecycle.LifecycleBean; @@ -218,6 +219,9 @@ public class IgniteConfiguration { /** Public pool size. */ private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** Async Callback pool size. */ + private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** System pool size. */ private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -710,6 +714,20 @@ public class IgniteConfiguration { } /** + * Size of thread pool that is in charge of processing asynchronous callbacks. + * <p> + * This pool is used for callbacks annotated with {@link IgniteAsyncCallback}. + * <p> + * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}. + * + * @return Thread pool size to be used. + * @see IgniteAsyncCallback + */ + public int getAsyncCallbackPoolSize() { + return callbackPoolSize; + } + + /** * Size of thread pool that is in charge of processing internal and Visor * {@link ComputeJob GridJobs}. 
* <p> @@ -818,6 +836,20 @@ public class IgniteConfiguration { } /** + * Sets async callback thread pool size to use within grid. + * + * @param poolSize Thread pool size to use within grid. + * @return {@code this} for chaining. + * @see IgniteConfiguration#getAsyncCallbackPoolSize() + * @see IgniteAsyncCallback + */ + public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) { + this.callbackPoolSize = poolSize; + + return this; + } + + /** * Sets management thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index c0b50a2..d650c97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; /** * @@ -297,6 +298,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService marshallerCachePool(); /** + * Gets async callback pool. + * + * @return Async callback pool. + */ + public IgniteStripedThreadPoolExecutor asyncCallbackPool(); + + /** * Gets cache object processor. * * @return Cache object processor. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 22fd96c..ebc2688 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON; @@ -300,6 +301,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected IgniteStripedThreadPoolExecutor callbackExecSvc; + + /** */ + @GridToStringExclude private Map<String, Object> attrs = new HashMap<>(); /** */ @@ -374,6 +379,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + IgniteStripedThreadPoolExecutor callbackExecSvc, List<PluginProvider> plugins) throws IgniteCheckedException { assert grid != null; assert cfg != null; @@ -390,6 +396,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; this.restExecSvc = restExecSvc; + this.callbackExecSvc = callbackExecSvc; marshCtx = new MarshallerContextImpl(plugins); @@ -739,6 +746,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() { + return callbackExecSvc; + } + + /** {@inheritDoc} */ @Override public IgniteCacheObjectProcessor cacheObjects() { return cacheObjProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 89992cf..16df367 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -166,6 +166,7 @@ import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiVersionCheckException; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL; @@ -657,6 +658,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + IgniteStripedThreadPoolExecutor callbackExecSvc, GridAbsClosure errHnd) throws IgniteCheckedException { @@ -761,6 +763,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { mgmtExecSvc, igfsExecSvc, restExecSvc, + callbackExecSvc, plugins); cfg.getMarshaller().setContext(ctx.marshallerContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 5d74a6d..e2c4751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -95,6 +95,7 @@ import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi; import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi; import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi; import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; @@ -1436,6 +1437,9 @@ public class IgnitionEx { /** Marshaller cache executor service. */ private ExecutorService marshCacheExecSvc; + /** Continuous query executor service. */ + private IgniteStripedThreadPoolExecutor callbackExecSvc; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1648,6 +1652,12 @@ public class IgnitionEx { 0, new LinkedBlockingQueue<Runnable>()); + // Note that we do not pre-start threads here as this pool may not be needed. 
+ callbackExecSvc = new IgniteStripedThreadPoolExecutor( + cfg.getAsyncCallbackPoolSize(), + cfg.getGridName(), + "callback"); + if (myCfg.getConnectorConfiguration() != null) { restExecSvc = new IgniteThreadPoolExecutor( "rest", @@ -1687,7 +1697,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, + igfsExecSvc, restExecSvc, callbackExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2290,6 +2300,10 @@ public class IgnitionEx { U.shutdownNow(getClass(), marshCacheExecSvc, log); marshCacheExecSvc = null; + + U.shutdownNow(getClass(), callbackExecSvc, log); + + callbackExecSvc = null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index dc8e08c..73a9dbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -462,6 +463,7 @@ public interface GridCacheEntryEx { * @param subjId Subject ID initiated this update. * @param taskName Task name. * @param updateCntr Update counter. + * @param fut Dht atomic future. * @return Tuple where first value is flag showing whether operation succeeded, * second value is old entry value if return value is requested, third is updated entry value, * fourth is the version to enqueue for deferred delete the fifth is DR conflict context @@ -497,7 +499,8 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 435d337..45be26c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; @@ -1248,6 +1249,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme tx.local(), false, updateCntr0, + null, topVer); } @@ -1445,6 +1447,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme tx.local(), false, updateCntr0, + null, topVer); } @@ -1819,6 +1822,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, false, updateCntr, + null, AffinityTopologyVersion.NONE); } @@ -1868,7 +1872,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable final UUID subjId, final String taskName, @Nullable final CacheObject prevVal, - @Nullable final Long updateCntr + @Nullable final Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -1897,7 +1902,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Long updateCntr0 = null; synchronized (this) { - boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); + boolean internal = isInternal() || !context().userCache(); + + Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false); + + boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM + || !F.isEmptyOrNulls(filter); checkObsolete(); @@ -2091,6 +2101,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme primary, false, updateCntr0, + null, topVer); } @@ -2499,6 +2510,21 @@ public 
abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); + // Continuous query filter should be perform under lock. + if (lsnrs != null) { + CacheObject evtVal = updated; + CacheObject evtOldVal = oldVal; + + if (isOffHeapValuesOnly()) { + evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal)); + + evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal)); + } + + cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal, + partition(), primary, false, updateCntr0, fut, topVer); + } + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); if (intercept) { @@ -3334,6 +3360,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme true, preload, updateCntr, + null, topVer); cctx.dataStructures().onEntryUpdated(key, false, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cae13e8..25e4e3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -2004,10 +2003,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - boolean internal = false; - // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { KeyCacheObject k = keys.get(i); @@ -2022,14 +2017,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry == null) continue; - if (!initLsnrs) { - internal = entry.isInternal() || !context().userCache(); - - lsnrs = ctx.continuousQueries().updateListeners(internal, false); - - initLsnrs = true; - } - GridCacheVersion newConflictVer = req.conflictVersion(i); long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); @@ -2058,7 +2045,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - lsnrs != null || sndPrevVal || req.returnValue(), + sndPrevVal || req.returnValue(), req.keepBinary(), expiry, true, @@ -2076,7 +2063,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, null, - null); + null, + dhtFut); if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -2085,8 +2073,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - dhtFut.listeners(lsnrs); - if (updRes.sendToDht()) { // Send 
to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -2123,19 +2109,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } - else if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - internal, - entry.partition(), - primary, - false, - updRes.updateCounter(), - topVer); - } if (hasNear) { if (primary && updRes.sendToDht()) { @@ -2309,9 +2282,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -2345,14 +2315,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } - if (!initLsnrs) { - lsnrs = ctx.continuousQueries().updateListeners( - entry.isInternal() || !context().userCache(), - false); - - initLsnrs = true; - } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), @@ -2362,7 +2324,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, /*write-through*/false, /*read-through*/false, - /*retval*/sndPrevVal || lsnrs != null, + /*retval*/sndPrevVal, req.keepBinary(), expiry, /*event*/true, @@ -2380,7 +2342,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, null, - null); + null, + dhtFut); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2411,12 +2374,10 @@ public class 
GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - dhtFut.listeners(lsnrs); - EntryProcessor<Object, Object, Object> entryProcessor = entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); - if (!batchRes.readersOnly()) + if (!batchRes.readersOnly()) { dhtFut.addWriteEntry(entry, writeVal, entryProcessor, @@ -2426,6 +2387,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { sndPrevVal, updRes.oldValue(), updRes.updateCounter()); + } if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, @@ -2435,19 +2397,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } - else if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - entry.isInternal() || !context().userCache(), - entry.partition(), - primary, - false, - updRes.updateCounter(), - topVer); - } if (hasNear) { if (primary) { @@ -2823,10 +2772,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - boolean internal = false; - for (int i = 0; i < req.size(); i++) { KeyCacheObject key = req.key(i); @@ -2849,14 +2794,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long ttl = req.ttl(i); long expireTime = req.conflictExpireTime(i); - if (!initLsnrs) { - internal = entry.isInternal() || !context().userCache(); - - lsnrs = ctx.continuousQueries().updateListeners(internal, false); - - initLsnrs = true; - } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, @@ -2866,7 +2803,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op == TRANSFORM ? 
req.invokeArguments() : null, /*write-through*/false, /*read-through*/false, - /*retval*/lsnrs != null, + /*retval*/false, req.keepBinary(), /*expiry policy*/null, /*event*/true, @@ -2884,25 +2821,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName, prevVal, - updateIdx); + updateIdx, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); - if (lsnrs != null && updRes.updateCounter() != 0) { - ctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - updRes.newValue(), - updRes.oldValue(), - internal, - entry.partition(), - false, - false, - updRes.updateCounter(), - req.topologyVersion()); - } - entry.onUnlock(); break; // While. http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 07f0e14..47f6cb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -37,11 +37,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -97,15 +97,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private final Collection<KeyCacheObject> keys; + /** Continuous query closures. */ + private Collection<CI1<Boolean>> cntQryClsrs; + /** */ private final boolean waitForExchange; /** Response count. */ private volatile int resCnt; - /** */ - private Map<UUID, CacheContinuousQueryListener> lsnrs; - /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -140,13 +140,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !topLocked; } - /** - * @param lsnrs Continuous query listeners. - */ - void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) { - this.lsnrs = lsnrs; - } - /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futVer.asGridUuid(); @@ -283,27 +276,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> addPrevVal, entry.partition(), prevVal, - updateCntr, - lsnrs != null); - } - else if (lsnrs != null && dhtNodes.size() == 1) { - try { - cctx.continuousQueries().onEntryUpdated( - lsnrs, - entry.key(), - val, - prevVal, - entry.key().internal() || !cctx.userCache(), - entry.partition(), - true, - false, - updateCntr, - updateReq.topologyVersion()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. 
[key=" + entry.key() + ", newVal=" - + val + ", err=" + e + "]"); - } + updateCntr); } } } @@ -368,77 +341,33 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } + /** + * @param clsr Continuous query closure. + */ + public void addContinuousQueryClosure(CI1<Boolean> clsr){ + assert !isDone() : this; + + if (cntQryClsrs == null) + cntQryClsrs = new ArrayList<>(10); + + cntQryClsrs.add(clsr); + } + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { cctx.mvcc().removeAtomicFuture(version()); - if (err != null) { - if (!mappings.isEmpty() && lsnrs != null) { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); - - if (!hndKeys.contains(key)) { - updateRes.addFailedKey(key, err); + boolean suc = err == null; - cctx.continuousQueries().skipUpdateEvent( - lsnrs, - key, - req.partitionId(i), - req.updateCounter(i), - updateReq.topologyVersion()); - - hndKeys.add(key); - - if (hndKeys.size() == keys.size()) - break exit; - } - } - } - } - else - for (KeyCacheObject key : keys) - updateRes.addFailedKey(key, err); + if (!suc) { + for (KeyCacheObject key : keys) + updateRes.addFailedKey(key, err); } - else { - if (lsnrs != null) { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); - - if (!hndKeys.contains(key)) { - try { - cctx.continuousQueries().onEntryUpdated( - lsnrs, - key, - req.value(i), - req.localPreviousValue(i), - key.internal() || !cctx.userCache(), - req.partitionId(i), - true, - false, - req.updateCounter(i), - updateReq.topologyVersion()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. 
[key=" + key + - ", newVal=" + req.value(i) + - ", err=" + e + "]"); - } - - hndKeys.add(key); - - if (hndKeys.size() == keys.size()) - break exit; - } - } - } - } + + if (cntQryClsrs != null) { + for (CI1<Boolean> clsr : cntQryClsrs) + clsr.apply(suc); } if (updateReq.writeSynchronizationMode() == FULL_SYNC) http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 0f45a4b..126cd83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectTransient private List<Integer> partIds; - /** */ - @GridDirectTransient - private List<CacheObject> locPrevVals; - /** Keep binary flag. */ private boolean keepBinary; @@ -242,7 +238,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param partId Partition. * @param prevVal Previous value. * @param updateCntr Update counter. - * @param storeLocPrevVal If {@code true} stores previous value. 
*/ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -253,19 +248,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateCntr, - boolean storeLocPrevVal) { + @Nullable Long updateCntr + ) { keys.add(key); partIds.add(partId); - if (storeLocPrevVal) { - if (locPrevVals == null) - locPrevVals = new ArrayList<>(); - - locPrevVals.add(prevVal); - } - if (forceTransformBackups) { assert entryProcessor != null; @@ -526,16 +514,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject localPreviousValue(int idx) { - assert locPrevVals != null; - - return locPrevVals.get(idx); - } - - /** - * @param idx Key index. * @return Entry processor. */ @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { @@ -1064,13 +1042,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid private void cleanup() { nearVals = null; prevVals = null; - - // Do not keep values if they are not needed for continuous query notification. 
- if (locPrevVals == null) { - keys = null; - vals = null; - locPrevVals = null; - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 28cfc15..d099613 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -271,6 +271,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { subjId, taskName, null, + null, null); if (updRes.removeVersion() != null) @@ -372,6 +373,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { req.subjectId(), taskName, null, + null, null); if (updRes.removeVersion() != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index eab5dbd..db70e2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ 
-56,6 +56,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { return e; } + /** + * @return Partition ID. + */ + public int partitionId() { + return e.partition(); + } + /** {@inheritDoc} */ @Override public K getKey() { return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
