Repository: ignite Updated Branches: refs/heads/ignite-2004 ff595dbe7 -> 970861673
IGNITE-2004 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97086167 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97086167 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97086167 Branch: refs/heads/ignite-2004 Commit: 970861673ac7f6b8451d5c60a687ff49bf89d21b Parents: ff595db Author: nikolay_tikhonov <[email protected]> Authored: Thu Apr 14 14:56:03 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Thu Apr 14 14:56:03 2016 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/ContinuousQuery.java | 9 +-- .../internal/GridEventConsumeHandler.java | 4 +- .../internal/GridMessageListenHandler.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 60 +++++++++++++------- .../continuous/GridContinuousHandler.java | 4 +- .../continuous/GridContinuousProcessor.java | 3 +- .../apache/ignite/lang/IgniteAsyncCallback.java | 6 +- 7 files changed, 56 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index cb5b05e..b37c899 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -175,10 +175,11 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * synchronization or transactional cache operations), should be executed asynchronously without * blocking the thread that called the callback. Otherwise, you can get deadlocks. * * <p> - * If listener implements {@link IgniteAsyncCallback} marker interface then cache operations are allowed. - * see {@link IgniteAsyncCallback}. + * If listener has {@link IgniteAsyncCallback} annotation then cache operations are allowed. + * * * @param locLsnr Local callback. + * @see IgniteAsyncCallback * @return {@code this} for chaining. */ public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { @@ -232,10 +233,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. * <p> - * If filter implements {@link IgniteAsyncCallback} marker interface then cache operations are allowed. - * see {@link IgniteAsyncCallback}. + * If filter has {@link IgniteAsyncCallback} annotation then cache operations are allowed. * * @param rmtFilterFactory Key-value filter factory. + * @see IgniteAsyncCallback * @return {@code this} for chaining. */ public ContinuousQuery<K, V> setRemoteFilterFactory( http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 19bf1a7..cc656f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -21,8 +21,8 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.Collection; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.UUID; @@ -301,7 +301,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { * @param nodeId Node ID. * @param objs Notification objects. */ - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + @Override public void notifyCallback(UUID nodeId, UUID routineId, List<?> objs, GridKernalContext ctx) { assert nodeId != null; assert routineId != null; assert objs != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 0ac6877..70b9da7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -21,7 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -149,7 +149,7 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + @Override public void notifyCallback(UUID nodeId, UUID routineId, List<?> objs, GridKernalContext ctx) { assert false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e2ee7c5..294a3ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -573,47 +573,65 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler @SuppressWarnings("unchecked") @Override public void notifyCallback(final UUID nodeId, final UUID routineId, - Collection<?> objs, + List<?> objs, final GridKernalContext ctx) { assert nodeId != null; assert routineId != null; assert objs != null; assert ctx != null; - final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; + final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs; if (!entries.isEmpty()) { if (asyncCallback) { + int partId = entries.get(0).partition(); + if (entries.size() != 1) { - Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart = new HashMap<>(); + Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart = null; + + for (int i = 0; i < entries.size(); i++) { + int curPart = entries.get(i).partition(); + + // If all entries from one partition avoid creation new collections. + if (curPart == partId && entriesByPart == null) + continue; + + if (entriesByPart == null) { + entriesByPart = new HashMap<>(); - for (CacheContinuousQueryEntry e : entries) { - Collection<CacheContinuousQueryEntry> ents = entriesByPart.get(e.partition()); + entriesByPart.put(partId, entries.subList(0, i)); + } + + Collection<CacheContinuousQueryEntry> ents = entriesByPart.get(curPart); if (ents == null) { - ents = new ArrayList<>(entries.size()); + ents = new ArrayList<>(entries.size() - i); - entriesByPart.put(e.partition(), ents); + entriesByPart.put(curPart, ents); } - ents.add(e); + ents.add(entries.get(i)); } - for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>> e : entriesByPart.entrySet()) { - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - notifyCallback0(nodeId, ctx, e.getValue()); - } - }, e.getKey()); - } - } - else { - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - notifyCallback0(nodeId, ctx, entries); + if (entriesByPart != null) { + for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>> e : + entriesByPart.entrySet()) { + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, e.getValue()); + } + }, e.getKey()); } - }, entries.iterator().next().partition()); + + return; + } } + + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, entries); + } + }, partId); } else notifyCallback0(nodeId, ctx, entries); http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 46e87af..318f5ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.continuous; import java.io.Externalizable; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -80,7 +80,7 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @param objs Notification objects. * @param ctx Kernal context. */ - public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx); + public void notifyCallback(UUID nodeId, UUID routineId, List<?> objs, GridKernalContext ctx); /** * Deploys and marshals inner objects (called only if peer deployment is enabled). http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d7838f3..277f829 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -963,7 +964,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { LocalRoutineInfo routine = locInfos.get(routineId); if (routine != null) - routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(), ctx); + routine.hnd.notifyCallback(nodeId, routineId, (List<?>)msg.data(), ctx); } finally { if (msg.futureId() != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java index 88e6684..4800b55 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -32,8 +32,10 @@ import org.apache.ignite.configuration.IgniteConfiguration; * annotated this annotation then they will be executing on a separate thread pool. It allows * to use cache API in a callbacks. * <p/> - * Different implementations can use different thread pools. For example continuous query will use continuous query - * thread poll which can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)} + * For executing callbacks using callback thread pool which can be configured by + * {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)} + * + * @see IgniteConfiguration#setAsyncCallbackPoolSize(int) */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
