Repository: ignite
Updated Branches:
  refs/heads/master 0e0c2c7dc -> aa81dd14a


IGNITE-5658 Optimizations for data streamer


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa81dd14
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa81dd14
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa81dd14

Branch: refs/heads/master
Commit: aa81dd14a7d0e21d448acf2bbbb3a8a3d4eca0f8
Parents: 0e0c2c7
Author: Igor Seliverstov <gvvinbl...@gmail.com>
Authored: Wed Aug 9 19:29:39 2017 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Wed Aug 9 19:29:39 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteDataStreamer.java   |  26 +-
 .../apache/ignite/IgniteSystemProperties.java   |   8 +
 .../ignite/internal/GridKernalContext.java      |   2 +-
 .../ignite/internal/GridKernalContextImpl.java  |   6 +-
 .../apache/ignite/internal/IgniteKernal.java    |   5 +-
 .../ignite/internal/IgniteNodeAttributes.java   |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  23 +-
 .../ignite/internal/jdbc2/JdbcConnection.java   |   5 +-
 .../managers/communication/GridIoManager.java   |   6 +
 .../managers/communication/GridIoMessage.java   |   3 +
 .../cache/persistence/freelist/PagesList.java   |  67 +++--
 .../wal/reader/StandaloneGridKernalContext.java |   2 +-
 .../datastreamer/DataStreamerImpl.java          | 281 ++++++++++++-------
 .../datastreamer/DataStreamerRequest.java       |  59 +++-
 .../ignite/internal/util/StripedExecutor.java   |  89 +++++-
 .../org/apache/ignite/thread/IgniteThread.java  |  10 +
 .../DataStreamProcessorSelfTest.java            |  14 +-
 .../datastreamer/DataStreamerImplSelfTest.java  |   3 +-
 18 files changed, 427 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index e2473dc..b1f5851 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -20,6 +20,7 @@ package org.apache.ignite;
 import java.util.Collection;
 import java.util.Map;
 import javax.cache.CacheException;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.stream.StreamReceiver;
@@ -72,7 +73,7 @@ import org.jetbrains.annotations.Nullable;
  *      this setting limits maximum allowed number of parallel buffered stream 
messages that
  *      are being processed on remote nodes. If this number is exceeded, then
  *      {@link #addData(Object, Object)} method will block to control memory 
utilization.
- *      Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ *      Default is equal to data streamer pool size on remote node (CPU count if unknown) multiplied by {@link 
#DFLT_PARALLEL_OPS_MULTIPLIER}.
  *  </li>
  *  <li>
  *      {@link #autoFlushFrequency(long)} - automatic flush frequency in 
milliseconds. Essentially,
@@ -100,11 +101,23 @@ import org.jetbrains.annotations.Nullable;
  * </ul>
  */
 public interface IgniteDataStreamer<K, V> extends AutoCloseable {
-    /** Default max concurrent put operations count. */
+    /**
+     * Default max concurrent put operations count.
+     * @deprecated Not used anymore; the default is now derived from {@link #DFLT_PARALLEL_OPS_MULTIPLIER}.
+     */
+    @Deprecated
     public static final int DFLT_MAX_PARALLEL_OPS = 16;
 
-    /** Default per node buffer size. */
-    public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+    /**
+     * Default multiplier for data streamer pool size to get concurrent 
batches count for each remote node.
+     *
+     * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
+     * @see #perNodeParallelOperations()
+     */
+    public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 8;
+
+    /** Default size of the operations batch to be sent to a remote node for loading. */
+    public static final int DFLT_PER_NODE_BUFFER_SIZE = 512;
 
     /** Default timeout for streamer's operations. */
     public static final long DFLT_UNLIMIT_TIMEOUT = -1;
@@ -203,9 +216,11 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
      * <p>
      * This method should be called prior to {@link #addData(Object, Object)} 
call.
      * <p>
-     * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+     * If not provided, default value is calculated as follows
+     * {@link #DFLT_PARALLEL_OPS_MULTIPLIER} * {@code 
DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE}.
      *
      * @param parallelOps Maximum number of parallel stream operations for a 
single node.
+     * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
      */
     public void perNodeParallelOperations(int parallelOps);
 
@@ -450,5 +465,4 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
      */
     @Override public void close() throws CacheException, 
IgniteInterruptedException,
         IgniteDataStreamerTimeoutException;
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 2fa52b6..8af66c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -703,6 +703,14 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = 
"IGNITE_ENABLE_FORCIBLE_NODE_KILL";
 
     /**
+     * Task stealing will be started if the task queue size per data-streamer 
thread exceeds this threshold.
+     * <p>
+     * Default value is {@code 4}.
+     */
+    public static final String 
IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD =
+            "IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ef4001f..48a1e3e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -532,7 +532,7 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
      *
      * @return Thread pool implementation to be used for data stream messages.
      */
-    public ExecutorService getDataStreamerExecutorService();
+    public StripedExecutor getDataStreamerExecutorService();
 
     /**
      * Should return an instance of fully configured thread pool to be used for

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 42a9b2c..89ead1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -317,7 +317,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
-    private ExecutorService dataStreamExecSvc;
+    private StripedExecutor dataStreamExecSvc;
 
     /** */
     @GridToStringExclude
@@ -423,7 +423,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
-        ExecutorService dataStreamExecSvc,
+        StripedExecutor dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
@@ -976,7 +976,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public ExecutorService getDataStreamerExecutorService() {
+    @Override public StripedExecutor getDataStreamerExecutorService() {
         return dataStreamExecSvc;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 756f2ab..f5111c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -213,6 +213,7 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -718,7 +719,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
-        ExecutorService dataStreamExecSvc,
+        StripedExecutor dataStreamExecSvc,
         ExecutorService restExecSvc,
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
@@ -1427,6 +1428,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
      */
     @SuppressWarnings({"SuspiciousMethodCalls", "unchecked", 
"TypeMayBeWeakened"})
     private void fillNodeAttributes(boolean notifyEnabled) throws 
IgniteCheckedException {
+        ctx.addNodeAttribute(ATTR_DATA_STREAMER_POOL_SIZE, 
configuration().getDataStreamerThreadPoolSize());
+
         final String[] incProps = cfg.getIncludeProperties();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index a45f991..fe23f92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -177,6 +177,9 @@ public final class IgniteNodeAttributes {
     /** Ignite security compatibility mode. */
     public static final String ATTR_SECURITY_COMPATIBILITY_MODE = ATTR_PREFIX 
+ ".security.compatibility.enabled";
 
+    /** Data streamer thread pool size configured on the node. */
+    public static final String ATTR_DATA_STREAMER_POOL_SIZE = ATTR_PREFIX + 
".data.streamer.pool.size";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 23baeb3..07a5c43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1508,7 +1508,7 @@ public class IgnitionEx {
         private ThreadPoolExecutor igfsExecSvc;
 
         /** Data streamer executor service. */
-        private ThreadPoolExecutor dataStreamerExecSvc;
+        private StripedExecutor dataStreamerExecSvc;
 
         /** REST requests executor service. */
         private ThreadPoolExecutor restExecSvc;
@@ -1728,7 +1728,11 @@ public class IgnitionEx {
 
             validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
 
-            stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), 
cfg.getIgniteInstanceName(), "sys", log);
+            stripedExecSvc = new StripedExecutor(
+                cfg.getStripedPoolSize(),
+                cfg.getIgniteInstanceName(),
+                "sys",
+                log);
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1763,17 +1767,12 @@ public class IgnitionEx {
 
             p2pExecSvc.allowCoreThreadTimeOut(true);
 
-            // Note that we do not pre-start threads here as this pool may not 
be needed.
-            dataStreamerExecSvc = new IgniteThreadPoolExecutor(
-                "data-streamer",
-                cfg.getIgniteInstanceName(),
-                cfg.getDataStreamerThreadPoolSize(),
+            dataStreamerExecSvc = new StripedExecutor(
                 cfg.getDataStreamerThreadPoolSize(),
-                DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.DATA_STREAMER_POOL);
-
-            dataStreamerExecSvc.allowCoreThreadTimeOut(true);
+                cfg.getIgniteInstanceName(),
+                "data-streamer",
+                log,
+                true);
 
             // Note that we do not pre-start threads here as igfs pool may not 
be needed.
             validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS");

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 1bf51f2..6fb723e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -185,8 +185,9 @@ public class JdbcConnection implements Connection {
         streamFlushTimeout = 
Long.parseLong(props.getProperty(PROP_STREAMING_FLUSH_FREQ, "0"));
         streamNodeBufSize = 
Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE,
             String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE)));
-        streamNodeParOps = 
Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS,
-            String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS)));
+        // If value is zero, server data-streamer pool size multiplied
+        // by IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER will be used
+        streamNodeParOps = 
Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, "0"));
 
         String nodeIdProp = props.getProperty(PROP_NODE_ID);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bb36b26..2005032 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1125,6 +1125,12 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             return;
         }
 
+        if (plc == GridIoPolicy.DATA_STREAMER_POOL && msg.partition() != 
GridIoMessage.STRIPE_DISABLED_PART) {
+            ctx.getDataStreamerExecutorService().execute(msg.partition(), c);
+
+            return;
+        }
+
         if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
             IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index dccd336..fe61aec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.ExecutorAwareMessage;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -336,6 +337,8 @@ public class GridIoMessage implements Message {
     public int partition() {
         if (msg instanceof GridCacheMessage)
             return ((GridCacheMessage)msg).partition();
+        if (msg instanceof DataStreamerRequest)
+            return ((DataStreamerRequest)msg).partition();
         else
             return STRIPE_DISABLED_PART;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
index 39a6865..6c355f6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import static java.lang.Boolean.FALSE;
@@ -66,7 +67,7 @@ public abstract class PagesList extends DataStructure {
     /** */
     private static final int MAX_STRIPES_PER_BUCKET =
         
IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_STRIPES_PER_BUCKET",
-            Math.min(8, Runtime.getRuntime().availableProcessors() * 2));
+            Math.max(8, Runtime.getRuntime().availableProcessors()));
 
     /** */
     protected final AtomicLong[] bucketsSize;
@@ -507,6 +508,21 @@ public abstract class PagesList extends DataStructure {
      * @throws IgniteCheckedException If failed.
      */
     private Stripe getPageForPut(int bucket) throws IgniteCheckedException {
+        // Striped pool optimization.
+        int stripeIdx; IgniteThread igniteThread = IgniteThread.current();
+
+        if (igniteThread != null && (stripeIdx = igniteThread.stripe()) != -1) 
{
+            Stripe[] tails = getBucket(bucket);
+
+            while (tails == null || stripeIdx >= tails.length) {
+                addStripe(bucket, true);
+
+                tails = getBucket(bucket);
+            }
+
+            return tails[stripeIdx];
+        }
+
         Stripe[] tails = getBucket(bucket);
 
         if (tails == null)
@@ -607,12 +623,8 @@ public abstract class PagesList extends DataStructure {
             try {
                 long tailAddr = writeLockPage(tailId, tailPage, bucket, 
lockAttempt++); // Explicit check.
 
-                if (tailAddr == 0L) {
-                    if (isReuseBucket(bucket) && lockAttempt == 
TRY_LOCK_ATTEMPTS)
-                        addStripeForReuseBucket(bucket);
-
+                if (tailAddr == 0L)
                     continue;
-                }
 
                 assert PageIO.getPageId(tailAddr) == tailId : "pageId = " + 
PageIO.getPageId(tailAddr) + ", tailId = " + tailId;
                 assert PageIO.getType(tailAddr) == PageIO.T_PAGE_LIST_NODE;
@@ -912,13 +924,26 @@ public abstract class PagesList extends DataStructure {
      * @param bucket Bucket index.
      * @return Page for take.
      */
-    private Stripe getPageForTake(int bucket) {
+    private Stripe getPageForTake(int bucket) throws IgniteCheckedException {
         Stripe[] tails = getBucket(bucket);
 
         if (tails == null || bucketsSize[bucket].get() == 0)
             return null;
 
         int len = tails.length;
+
+        // Striped pool optimization.
+        int stripeIdx; IgniteThread igniteThread = IgniteThread.current();
+
+        if (igniteThread != null && (stripeIdx = igniteThread.stripe()) != -1) 
{
+            if (stripeIdx >= len)
+                return null;
+
+            Stripe stripe = tails[stripeIdx];
+
+            return stripe.empty ? null : stripe;
+        }
+
         int init = randomInt(len);
         int cur = init;
 
@@ -943,6 +968,12 @@ public abstract class PagesList extends DataStructure {
      */
     private long writeLockPage(long pageId, long page, int bucket, int 
lockAttempt)
         throws IgniteCheckedException {
+        // Striped pool optimization.
+        IgniteThread igniteThread = IgniteThread.current();
+
+        if (igniteThread != null && igniteThread.stripe() != -1)
+            return writeLock(pageId, page);
+
         long pageAddr = tryWriteLock(pageId, page);
 
         if (pageAddr != 0L)
@@ -952,8 +983,7 @@ public abstract class PagesList extends DataStructure {
             Stripe[] stripes = getBucket(bucket);
 
             if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) {
-                if (!isReuseBucket(bucket))
-                    addStripe(bucket, true);
+                addStripe(bucket, !isReuseBucket(bucket));
 
                 return 0L;
             }
@@ -963,19 +993,6 @@ public abstract class PagesList extends DataStructure {
     }
 
     /**
-     * @param bucket Bucket.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void addStripeForReuseBucket(int bucket) throws 
IgniteCheckedException {
-        assert isReuseBucket(bucket);
-
-        Stripe[] stripes = getBucket(bucket);
-
-        if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET)
-            addStripe(bucket, false);
-    }
-
-    /**
      * @param bucket Bucket index.
      * @param initIoVers Optional IO to initialize page.
      * @return Removed page ID.
@@ -994,12 +1011,8 @@ public abstract class PagesList extends DataStructure {
             try {
                 long tailAddr = writeLockPage(tailId, tailPage, bucket, 
lockAttempt++); // Explicit check.
 
-                if (tailAddr == 0L) {
-                    if (isReuseBucket(bucket) && lockAttempt == 
TRY_LOCK_ATTEMPTS)
-                        addStripeForReuseBucket(bucket);
-
+                if (tailAddr == 0L)
                     continue;
-                }
 
                 if (stripe.empty) {
                     // Another thread took the last page.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 02b9352..8752eaa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -421,7 +421,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     }
 
     /** {@inheritDoc} */
-    @Override public ExecutorService getDataStreamerExecutorService() {
+    @Override public StripedExecutor getDataStreamerExecutorService() {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index df51fac..eaced66 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.datastreamer;
 
+import java.lang.reflect.Array;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -60,8 +61,10 @@ import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -152,7 +155,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
 
     /** */
-    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+    private int parallelOps;
 
     /** */
     private long timeout = DFLT_UNLIMIT_TIMEOUT;
@@ -791,6 +794,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                             initPda = false;
                         }
 
+                        if (key.partition() == -1)
+                            key.partition(cctx.affinity().partition(key, 
false));
+
                         nodes = nodes(key, topVer, cctx);
                     }
                     catch (IgniteCheckedException e) {
@@ -932,10 +938,10 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                         }
                     };
 
-                    final GridFutureAdapter<?> f;
+                    final List<GridFutureAdapter<?>> futs;
 
                     try {
-                        f = buf.update(entriesForNode, topVer, lsnr, remap);
+                        futs = buf.update(entriesForNode, topVer, lsnr, remap);
                     }
                     catch (IgniteInterruptedCheckedException e1) {
                         resFut.onDone(e1);
@@ -951,9 +957,13 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                                 @Override public void run() {
                                     buf0.onNodeLeft();
 
-                                    if (f != null)
-                                        f.onDone(new 
ClusterTopologyCheckedException("Failed to wait for request completion " +
-                                            "(node has left): " + nodeId));
+                                    if (futs != null) {
+                                        Throwable ex = new 
ClusterTopologyCheckedException(
+                                                "Failed to wait for request 
completion (node has left): " + nodeId);
+
+                                        for (int i = 0; i < futs.size(); i++)
+                                            futs.get(i).onDone(ex);
+                                    }
                                 }
                             }, ctx.discovery().topologyVersion(), false);
                         }
@@ -1311,11 +1321,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         private final Collection<IgniteInternalFuture<Object>> locFuts;
 
         /** Buffered entries. */
-        private List<DataStreamerEntry> entries;
-
-        /** */
-        @GridToStringExclude
-        private GridFutureAdapter<Object> curFut;
+        private final PerStripeBuffer[] stripes;
 
         /** Local node flag. */
         private final boolean isLocNode;
@@ -1329,16 +1335,17 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         /** */
         private final Semaphore sem;
 
-        /** Batch topology. */
-        private AffinityTopologyVersion batchTopVer;
+        /** Effective parallel operations limit for this node (explicit setting or pool size * multiplier). */
+        private final int perNodeParallelOps;
 
         /** Closure to signal on task finish. */
         @GridToStringExclude
-        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = 
new IgniteInClosure<IgniteInternalFuture<Object>>() {
-            @Override public void apply(IgniteInternalFuture<Object> t) {
-                signalTaskFinished(t);
-            }
-        };
+        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC =
+            new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                @Override public void apply(IgniteInternalFuture<Object> t) {
+                    signalTaskFinished(t);
+                }
+            };
 
         /**
          * @param node Node.
@@ -1354,24 +1361,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             // Cache local node flag.
             isLocNode = node.equals(ctx.discovery().localNode());
 
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>();
-            curFut.listen(signalC);
+            Integer attrStreamerPoolSize = node.attribute(IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE);
 
-            sem = new Semaphore(parallelOps);
-        }
+            int streamerPoolSize = attrStreamerPoolSize != null ? attrStreamerPoolSize : node.metrics().getTotalCpus();
 
-        /**
-         * @param remap Remapping flag.
-         */
-        private void renewBatch(boolean remap) {
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>();
+            perNodeParallelOps = parallelOps != 0 ? parallelOps :
+                streamerPoolSize * IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER;
 
-            batchTopVer = null;
+            sem = new Semaphore(perNodeParallelOps);
 
-            if (!remap)
-                curFut.listen(signalC);
+            stripes = (PerStripeBuffer[])Array.newInstance(PerStripeBuffer.class, streamerPoolSize);
+
+            for (int i = 0; i < stripes.length; i++)
+                stripes[i] = new PerStripeBuffer(i, signalC);
         }
 
         /**
@@ -1382,61 +1384,69 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @return Future for operation.
          * @throws IgniteInterruptedCheckedException If failed.
          */
-        @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+        @Nullable List<GridFutureAdapter<?>> update(
+            Iterable<DataStreamerEntry> newEntries,
             AffinityTopologyVersion topVer,
             IgniteInClosure<IgniteInternalFuture<?>> lsnr,
-            boolean remap) throws IgniteInterruptedCheckedException {
-            List<DataStreamerEntry> entries0 = null;
+            boolean remap
+        ) throws IgniteInterruptedCheckedException {
+            List<GridFutureAdapter<?>> res = null;
 
-            GridFutureAdapter<Object> curFut0;
+            for (DataStreamerEntry entry : newEntries) {
+                List<DataStreamerEntry> entries0 = null;
+                AffinityTopologyVersion curBatchTopVer;
+
+                // Init buffer.
+                int part = entry.getKey().partition();
 
-            AffinityTopologyVersion curBatchTopVer;
+                GridFutureAdapter<Object> curFut0;
+                PerStripeBuffer b = stripes[part % stripes.length];
 
-            synchronized (this) {
-                curFut0 = curFut;
+                synchronized (b) {
+                    curFut0 = b.curFut;
 
-                curFut0.listen(lsnr);
+                    // Listener should be added only once per whole entries collection.
+                    // Should we simplify the model and get rid of all futures?
+                    curFut0.listen(lsnr);
 
-                if (batchTopVer == null)
-                    batchTopVer = topVer;
+                    if (b.batchTopVer == null)
+                        b.batchTopVer = topVer;
 
-                curBatchTopVer = batchTopVer;
+                    curBatchTopVer = b.batchTopVer;
 
-                for (DataStreamerEntry entry : newEntries)
-                    entries.add(entry);
+                    b.entries.add(entry);
 
-                if (entries.size() >= bufSize) {
-                    entries0 = entries;
+                    if (b.entries.size() >= bufSize) {
+                        entries0 = b.entries;
 
-                    renewBatch(remap);
+                        b.renewBatch(remap);
+                    }
                 }
-            }
 
-            if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
-                renewBatch(remap);
+                if (res == null)
+                    res = new ArrayList<>();
 
-                curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
-                    "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
-            }
-            else if (entries0 != null) {
-                submit(entries0, curBatchTopVer, curFut0, remap);
-
-                if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
-                        DataStreamerImpl.this));
-                else if (ctx.clientDisconnected())
-                    curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Client node disconnected."));
-            }
+                res.add(curFut0);
 
-            return curFut0;
-        }
+                if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+                    b.renewBatch(remap);
 
-        /**
-         * @return Fresh collection with some space for outgrowth.
-         */
-        private List<DataStreamerEntry> newEntries() {
-            return new ArrayList<>((int)(bufSize * 1.2));
+                    curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
+                        "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
+                }
+                else if (entries0 != null) {
+                    submit(entries0, curBatchTopVer, curFut0, remap, b.partId);
+
+                    if (cancelled)
+                        curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                            DataStreamerImpl.this));
+                    else if (ctx.clientDisconnected())
+                        curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                            "Client node disconnected."));
+                }
+            }
+
+            return res;
         }
 
         /**
@@ -1444,24 +1454,26 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
-            List<DataStreamerEntry> entries0 = null;
-            GridFutureAdapter<Object> curFut0 = null;
-
             acquireRemapSemaphore();
 
-            synchronized (this) {
-                if (!entries.isEmpty()) {
-                    entries0 = entries;
-                    curFut0 = curFut;
+            for (PerStripeBuffer b : stripes) {
+                AffinityTopologyVersion batchTopVer = null;
+                List<DataStreamerEntry> entries0 = null;
+                GridFutureAdapter<Object> curFut0 = null;
+
+                synchronized (b) {
+                    if (!b.entries.isEmpty()) {
+                        entries0 = b.entries;
+                        curFut0 = b.curFut;
+                        batchTopVer = b.batchTopVer;
 
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>();
-                    curFut.listen(signalC);
+                        b.renewBatch(false);
+                    }
                 }
-            }
 
-            if (entries0 != null)
-                submit(entries0, batchTopVer, curFut0, false);
+                if (entries0 != null)
+                    submit(entries0, batchTopVer, curFut0, false, b.partId);
+            }
 
             // Create compound future for this flush.
             GridCompoundFuture<Object, Object> res = null;
@@ -1601,13 +1613,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @param topVer Topology version.
          * @param curFut Current future.
          * @param remap Remapping flag.
+         * @param partId Partition ID.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void submit(final Collection<DataStreamerEntry> entries,
+        private void submit(
+            final Collection<DataStreamerEntry> entries,
             @Nullable AffinityTopologyVersion topVer,
             final GridFutureAdapter<Object> curFut,
-            boolean remap)
-            throws IgniteInterruptedCheckedException {
+            boolean remap,
+            int partId
+        ) throws IgniteInterruptedCheckedException {
             assert entries != null;
             assert !entries.isEmpty();
             assert curFut != null;
@@ -1668,7 +1683,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                             cache.context().deploy().onEnter();
                     }
                     catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+                        U.error(log, "Failed to deploy class (request will not be sent): " +
+                            jobPda0.deployClass(), e);
 
                         return;
                     }
@@ -1701,7 +1717,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
-                    topVer);
+                    topVer,
+                    (rcvr == ISOLATED_UPDATER) ?
+                        partId : GridIoMessage.STRIPE_DISABLED_PART);
 
                 try {
                     ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc);
@@ -1750,11 +1768,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             // Make sure to complete current future.
             GridFutureAdapter<Object> curFut0;
 
-            synchronized (this) {
-                curFut0 = curFut;
-            }
+            for (PerStripeBuffer b : stripes) {
+                synchronized (b) {
+                    curFut0 = b.curFut;
+                }
 
-            curFut0.onDone(e);
+                curFut0.onDone(e);
+            }
         }
 
         /**
@@ -1828,10 +1848,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            int size;
+            int size = 0;
 
-            synchronized (this) {
-                size = entries.size();
+            for (int i = 0; i < stripes.length; i++) {
+                PerStripeBuffer b = stripes[i];
+
+                synchronized (b) {
+                    size += b.entries.size();
+                }
             }
 
             return S.toString(Buffer.class, this,
@@ -1920,8 +1944,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache,
-            Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+        @Override public void receive(
+            IgniteCache<KeyCacheObject, CacheObject> cache,
+            Collection<Map.Entry<KeyCacheObject, CacheObject>> entries
+        ) {
             IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
 
             GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();
@@ -2080,4 +2106,63 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             return S.toString(KeyCacheObjectWrapper.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private class PerStripeBuffer {
+        /** */
+        private final int partId;
+
+        /** */
+        private List<DataStreamerEntry> entries;
+
+        /** */
+        private GridFutureAdapter<Object> curFut;
+
+        /** Batch topology. */
+        private AffinityTopologyVersion batchTopVer;
+
+        /** */
+        private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC;
+
+        /**
+         * @param partId Partition ID.
+         * @param c Signal closure.
+         */
+        public PerStripeBuffer(
+            int partId,
+            IgniteInClosure<? super IgniteInternalFuture<Object>> c
+        ) {
+            this.partId = partId;
+            signalC = c;
+
+            renewBatch(false);
+        }
+
+        /**
+         * @param remap Remap.
+         */
+        synchronized void renewBatch(boolean remap) {
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>();
+
+            batchTopVer = null;
+
+            if (!remap)
+                curFut.listen(signalC);
+        }
+
+        /**
+         * @return Fresh collection with some space for outgrowth.
+         */
+        private List<DataStreamerEntry> newEntries() {
+            return new ArrayList<>((int)(bufSize * 1.2));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PerStripeBuffer.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index b4cbf66..f70ee9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -90,6 +90,9 @@ public class DataStreamerRequest implements Message {
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    private int partId;
+
     /**
      * {@code Externalizable} support.
      */
@@ -113,8 +116,10 @@ public class DataStreamerRequest implements Message {
      * @param clsLdrId Class loader ID.
      * @param forceLocDep Force local deployment.
      * @param topVer Topology version.
+     * @param partId Partition ID.
      */
-    public DataStreamerRequest(long reqId,
+    public DataStreamerRequest(
+        long reqId,
         byte[] resTopicBytes,
         @Nullable String cacheName,
         byte[] updaterBytes,
@@ -128,7 +133,9 @@ public class DataStreamerRequest implements Message {
         Map<UUID, IgniteUuid> ldrParticipants,
         IgniteUuid clsLdrId,
         boolean forceLocDep,
-        @NotNull AffinityTopologyVersion topVer) {
+        @NotNull AffinityTopologyVersion topVer,
+        int partId
+    ) {
         assert topVer != null;
 
         this.reqId = reqId;
@@ -146,6 +153,7 @@ public class DataStreamerRequest implements Message {
         this.clsLdrId = clsLdrId;
         this.forceLocDep = forceLocDep;
         this.topVer = topVer;
+        this.partId = partId;
     }
 
     /**
@@ -253,6 +261,13 @@ public class DataStreamerRequest implements Message {
         return topVer;
     }
 
+    /**
+     * @return Partition ID.
+     */
+    public int partition() {
+        return partId;
+    }
+
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
@@ -324,42 +339,48 @@ public class DataStreamerRequest implements Message {
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeLong("reqId", reqId))
+                if (!writer.writeInt("partId", partId))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+                if (!writer.writeLong("reqId", reqId))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeString("sampleClsName", sampleClsName))
+                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeString("sampleClsName", sampleClsName))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 14:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
                 if (!writer.writeString("userVer", userVer))
                     return false;
 
@@ -447,7 +468,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 8:
-                reqId = reader.readLong("reqId");
+                partId = reader.readInt("partId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -455,7 +476,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 9:
-                resTopicBytes = reader.readByteArray("resTopicBytes");
+                reqId = reader.readLong("reqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -463,7 +484,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 10:
-                sampleClsName = reader.readString("sampleClsName");
+                resTopicBytes = reader.readByteArray("resTopicBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -471,7 +492,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 11:
-                skipStore = reader.readBoolean("skipStore");
+                sampleClsName = reader.readString("sampleClsName");
 
                 if (!reader.isLastRead())
                     return false;
@@ -479,7 +500,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 12:
-                topVer = reader.readMessage("topVer");
+                skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
                     return false;
@@ -487,7 +508,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 13:
-                updaterBytes = reader.readByteArray("updaterBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -495,6 +516,14 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 14:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 userVer = reader.readString("userVer");
 
                 if (!reader.isLastRead())
@@ -514,6 +543,6 @@ public class DataStreamerRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6d5dc71..630d34c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.util;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -36,6 +38,8 @@ import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -62,6 +66,17 @@ public class StripedExecutor implements ExecutorService {
      * @param log Logger.
      */
     public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log) {
+        this(cnt, igniteInstanceName, poolName, log, false);
+    }
+
+    /**
+     * @param cnt Count.
+     * @param igniteInstanceName Node name.
+     * @param poolName Pool name.
+     * @param log Logger.
+     * @param stealTasks {@code True} to steal tasks.
+     */
+    public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) {
         A.ensure(cnt > 0, "cnt > 0");
 
         boolean success = false;
@@ -76,14 +91,19 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = new StripeConcurrentQueue(
+                stripes[i] = stealTasks ? new StripeConcurrentQueue(
                     igniteInstanceName,
                     poolName,
                     i,
-                    log);
+                    log, stripes) : new StripeConcurrentQueue(
+                        igniteInstanceName,
+                        poolName,
+                        i,
+                        log);
+            }
 
+            for (int i = 0; i < cnt; i++)
                 stripes[i].start();
-            }
 
             success = true;
         }
@@ -397,7 +417,7 @@ public class StripedExecutor implements ExecutorService {
         private final String poolName;
 
         /** */
-        private final int idx;
+        protected final int idx;
 
         /** */
         private final IgniteLogger log;
@@ -536,8 +556,17 @@ public class StripedExecutor implements ExecutorService {
      * Stripe.
      */
     private static class StripeConcurrentQueue extends Stripe {
+        /** */
+        private static final int IGNITE_TASKS_STEALING_THRESHOLD =
+            IgniteSystemProperties.getInteger(
+                IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4);
+
         /** Queue. */
-        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+        private final Queue<Runnable> queue;
+
+        /** */
+        @GridToStringExclude
+        private final Stripe[] others;
 
         /** */
         private volatile boolean parked;
@@ -548,16 +577,37 @@ public class StripedExecutor implements ExecutorService {
          * @param idx Stripe index.
          * @param log Logger.
          */
-        public StripeConcurrentQueue(
+        StripeConcurrentQueue(
             String igniteInstanceName,
             String poolName,
             int idx,
             IgniteLogger log
         ) {
-            super(igniteInstanceName,
+            this(igniteInstanceName, poolName, idx, log, null);
+        }
+
+        /**
+         * @param igniteInstanceName Ignite instance name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        StripeConcurrentQueue(
+            String igniteInstanceName,
+            String poolName,
+            int idx,
+            IgniteLogger log,
+            Stripe[] others
+        ) {
+            super(
+                igniteInstanceName,
                 poolName,
                 idx,
                 log);
+
+            this.others = others;
+
+            this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>();
         }
 
         /** {@inheritDoc} */
@@ -580,6 +630,24 @@ public class StripedExecutor implements ExecutorService {
                     if (r != null)
                         return r;
 
+                    if(others != null) {
+                        int len = others.length;
+                        int init = ThreadLocalRandom.current().nextInt(len);
+                        int cur = init;
+
+                        while (true) {
+                            if(cur != idx) {
+                                Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue;
+
+                                if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null)
+                                    return r;
+                            }
+
+                            if ((cur = (cur + 1) % len) == init)
+                                break;
+                        }
+                    }
+
                     LockSupport.park();
 
                     if (Thread.interrupted())
@@ -597,6 +665,13 @@ public class StripedExecutor implements ExecutorService {
 
             if (parked)
                 LockSupport.unpark(thread);
+
+            if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) {
+                for (Stripe other : others) {
+                    if(((StripeConcurrentQueue)other).parked)
+                        LockSupport.unpark(other.thread);
+                }
+            }
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index c814625..14c3d4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -149,6 +149,16 @@ public class IgniteThread extends Thread {
     }
 
     /**
+     * @return IgniteThread or {@code null} if current thread is not an instance of IgniteThread.
+     */
+    public static IgniteThread current(){
+        Thread thread = Thread.currentThread();
+
+        return thread.getClass() == IgniteThread.class || thread instanceof IgniteThread ?
+            ((IgniteThread)thread) : null;
+    }
+
+    /**
      * Creates new thread name.
      *
      * @param num Thread number.

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ec5e6d0..ac89021 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -962,8 +962,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-            IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
-            try {
+            try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
                 ldr.receiver(new StreamReceiver<String, String>() {
                     @Override public void receive(IgniteCache<String, String> cache,
                         Collection<Map.Entry<String, String>> entries) throws IgniteException {
@@ -972,6 +971,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                         cache.put("key", threadName);
                     }
                 });
+
                 ldr.addData("key", "value");
 
                 ldr.tryFlush();
@@ -982,9 +982,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                     }
                 }, 3_000);
             }
-            finally {
-                ldr.close(true);
-            }
 
             assertNotNull(cache.get("key"));
 
@@ -1011,9 +1008,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-            IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME);
-
-            try {
+            try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) {
                 ldr.receiver(new StringStringStreamReceiver());
 
                 ldr.addData("key", "value");
@@ -1026,9 +1021,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                     }
                 }, 3_000);
             }
-            finally {
-                ldr.close(true);
-            }
 
             assertNotNull(cache.get("key"));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa81dd14/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index e72a9b4..6d3466b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -346,7 +346,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
                             req.participants(),
                             req.classLoaderId(),
                             req.forceLocalDeployment(),
-                            staleTop);
+                            staleTop,
+                            -1);
 
                         msg = new GridIoMessage(
                             GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"),

Reply via email to