Repository: ignite Updated Branches: refs/heads/ignite-2004 db1bf2cbd -> ff595dbe7
ignite-2004 Review Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff595dbe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff595dbe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff595dbe Branch: refs/heads/ignite-2004 Commit: ff595dbe7cf3c15a8b7ec3935055646549c2b427 Parents: db1bf2c Author: sboikov <[email protected]> Authored: Thu Apr 14 08:59:55 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Apr 14 09:12:36 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 3 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 21 --- .../continuous/CacheContinuousQueryHandler.java | 15 +- .../thread/IgniteStripedThreadPoolExecutor.java | 184 ++++++------------- 4 files changed, 64 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index c8ad3cd..64aa6f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1651,10 +1651,9 @@ public class IgnitionEx { 0, new LinkedBlockingQueue<Runnable>()); - // Note that we do not pre-start threads here as continuous query pool may not be needed. + // Note that we do not pre-start threads here as this pool may not be needed. callbackExecSvc = new IgniteStripedThreadPoolExecutor( cfg.getAsyncCallbackPoolSize(), - 1, cfg.getGridName(), "callback"); http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 0232f22..a177b07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectTransient private List<Integer> partIds; - /** */ - @GridDirectTransient - private List<CacheObject> locPrevVals; - /** Keep binary flag. */ private boolean keepBinary; @@ -517,16 +513,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject localPreviousValue(int idx) { - assert locPrevVals != null; - - return locPrevVals.get(idx); - } - - /** - * @param idx Key index. * @return Entry processor. */ @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { @@ -1060,13 +1046,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid private void cleanup() { nearVals = null; prevVals = null; - - // Do not keep values if they are not needed for continuous query notification. - if (locPrevVals == null) { - keys = null; - vals = null; - locPrevVals = null; - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 0f9f5b7..e2ee7c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -690,7 +690,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler // Initial query entry or evicted entry. These events should be fired immediately. if (e.updateCounter() == -1L) { return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends V>>asList( - new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) : + new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) : Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); } @@ -1403,16 +1403,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler */ private class ContinuousQueryAsyncClosure implements Runnable { /** */ - private CacheContinuousQueryEvent<K, V> evt; + private final CacheContinuousQueryEvent<K, V> evt; /** */ - private boolean primary; + private final boolean primary; /** */ - private boolean recordIgniteEvt; + private final boolean recordIgniteEvt; /** */ - private IgniteInternalFuture<?> fut; + private final IgniteInternalFuture<?> fut; /** * @param primary Primary flag. @@ -1463,6 +1463,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return true; } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(ContinuousQueryAsyncClosure.class, this); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 44ea823..5876b08 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -17,75 +17,57 @@ package org.apache.ignite.thread; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; /** * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. */ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** */ - public static final int DFLT_SEG_POOL_SIZE = 8; - - /** */ - public static final int DFLT_CONCUR_LVL = 16; - - /** */ private final ExecutorService[] execs; - /** */ - private final int segShift; - - /** */ - private final int segMask; - - /** - * Create thread pool with default concurrency level {@link #DFLT_CONCUR_LVL}. - */ - public IgniteStripedThreadPoolExecutor() { - this(DFLT_CONCUR_LVL, DFLT_SEG_POOL_SIZE, "null", "null"); - } - /** * Create striped thread pool. * * @param concurrentLvl Concurrency level. - * @param poolSize Pool size. + * @param gridName Node name. + * @param threadNamePrefix Thread name prefix. */ - public IgniteStripedThreadPoolExecutor(int concurrentLvl, int poolSize, String gridName, String threadNamePrefix) { + public IgniteStripedThreadPoolExecutor(int concurrentLvl, String gridName, String threadNamePrefix) { execs = new ExecutorService[concurrentLvl]; ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix); for (int i = 0; i < concurrentLvl; i++) - if (poolSize == 1) - execs[i] = Executors.newSingleThreadExecutor(factory); - else - execs[i] = Executors.newFixedThreadPool(poolSize, factory); - - // Find power-of-two sizes best matching arguments - int sshift = 0; - int ssize = 1; - - while (ssize < concurrentLvl) { - ++sshift; - - ssize <<= 1; - } + execs[i] = Executors.newSingleThreadExecutor(factory); + } - segShift = 32 - sshift; - segMask = ssize - 1; + /** + * Executes the given command at some time in the future. The command with the same {@code index} + * will be executed in the same thread. + * + * @param task the runnable task + * @param idx Striped index. + * @throws RejectedExecutionException if this task cannot be + * accepted for execution. + * @throws NullPointerException If command is null + */ + public void execute(Runnable task, int idx) { + if (idx < execs.length) + execs[idx].execute(task); + else + execs[idx % execs.length].execute(task); } /** {@inheritDoc} */ @@ -96,7 +78,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** {@inheritDoc} */ @Override public List<Runnable> shutdownNow() { - List<Runnable> res = new LinkedList<>(); + List<Runnable> res = new ArrayList<>(); for (ExecutorService exec : execs) { for (Runnable r : exec.shutdownNow()) @@ -137,121 +119,61 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { } /** {@inheritDoc} */ - @Override public <T> Future<T> submit(Callable<T> task) { - return execForTask(task).submit(task); - } + @NotNull @Override public <T> Future<T> submit(Callable<T> task) { + assert false; - /** {@inheritDoc} */ - @Override public <T> Future<T> submit(Runnable task, T result) { - return execForTask(task).submit(task, result); + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public Future<?> submit(Runnable task) { - return execForTask(task).submit(task); - } + @NotNull @Override public <T> Future<T> submit(Runnable task, T res) { + assert false; - /** - * Executes the given command at some time in the future. The command with the same {@code index} - * will be executed in the same thread. - * - * @param task the runnable task - * @param idx Striped index. - * @throws RejectedExecutionException if this task cannot be - * accepted for execution. - * @throws NullPointerException if command is null - */ - public void execute(Runnable task, int idx) { - if (idx < execs.length) - execs[idx].execute(task); - else - execs[idx % execs.length].execute(task); + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - List<Future<T>> futs = new LinkedList<>(); - - for (Callable<T> task : tasks) - futs.add(execForTask(task).submit(task)); - - boolean done = false; + @NotNull @Override public Future<?> submit(Runnable task) { + assert false; - try { - for (Future<T> fut : futs) { - try { - fut.get(); - } - catch (ExecutionException | InterruptedException ignored) { - // No-op. - } - } - - done = true; - - return futs; - } - finally { - if (!done) { - for (Future<T> fut : futs) - fut.cancel(true); - } - } + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, - TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + assert false; + + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, - ExecutionException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, + TimeUnit unit) { + assert false; + + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) { + assert false; + + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public void execute(Runnable cmd) { - execForTask(cmd).execute(cmd); - } + @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + assert false; - /** - * Applies a supplemental hash function to a given hashCode, which - * defends against poor quality hash functions. This is critical - * because ConcurrentHashMap uses power-of-two length hash tables, - * that otherwise encounter collisions for hashCodes that do not - * differ in lower or upper bits. - * - * @param h Hash code. - * @return Enhanced hash code. - */ - private int hash(int h) { - // Spread bits to regularize both segment and index locations, - // using variant of single-word Wang/Jenkins hash. - h += (h << 15) ^ 0xffffcd7d; - h ^= (h >>> 10); - h += (h << 3); - h ^= (h >>> 6); - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); + throw new UnsupportedOperationException(); } - /** - * @param cmd Command. - * @return Service. - */ - private <T> ExecutorService execForTask(T cmd) { - assert cmd != null; + /** {@inheritDoc} */ + @Override public void execute(Runnable cmd) { + assert false; - return execs[(hash(System.identityHashCode(cmd)) >>> segShift) & segMask]; + throw new UnsupportedOperationException(); } /** {@inheritDoc} */
