Repository: ignite Updated Branches: refs/heads/ignite-2004 af7731ece -> 09afb1453
IGNITE-2004 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09afb145 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09afb145 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09afb145 Branch: refs/heads/ignite-2004 Commit: 09afb1453a68daf97bf1029e7d838a6860721add Parents: af7731e Author: nikolay_tikhonov <[email protected]> Authored: Tue Apr 12 18:44:50 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Apr 12 18:45:28 2016 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 16 ++++---- .../ignite/internal/GridKernalContext.java | 6 +-- .../ignite/internal/GridKernalContextImpl.java | 2 +- .../org/apache/ignite/internal/IgnitionEx.java | 12 +++--- .../processors/cache/GridCacheMapEntry.java | 11 +++--- .../dht/atomic/GridDhtAtomicCache.java | 41 ++------------------ .../continuous/CacheContinuousQueryHandler.java | 16 ++++---- .../apache/ignite/lang/IgniteAsyncCallback.java | 2 +- ...ontinuousQueryOperationFromCallbackTest.java | 18 +++------ 9 files changed, 41 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 285a8a2..51abb78 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -223,8 +223,8 @@ public class IgniteConfiguration { /** Public pool size. */ private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT; - /** Async continuous query pool size. */ - private int conQryPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** Async Callback pool size. */ + private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT; /** System pool size. */ private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -732,8 +732,8 @@ public class IgniteConfiguration { * * @return Thread pool size to be used */ - public int getContinuousQueryPoolSize() { - return conQryPoolSize; + public int getAsyncCallbackPoolSize() { + return callbackPoolSize; } /** @@ -845,14 +845,14 @@ public class IgniteConfiguration { } /** - * Sets continuous query thread pool size to use within grid. + * Sets async callback thread pool size to use within grid. * * @param poolSize Thread pool size to use within grid. * @return {@code this} for chaining. - * @see IgniteConfiguration#getContinuousQueryPoolSize() + * @see IgniteConfiguration#getAsyncCallbackPoolSize() */ - public IgniteConfiguration setContinuousQueryPoolSize(int poolSize) { - this.conQryPoolSize = poolSize; + public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) { + this.callbackPoolSize = poolSize; return this; } http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 7d23326..3eaef1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -299,11 +299,11 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService marshallerCachePool(); /** - * Gets continuous query pool. + * Gets async callback pool. * - * @return Continuous query pool. + * @return Async callback pool. */ - public IgniteStripedThreadPoolExecutor continuousQueryPool(); + public IgniteStripedThreadPoolExecutor asyncCallbackPool(); /** * Gets cache object processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index f1ce9fb..e6541eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -753,7 +753,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public IgniteStripedThreadPoolExecutor continuousQueryPool() { + @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() { return conQryExecSvc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 0d29673..5d2a820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1437,7 +1437,7 @@ public class IgnitionEx { private ExecutorService marshCacheExecSvc; /** Continuous query executor service. */ - private IgniteStripedThreadPoolExecutor conQryExecSvc; + private IgniteStripedThreadPoolExecutor callbackExecSvc; /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1652,8 +1652,8 @@ public class IgnitionEx { new LinkedBlockingQueue<Runnable>()); // Note that we do not pre-start threads here as continuous query pool may not be needed. - conQryExecSvc = new IgniteStripedThreadPoolExecutor( - cfg.getContinuousQueryPoolSize(), + callbackExecSvc = new IgniteStripedThreadPoolExecutor( + cfg.getAsyncCallbackPoolSize(), 1, cfg.getGridName(), "contQry"); @@ -1697,7 +1697,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, conQryExecSvc, + igfsExecSvc, restExecSvc, callbackExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2301,9 +2301,9 @@ public class IgnitionEx { marshCacheExecSvc = null; - U.shutdownNow(getClass(), conQryExecSvc, log); + U.shutdownNow(getClass(), callbackExecSvc, log); - conQryExecSvc = null; + callbackExecSvc = null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 955fc74..cbf8497 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1904,7 +1904,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Long updateCntr0 = null; synchronized (this) { - boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); + boolean internal = isInternal() || !context().userCache(); + + Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false); + + boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM + || !F.isEmptyOrNulls(filter); checkObsolete(); @@ -2507,10 +2512,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - boolean internal = isInternal() || !context().userCache(); - - Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false); - // Continuous query filter should be perform under lock. if (lsnrs != null) { CacheObject evtVal = updated; http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index a6483c2..cc39dee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2103,10 +2103,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - boolean internal = false; - // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { KeyCacheObject k = keys.get(i); @@ -2121,14 +2117,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry == null) continue; - if (!initLsnrs) { - internal = entry.isInternal() || !context().userCache(); - - lsnrs = ctx.continuousQueries().updateListeners(internal, false); - - initLsnrs = true; - } - GridCacheVersion newConflictVer = req.conflictVersion(i); long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); @@ -2157,7 +2145,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - lsnrs != null || sndPrevVal || req.returnValue(), + sndPrevVal || req.returnValue(), req.keepBinary(), expiry, true, @@ -2394,9 +2382,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -2430,14 +2415,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } - if (!initLsnrs) { - lsnrs = ctx.continuousQueries().updateListeners( - entry.isInternal() || !context().userCache(), - false); - - initLsnrs = true; - } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), @@ -2447,7 +2424,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, /*write-through*/false, /*read-through*/false, - /*retval*/sndPrevVal || lsnrs != null, + /*retval*/sndPrevVal, req.keepBinary(), expiry, /*event*/true, @@ -2895,10 +2872,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - boolean initLsnrs = false; - Map<UUID, CacheContinuousQueryListener> lsnrs = null; - boolean internal = false; - for (int i = 0; i < req.size(); i++) { KeyCacheObject key = req.key(i); @@ -2921,14 +2894,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long ttl = req.ttl(i); long expireTime = req.conflictExpireTime(i); - if (!initLsnrs) { - internal = entry.isInternal() || !context().userCache(); - - lsnrs = ctx.continuousQueries().updateListeners(internal, false); - - initLsnrs = true; - } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, @@ -2938,7 +2903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op == TRANSFORM ? req.invokeArguments() : null, /*write-through*/false, /*read-through*/false, - /*retval*/lsnrs != null, + /*retval*/false, req.keepBinary(), /*expiry policy*/null, /*event*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index cb9e503..1caab3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -384,7 +384,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler recordIgniteEvt, fut); - ctx.continuousQueryPool().execute(clsr, evt.partitionId()); + ctx.asyncCallbackPool().execute(clsr, evt.partitionId()); } else { final boolean notify = filter(evt, primary); @@ -563,20 +563,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert objs != null; assert ctx != null; - final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; + final Collection<CacheContinuousQueryEntry> ents = (Collection<CacheContinuousQueryEntry>)objs; - if (entries.iterator().hasNext()) { + if (!ents.isEmpty()) { if (asyncCallback) { - int partId = entries.iterator().next().partition(); - - ctx.continuousQueryPool().execute(new Runnable() { + ctx.asyncCallbackPool().execute(new Runnable() { @Override public void run() { - notifyCallback0(nodeId, ctx, entries); + notifyCallback0(nodeId, ctx, ents); } - }, partId); + }); } else - notifyCallback0(nodeId, ctx, entries); + notifyCallback0(nodeId, ctx, ents); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java index 04de586..88e6684 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -33,7 +33,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; * to use cache API in a callbacks. * <p/> * Different implementations can use different thread pools. For example continuous query will use continuous query - * thread poll which can be configured by {@link IgniteConfiguration#setContinuousQueryPoolSize(int)} + * thread poll which can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)} */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java index 41b30be..d301036 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -55,7 +57,6 @@ import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; @@ -188,15 +189,6 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testAtomicWithoutBackup() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC); - - doTest(ccfg, true); - } - - /** - * @throws Exception If failed. - */ public void testTxTwoBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL); @@ -234,8 +226,10 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs final int threadCnt = 10; for (int idx = 0; idx < NODES; idx++) { - Set<T2<QueryTestKey, QueryTestValue>> evts = new ConcurrentHashSet<>(); - Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = new ConcurrentHashSet<>(); + Set<T2<QueryTestKey, QueryTestValue>> evts = Collections. + newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>()); + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = Collections. + newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>()); IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName());
