Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 af7731ece -> 09afb1453


IGNITE-2004 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09afb145
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09afb145
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09afb145

Branch: refs/heads/ignite-2004
Commit: 09afb1453a68daf97bf1029e7d838a6860721add
Parents: af7731e
Author: nikolay_tikhonov <[email protected]>
Authored: Tue Apr 12 18:44:50 2016 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Tue Apr 12 18:45:28 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      | 16 ++++----
 .../ignite/internal/GridKernalContext.java      |  6 +--
 .../ignite/internal/GridKernalContextImpl.java  |  2 +-
 .../org/apache/ignite/internal/IgnitionEx.java  | 12 +++---
 .../processors/cache/GridCacheMapEntry.java     | 11 +++---
 .../dht/atomic/GridDhtAtomicCache.java          | 41 ++------------------
 .../continuous/CacheContinuousQueryHandler.java | 16 ++++----
 .../apache/ignite/lang/IgniteAsyncCallback.java |  2 +-
 ...ontinuousQueryOperationFromCallbackTest.java | 18 +++------
 9 files changed, 41 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 285a8a2..51abb78 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -223,8 +223,8 @@ public class IgniteConfiguration {
     /** Public pool size. */
     private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT;
 
-    /** Async continuous query pool size. */
-    private int conQryPoolSize = DFLT_PUBLIC_THREAD_CNT;
+    /** Async Callback pool size. */
+    private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT;
 
     /** System pool size. */
     private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
@@ -732,8 +732,8 @@ public class IgniteConfiguration {
      *
      * @return Thread pool size to be used
      */
-    public int getContinuousQueryPoolSize() {
-        return conQryPoolSize;
+    public int getAsyncCallbackPoolSize() {
+        return callbackPoolSize;
     }
 
     /**
@@ -845,14 +845,14 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Sets continuous query thread pool size to use within grid.
+     * Sets async callback thread pool size to use within grid.
      *
      * @param poolSize Thread pool size to use within grid.
      * @return {@code this} for chaining.
-     * @see IgniteConfiguration#getContinuousQueryPoolSize()
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
-    public IgniteConfiguration setContinuousQueryPoolSize(int poolSize) {
-        this.conQryPoolSize = poolSize;
+    public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) {
+        this.callbackPoolSize = poolSize;
 
         return this;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 7d23326..3eaef1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -299,11 +299,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService marshallerCachePool();
 
     /**
-     * Gets continuous query pool.
+     * Gets async callback pool.
      *
-     * @return Continuous query pool.
+     * @return Async callback pool.
      */
-    public IgniteStripedThreadPoolExecutor continuousQueryPool();
+    public IgniteStripedThreadPoolExecutor asyncCallbackPool();
 
     /**
      * Gets cache object processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index f1ce9fb..e6541eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -753,7 +753,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteStripedThreadPoolExecutor continuousQueryPool() {
+    @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
         return conQryExecSvc;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 0d29673..5d2a820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1437,7 +1437,7 @@ public class IgnitionEx {
         private ExecutorService marshCacheExecSvc;
 
         /** Continuous query executor service. */
-        private IgniteStripedThreadPoolExecutor conQryExecSvc;
+        private IgniteStripedThreadPoolExecutor callbackExecSvc;
 
         /** Grid state. */
         private volatile IgniteState state = STOPPED;
@@ -1652,8 +1652,8 @@ public class IgnitionEx {
                 new LinkedBlockingQueue<Runnable>());
 
             // Note that we do not pre-start threads here as continuous query pool may not be needed.
-            conQryExecSvc = new IgniteStripedThreadPoolExecutor(
-                cfg.getContinuousQueryPoolSize(),
+            callbackExecSvc = new IgniteStripedThreadPoolExecutor(
+                cfg.getAsyncCallbackPoolSize(),
                 1,
                 cfg.getGridName(),
                 "contQry");
@@ -1697,7 +1697,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc, conQryExecSvc,
+                    igfsExecSvc, restExecSvc, callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2301,9 +2301,9 @@ public class IgnitionEx {
 
             marshCacheExecSvc = null;
 
-            U.shutdownNow(getClass(), conQryExecSvc, log);
+            U.shutdownNow(getClass(), callbackExecSvc, log);
 
-            conQryExecSvc = null;
+            callbackExecSvc = null;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 955fc74..cbf8497 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1904,7 +1904,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Long updateCntr0 = null;
 
         synchronized (this) {
-            boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
+
+            boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
+                || !F.isEmptyOrNulls(filter);
 
             checkObsolete();
 
@@ -2507,10 +2512,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            boolean internal = isInternal() || !context().userCache();
-
-            Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
-
             // Continuous query filter should be perform under lock.
             if (lsnrs != null) {
                 CacheObject evtVal = updated;

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a6483c2..cc39dee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2103,10 +2103,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             KeyCacheObject k = keys.get(i);
@@ -2121,14 +2117,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (entry == null)
                     continue;
 
-                if (!initLsnrs) {
-                    internal = entry.isInternal() || !context().userCache();
-
-                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                    initLsnrs = true;
-                }
-
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2157,7 +2145,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    lsnrs != null || sndPrevVal || req.returnValue(),
+                    sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     true,
@@ -2394,9 +2382,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
-            boolean initLsnrs = false;
-            Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2430,14 +2415,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                     }
 
-                    if (!initLsnrs) {
-                        lsnrs = ctx.continuousQueries().updateListeners(
-                            entry.isInternal() || !context().userCache(),
-                            false);
-
-                        initLsnrs = true;
-                    }
-
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         node.id(),
@@ -2447,7 +2424,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/sndPrevVal || lsnrs != null,
+                        /*retval*/sndPrevVal,
                         req.keepBinary(),
                         expiry,
                         /*event*/true,
@@ -2895,10 +2872,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject key = req.key(i);
 
@@ -2921,14 +2894,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
 
-                        if (!initLsnrs) {
-                            internal = entry.isInternal() || !context().userCache();
-
-                            lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                            initLsnrs = true;
-                        }
-
                         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -2938,7 +2903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*read-through*/false,
-                            /*retval*/lsnrs != null,
+                            /*retval*/false,
                             req.keepBinary(),
                             /*expiry policy*/null,
                             /*event*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cb9e503..1caab3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -384,7 +384,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         recordIgniteEvt,
                         fut);
 
-                    ctx.continuousQueryPool().execute(clsr, evt.partitionId());
+                    ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
                 }
                 else {
                     final boolean notify = filter(evt, primary);
@@ -563,20 +563,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert objs != null;
         assert ctx != null;
 
-        final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
+        final Collection<CacheContinuousQueryEntry> ents = (Collection<CacheContinuousQueryEntry>)objs;
 
-        if (entries.iterator().hasNext()) {
+        if (!ents.isEmpty()) {
             if (asyncCallback) {
-                int partId = entries.iterator().next().partition();
-
-                ctx.continuousQueryPool().execute(new Runnable() {
+                ctx.asyncCallbackPool().execute(new Runnable() {
                     @Override public void run() {
-                        notifyCallback0(nodeId, ctx, entries);
+                        notifyCallback0(nodeId, ctx, ents);
                     }
-                }, partId);
+                });
             }
             else
-                notifyCallback0(nodeId, ctx, entries);
+                notifyCallback0(nodeId, ctx, ents);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
index 04de586..88e6684 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
@@ -33,7 +33,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
  * to use cache API in a callbacks.
  * <p/>
  * Different implementations can use different thread pools. For example continuous query will use continuous query
- * thread poll which can be configured by {@link IgniteConfiguration#setContinuousQueryPoolSize(int)}
+ * thread poll which can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.TYPE)

http://git-wip-us.apache.org/repos/asf/ignite/blob/09afb145/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
index 41b30be..d301036 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,7 +57,6 @@ import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
@@ -188,15 +189,6 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
     /**
      * @throws Exception If failed.
      */
-    public void testAtomicWithoutBackup() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC);
-
-        doTest(ccfg, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testTxTwoBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL);
 
@@ -234,8 +226,10 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
             final int threadCnt = 10;
 
             for (int idx = 0; idx < NODES; idx++) {
-                Set<T2<QueryTestKey, QueryTestValue>> evts = new ConcurrentHashSet<>();
-                Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = new ConcurrentHashSet<>();
+                Set<T2<QueryTestKey, QueryTestValue>> evts = Collections.
+                    newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>());
+                Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = Collections.
+                    newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>());
 
                 IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName());
 

Reply via email to