IGNITE-3875: Added separate thread pool for data streamer. This closes #1067.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f597aff1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f597aff1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f597aff1

Branch: refs/heads/ignite-ssl-hotfix
Commit: f597aff1bdf65d3d430cf85c9932391a72c2d7dc
Parents: 0659beb
Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Authored: Wed Oct 12 15:44:08 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Oct 12 15:44:08 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      | 31 +++++++
 .../ignite/internal/GridKernalContext.java      |  7 ++
 .../ignite/internal/GridKernalContextImpl.java  | 12 +++
 .../apache/ignite/internal/IgniteKernal.java    |  3 +
 .../org/apache/ignite/internal/IgnitionEx.java  | 20 +++-
 .../managers/communication/GridIoManager.java   |  2 +
 .../managers/communication/GridIoPolicy.java    |  3 +
 .../closure/GridClosureProcessor.java           |  3 +-
 .../datastreamer/DataStreamProcessor.java       | 82 +++++++++++++++--
 .../datastreamer/DataStreamerImpl.java          | 31 +------
 .../internal/processors/pool/PoolProcessor.java |  3 +
 .../DataStreamProcessorSelfTest.java            | 97 ++++++++++++++++++++
 .../junits/GridTestKernalContext.java           | 12 +--
 13 files changed, 262 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 73de470..412ecbf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -148,6 +148,9 @@ public class IgniteConfiguration {
     /** Default core size of public thread pool. */
     public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, 
AVAILABLE_PROC_CNT) * 2;
 
+    /** Default size of data streamer thread pool. */
+    public static final int DFLT_DATA_STREAMER_POOL_SIZE = 
DFLT_PUBLIC_THREAD_CNT;
+
     /** Default keep alive time for public thread pool. */
     @Deprecated
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
@@ -245,6 +248,9 @@ public class IgniteConfiguration {
     /** IGFS pool size. */
     private int igfsPoolSize = AVAILABLE_PROC_CNT;
 
+    /** Data stream pool size. */
+    private int dataStreamerPoolSize = DFLT_DATA_STREAMER_POOL_SIZE;
+
     /** Utility cache pool size. */
     private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -508,6 +514,7 @@ public class IgniteConfiguration {
         clockSyncFreq = cfg.getClockSyncFrequency();
         clockSyncSamples = cfg.getClockSyncSamples();
         consistentId = cfg.getConsistentId();
+        dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize();
         deployMode = cfg.getDeploymentMode();
         discoStartupDelay = cfg.getDiscoveryStartupDelay();
         failureDetectionTimeout = cfg.getFailureDetectionTimeout();
@@ -789,6 +796,17 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool that is in charge of processing data stream 
messages.
+     * <p>
+     * If not provided, executor service will have size {@link 
#DFLT_DATA_STREAMER_POOL_SIZE}.
+     *
+     * @return Thread pool size to be used for data stream messages.
+     */
+    public int getDataStreamerThreadPoolSize() {
+        return dataStreamerPoolSize;
+    }
+
+    /**
      * Default size of thread pool that is in charge of processing utility 
cache messages.
      * <p>
      * If not provided, executor service will have size {@link 
#DFLT_SYSTEM_CORE_THREAD_CNT}.
@@ -912,6 +930,19 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Set thread pool size that will be used to process data stream messages.
+     *
+     * @param poolSize Executor service to use for data stream messages.
+     * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setDataStreamerThreadPoolSize(int poolSize) {
+        dataStreamerPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Sets default thread pool size that will be used to process utility 
cache messages.
      *
      * @param poolSize Default executor service size to use for utility cache 
messages.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index e608af2..178239b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -532,6 +532,13 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     public ExecutorService getIgfsExecutorService();
 
     /**
+     * Executor service that is in charge of processing data stream messages.
+     *
+     * @return Thread pool implementation to be used for data stream messages.
+     */
+    public ExecutorService getDataStreamerExecutorService();
+
+    /**
      * Should return an instance of fully configured thread pool to be used for
      * processing of client messages (REST requests).
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index ddef345..1a9663b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -312,6 +312,10 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    private ExecutorService dataStreamExecSvc;
+
+    /** */
+    @GridToStringExclude
     protected ExecutorService restExecSvc;
 
     /** */
@@ -380,6 +384,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
+     * @param dataStreamExecSvc data stream executor service.
      * @param restExecSvc REST executor service.
      * @param affExecSvc Affinity executor service.
      * @param plugins Plugin providers.
@@ -398,6 +403,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
+        ExecutorService dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
@@ -416,6 +422,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         this.p2pExecSvc = p2pExecSvc;
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
+        this.dataStreamExecSvc = dataStreamExecSvc;
         this.restExecSvc = restExecSvc;
         this.affExecSvc = affExecSvc;
         this.callbackExecSvc = callbackExecSvc;
@@ -956,6 +963,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public ExecutorService getDataStreamerExecutorService() {
+        return dataStreamExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public ExecutorService getRestExecutorService() {
         return restExecSvc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 586215a..e838dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -666,6 +666,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
+     * @param dataStreamExecSvc data stream executor service.
      * @param restExecSvc Reset executor service.
      * @param affExecSvc Affinity executor service.
      * @param errHnd Error handler to use for notification about startup 
problems.
@@ -680,6 +681,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
+        ExecutorService dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
@@ -786,6 +788,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                 p2pExecSvc,
                 mgmtExecSvc,
                 igfsExecSvc,
+                dataStreamExecSvc,
                 restExecSvc,
                 affExecSvc,
                 callbackExecSvc,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index a6860b3..104599c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1468,6 +1468,9 @@ public class IgnitionEx {
         /** IGFS executor service. */
         private ThreadPoolExecutor igfsExecSvc;
 
+        /** Data streamer executor service. */
+        private ThreadPoolExecutor dataStreamerExecSvc;
+
         /** REST requests executor service. */
         private ThreadPoolExecutor restExecSvc;
 
@@ -1687,6 +1690,17 @@ public class IgnitionEx {
 
             p2pExecSvc.allowCoreThreadTimeOut(true);
 
+            // Note that we do not pre-start threads here as this pool may not 
be needed.
+            dataStreamerExecSvc = new IgniteThreadPoolExecutor(
+                "data-streamer",
+                cfg.getGridName(),
+                cfg.getDataStreamerThreadPoolSize(),
+                cfg.getDataStreamerThreadPoolSize(),
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>());
+
+            dataStreamerExecSvc.allowCoreThreadTimeOut(true);
+
             // Note that we do not pre-start threads here as igfs pool may not 
be needed.
             igfsExecSvc = new IgniteThreadPoolExecutor(
                 cfg.getIgfsThreadPoolSize(),
@@ -1759,7 +1773,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, 
execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc, affExecSvc, callbackExecSvc,
+                    igfsExecSvc, dataStreamerExecSvc, restExecSvc, affExecSvc, 
callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2377,6 +2391,10 @@ public class IgnitionEx {
 
             p2pExecSvc = null;
 
+            U.shutdownNow(getClass(), dataStreamerExecSvc, log);
+
+            dataStreamerExecSvc = null;
+
             U.shutdownNow(getClass(), igfsExecSvc, log);
 
             igfsExecSvc = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index cd43318..ba41dab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -84,6 +84,7 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
@@ -574,6 +575,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 case UTILITY_CACHE_POOL:
                 case MARSH_CACHE_POOL:
                 case IGFS_POOL:
+                case DATA_STREAMER_POOL:
                 {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 00590ba..71279d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -46,6 +46,9 @@ public class GridIoPolicy {
     /** Marshaller cache execution pool. */
     public static final byte IGFS_POOL = 7;
 
+    /** Data streamer execution pool. */
+    public static final byte DATA_STREAMER_POOL = 8;
+
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index d388584..4bd22c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
@@ -970,7 +969,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) 
{
+    public  <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) 
{
         try {
             return callLocal(c, plc);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 7663735..bd33f62 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -21,10 +21,12 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.DelayQueue;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -35,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.stream.StreamReceiver;
@@ -42,12 +45,23 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
-import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
- *
+ * Data stream processor.
  */
 public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
+    /** Data streamer separate pool feature major version. */
+    private static final int DATA_STREAMER_POOL_MAJOR_VER = 1;
+
+    /** Data streamer separate pool feature minor version. */
+    private static final int DATA_STREAMER_POOL_MINOR_VER = 6;
+
+    /** Data streamer separate pool feature maintenance version. */
+    private static final int DATA_STREAMER_POOL_MAINTENANCE_VER = 10;
+
+    /** Default pool for data streamer messages processing. */
+    public static final byte DFLT_POLICY = GridIoPolicy.PUBLIC_POOL;
+
     /** Loaders map (access is not supposed to be highly concurrent). */
     private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
 
@@ -218,13 +232,15 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
                 IgniteInternalFuture<?> fut = 
ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
 
                 if (fut != null && !fut.isDone()) {
+                    final byte plc = threadIoPolicy();
+
                     fut.listen(new CI1<IgniteInternalFuture<?>>() {
                         @Override public void apply(IgniteInternalFuture<?> t) 
{
                             ctx.closure().runLocalSafe(new Runnable() {
                                 @Override public void run() {
                                     processRequest(nodeId, req);
                                 }
-                            }, false);
+                            }, plc);
                         }
                     });
 
@@ -340,12 +356,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, 
forceLocDep);
 
         try {
-            Byte plc = GridIoManager.currentPolicy();
-
-            if (plc == null)
-                plc = PUBLIC_POOL;
-
-            ctx.io().send(nodeId, resTopic, res, plc);
+            ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))
@@ -355,6 +366,59 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
         }
     }
 
+    /**
+     * Get IO policy.
+     *
+     * @return IO policy.
+     */
+    private static byte threadIoPolicy() {
+        Byte plc = GridIoManager.currentPolicy();
+
+        if (plc == null)
+            plc = DFLT_POLICY;
+
+        return plc;
+    }
+
+    /**
+     * Get IO policy for particular node.
+     *
+     * @param node Node.
+     * @return Policy.
+     */
+    public static byte ioPolicy(ClusterNode node) {
+        assert node != null;
+
+        if (node.isLocal() || node.version().greaterThanEqual(
+            DATA_STREAMER_POOL_MAJOR_VER,
+            DATA_STREAMER_POOL_MINOR_VER,
+            DATA_STREAMER_POOL_MAINTENANCE_VER))
+            return GridIoPolicy.DATA_STREAMER_POOL;
+        else
+            return DFLT_POLICY;
+    }
+
+    /**
+     * Get IO policy for particular node with provided resolver.
+     *
+     * @param rslvr Resolver.
+     * @param node Node.
+     * @return IO policy.
+     */
+    public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> 
rslvr, ClusterNode node) {
+        assert node != null;
+
+        Byte res = null;
+
+        if (rslvr != null)
+            res = rslvr.apply(node);
+
+        if (res == null)
+            res = ioPolicy(node);
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index c2f226c..46f6380 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import javax.cache.expiry.ExpiryPolicy;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -60,7 +59,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -106,16 +104,12 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
-import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
  * Data streamer implementation.
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, 
Delayed {
-    /** Default policy reoslver. */
-    private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new 
DefaultIoPolicyResolver();
-
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new 
IsolatedUpdater();
 
@@ -126,7 +120,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     private byte[] updaterBytes;
 
     /** IO policy resovler for data load request. */
-    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
+    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr;
 
     /** Max remap count before issuing an error. */
     private static final int DFLT_MAX_REMAP_CNT = 32;
@@ -1313,14 +1307,12 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
             IgniteInternalFuture<Object> fut;
 
-            Byte plc = ioPlcRslvr.apply(node);
-
-            if (plc == null)
-                plc = PUBLIC_POOL;
+            byte plc = DataStreamProcessor.ioPolicy(ioPlcRslvr, node);
 
-            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
+            if (isLocNode) {
                 fut = ctx.closure().callLocalSafe(
-                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, keepBinary, rcvr), false);
+                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, keepBinary, rcvr),
+                    plc);
 
                 locFuts.add(fut);
 
@@ -1684,19 +1676,6 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
-     * Default IO policy resolver.
-     */
-    private static class DefaultIoPolicyResolver implements 
IgniteClosure<ClusterNode, Byte> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public Byte apply(ClusterNode gridNode) {
-            return PUBLIC_POOL;
-        }
-    }
-
-    /**
      * Key object wrapper. Using identity equals prevents slow down in case of 
hash code collision.
      */
     private static class KeyCacheObjectWrapper {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 41e805e..5d38e16 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -123,6 +123,9 @@ public class PoolProcessor extends GridProcessorAdapter {
 
                 return ctx.getIgfsExecutorService();
 
+            case GridIoPolicy.DATA_STREAMER_POOL:
+                return ctx.getDataStreamerExecutorService();
+
             default: {
                 if (plc < 0)
                     throw new IgniteCheckedException("Policy cannot be 
negative: " + plc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 9fedc35..401b09c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
@@ -59,6 +61,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.stream.StreamReceiver;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -949,6 +952,100 @@ public class DataStreamProcessorSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testLocalDataStreamerDedicatedThreadPool() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            final IgniteCache<String, String> cache = ignite.cache(null);
+
+            IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(null);
+            try {
+                ldr.receiver(new StreamReceiver<String, String>() {
+                    @Override public void receive(IgniteCache<String, String> 
cache,
+                        Collection<Map.Entry<String, String>> entries) throws 
IgniteException {
+                        String threadName = Thread.currentThread().getName();
+
+                        cache.put("key", threadName);
+                    }
+                });
+                ldr.addData("key", "value");
+
+                ldr.tryFlush();
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return cache.get("key") != null;
+                    }
+                }, 3_000);
+            }
+            finally {
+                ldr.close(true);
+            }
+
+            assertNotNull(cache.get("key"));
+
+            assertTrue(cache.get("key").startsWith("data-streamer"));
+
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteDataStreamerDedicatedThreadPool() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            useCache = false;
+
+            Ignite client = startGrid(0);
+
+            final IgniteCache<String, String> cache = ignite.cache(null);
+
+            IgniteDataStreamer<String, String> ldr = client.dataStreamer(null);
+            try {
+                ldr.receiver(new StreamReceiver<String, String>() {
+                    @Override public void receive(IgniteCache<String, String> 
cache,
+                        Collection<Map.Entry<String, String>> entries) throws 
IgniteException {
+                        String threadName = Thread.currentThread().getName();
+
+                        cache.put("key", threadName);
+                    }
+                });
+
+                ldr.addData("key", "value");
+
+                ldr.tryFlush();
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return cache.get("key") != null;
+                    }
+                }, 3_000);
+            }
+            finally {
+                ldr.close(true);
+            }
+
+            assertNotNull(cache.get("key"));
+
+            assertTrue(cache.get("key").startsWith("data-streamer"));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      *
      */
     public static class TestObject {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f597aff1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index cba67e0..1d72d46 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -62,6 +62,7 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 U.allPluginProviders());
 
         GridTestUtils.setFieldValue(grid(), "cfg", config());
@@ -95,11 +96,6 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridTestKernalContext.class, this, super.toString());
-    }
-
     /**
      * Sets system executor service.
      *
@@ -109,7 +105,6 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
         this.sysExecSvc = sysExecSvc;
     }
 
-
     /**
      * Sets executor service.
      *
@@ -118,4 +113,9 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
     public void setExecutorService(ExecutorService execSvc){
         this.execSvc = execSvc;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridTestKernalContext.class, this, super.toString());
+    }
 }

Reply via email to