Added related GridIoPolicy in IgniteThread.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a3650fb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a3650fb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a3650fb Branch: refs/heads/ignite-5578 Commit: 3a3650fb2eadcd6a1186cc186fa5d471cd1e74d5 Parents: 2574beb Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 27 13:57:11 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 27 13:57:11 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 34 +++++++++++++------ .../managers/communication/GridIoPolicy.java | 3 ++ .../service/GridServiceProcessor.java | 2 +- .../util/StripedCompositeReadWriteLock.java | 6 ++-- .../ignite/internal/util/StripedExecutor.java | 4 ++- .../org/apache/ignite/thread/IgniteThread.java | 35 ++++++++++---------- .../ignite/thread/IgniteThreadFactory.java | 15 ++++++--- .../ignite/thread/IgniteThreadPoolExecutor.java | 33 +++++++++++++++++- .../GridThreadPoolExecutorServiceSelfTest.java | 2 +- 9 files changed, 95 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 1139ec6..23baeb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -64,6 +64,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory; import org.apache.ignite.internal.processors.igfs.IgfsUtils; @@ -1694,7 +1695,8 @@ public class IgnitionEx { cfg.getPublicThreadPoolSize(), cfg.getPublicThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.PUBLIC_POOL); execSvc.allowCoreThreadTimeOut(true); @@ -1706,7 +1708,8 @@ public class IgnitionEx { cfg.getServiceThreadPoolSize(), cfg.getServiceThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.SERVICE_POOL); svcExecSvc.allowCoreThreadTimeOut(true); @@ -1718,7 +1721,8 @@ public class IgnitionEx { cfg.getSystemThreadPoolSize(), cfg.getSystemThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.SYSTEM_POOL); sysExecSvc.allowCoreThreadTimeOut(true); @@ -1738,7 +1742,8 @@ public class IgnitionEx { cfg.getManagementThreadPoolSize(), cfg.getManagementThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.MANAGEMENT_POOL); mgmtExecSvc.allowCoreThreadTimeOut(true); @@ -1753,7 +1758,8 @@ public class IgnitionEx { cfg.getPeerClassLoadingThreadPoolSize(), cfg.getPeerClassLoadingThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.P2P_POOL); p2pExecSvc.allowCoreThreadTimeOut(true); @@ -1764,7 +1770,8 @@ public class IgnitionEx { cfg.getDataStreamerThreadPoolSize(), cfg.getDataStreamerThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.DATA_STREAMER_POOL); dataStreamerExecSvc.allowCoreThreadTimeOut(true); @@ -1811,7 +1818,8 @@ public class IgnitionEx { myCfg.getUtilityCacheThreadPoolSize(), myCfg.getUtilityCacheThreadPoolSize(), myCfg.getUtilityCacheKeepAliveTime(), - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.UTILITY_CACHE_POOL); utilityCacheExecSvc.allowCoreThreadTimeOut(true); @@ -1821,7 +1829,8 @@ public class IgnitionEx { 1, 1, DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.AFFINITY_POOL); affExecSvc.allowCoreThreadTimeOut(true); @@ -1834,7 +1843,8 @@ public class IgnitionEx { cpus, cpus * 2, 3000L, - new LinkedBlockingQueue<Runnable>(1000) + new LinkedBlockingQueue<Runnable>(1000), + GridIoPolicy.IDX_POOL ); } @@ -1846,7 +1856,8 @@ public class IgnitionEx { cfg.getQueryThreadPoolSize(), cfg.getQueryThreadPoolSize(), DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.QUERY_POOL); qryExecSvc.allowCoreThreadTimeOut(true); @@ -1856,7 +1867,8 @@ public class IgnitionEx { 2, 2, DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<Runnable>(), + GridIoPolicy.SCHEMA_POOL); schemaExecSvc.allowCoreThreadTimeOut(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 13bc4c4..3f31f92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -22,6 +22,9 @@ package org.apache.ignite.internal.managers.communication; * message processing by the communication manager. */ public class GridIoPolicy { + /** */ + public static final byte UNDEFINED = -1; + /** Public execution pool. */ public static final byte PUBLIC_POOL = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index db632ec..46fcfea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -153,7 +153,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** Thread factory. */ - private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName()); + private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service"); /** Thread local for service name. */ private ThreadLocal<String> svcName = new ThreadLocal<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java index e215663..18ef06c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java @@ -67,8 +67,10 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock { @NotNull @Override public Lock readLock() { int idx; - if (Thread.currentThread() instanceof IgniteThread) { - IgniteThread igniteThread = (IgniteThread)Thread.currentThread(); + Thread curThread = Thread.currentThread(); + + if (curThread instanceof IgniteThread) { + IgniteThread igniteThread = (IgniteThread)curThread; idx = igniteThread.compositeRwLockIndex(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 6c85b32..6d5dc71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -439,7 +440,8 @@ public class StripedExecutor implements ExecutorService { poolName + "-stripe-" + idx, this, IgniteThread.GRP_IDX_UNASSIGNED, - idx); + idx, + GridIoPolicy.UNDEFINED); thread.start(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java index 6005ac9..c814625 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java @@ -18,6 +18,7 @@ package org.apache.ignite.thread; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.worker.GridWorker; @@ -52,13 +53,16 @@ public class IgniteThread extends Thread { /** */ private final int stripe; + /** */ + private final byte plc; + /** * Creates thread with given worker. * * @param worker Runnable to create thread with. */ public IgniteThread(GridWorker worker) { - this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1); + this(worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED); } /** @@ -69,41 +73,28 @@ public class IgniteThread extends Thread { * @param r Runnable to execute. */ public IgniteThread(String igniteInstanceName, String threadName, Runnable r) { - this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1); - } - - /** - * Creates grid thread with given name for a given Ignite instance. - * - * @param igniteInstanceName Name of the Ignite instance this thread is created for. - * @param threadName Name of thread. - * @param r Runnable to execute. - * @param grpIdx Index within a group. - * @param stripe Non-negative stripe number if this thread is striped pool thread. - */ - public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) { - this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx, stripe); + this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED); } /** * Creates grid thread with given name for a given Ignite instance with specified * thread group. * - * @param grp Thread group. * @param igniteInstanceName Name of the Ignite instance this thread is created for. * @param threadName Name of thread. * @param r Runnable to execute. * @param grpIdx Thread index within a group. * @param stripe Non-negative stripe number if this thread is striped pool thread. */ - public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) { - super(grp, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName)); + public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe, byte plc) { + super(DFLT_GRP, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName)); A.ensure(grpIdx >= -1, "grpIdx >= -1"); this.igniteInstanceName = igniteInstanceName; this.compositeRwLockIdx = grpIdx; this.stripe = stripe; + this.plc = plc; } /** @@ -117,6 +108,14 @@ public class IgniteThread extends Thread { this.igniteInstanceName = igniteInstanceName; this.compositeRwLockIdx = GRP_IDX_UNASSIGNED; this.stripe = -1; + this.plc = GridIoPolicy.UNDEFINED; + } + + /** + * @return Related {@link GridIoPolicy} for internal Ignite pools. + */ + public byte policy() { + return plc; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java index d2f0b15..062c973 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java @@ -20,6 +20,7 @@ package org.apache.ignite.thread; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.NotNull; @@ -37,14 +38,18 @@ public class IgniteThreadFactory implements ThreadFactory { /** Index generator for threads. */ private final AtomicInteger idxGen = new AtomicInteger(); + /** */ + private final byte plc; + /** * Constructs new thread factory for given grid. All threads will belong * to the same default thread group. * * @param igniteInstanceName Ignite instance name. + * @param threadName Thread name. */ - public IgniteThreadFactory(String igniteInstanceName) { - this(igniteInstanceName, "ignite"); + public IgniteThreadFactory(String igniteInstanceName, String threadName) { + this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED); } /** @@ -53,15 +58,17 @@ public class IgniteThreadFactory implements ThreadFactory { * * @param igniteInstanceName Ignite instance name. * @param threadName Thread name. + * @param plc {@link GridIoPolicy} for thread pool. */ - public IgniteThreadFactory(String igniteInstanceName, String threadName) { + public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc) { this.igniteInstanceName = igniteInstanceName; this.threadName = threadName; + this.plc = plc; } /** {@inheritDoc} */ @Override public Thread newThread(@NotNull Runnable r) { - return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1); + return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java index 639ef94..83c64c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; /** * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. @@ -46,13 +47,43 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor { int maxPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQ) { + this(threadNamePrefix, + igniteInstanceName, + corePoolSize, + maxPoolSize, + keepAliveTime, + workQ, + GridIoPolicy.UNDEFINED); + } + + /** + * Creates a new service with the given initial parameters. + * + * @param threadNamePrefix Will be added at the beginning of all created threads. + * @param igniteInstanceName Must be the name of the grid. + * @param corePoolSize The number of threads to keep in the pool, even if they are idle. + * @param maxPoolSize The maximum number of threads to allow in the pool. + * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time + * that excess idle threads will wait for new tasks before terminating. + * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only + * runnable tasks submitted by the {@link #execute(Runnable)} method. + * @param plc {@link GridIoPolicy} for thread pool. + */ + public IgniteThreadPoolExecutor( + String threadNamePrefix, + String igniteInstanceName, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + BlockingQueue<Runnable> workQ, + byte plc) { super( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, workQ, - new IgniteThreadFactory(igniteInstanceName, threadNamePrefix) + new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc) ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java index 3948f6a..dce6328 100644 --- a/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java @@ -64,7 +64,7 @@ public class GridThreadPoolExecutorServiceSelfTest extends GridCommonAbstractTes * @throws Exception If failed. */ public void testSingleGridThreadExecutor() throws Exception { - ExecutorService exec = Executors.newSingleThreadExecutor(new IgniteThreadFactory("gridName")); + ExecutorService exec = Executors.newSingleThreadExecutor(new IgniteThreadFactory("gridName", "testThread")); exec.submit(new InterruptingRunnable()).get();