Repository: ignite
Updated Branches:
  refs/heads/master 2b5f78f20 -> 45698810a


IGNITE-6699: Optimize client-side data streamer performance. - Fixes #3442.

Signed-off-by: Nikolay Izhikov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/45698810
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/45698810
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/45698810

Branch: refs/heads/master
Commit: 45698810a996e839bc288bd64870f7f1659682b3
Parents: 2b5f78f
Author: Fedotov <[email protected]>
Authored: Thu May 3 12:30:49 2018 +0300
Committer: Nikolay Izhikov <[email protected]>
Committed: Thu May 3 12:32:55 2018 +0300

----------------------------------------------------------------------
 .../streamer/JmhStreamerAddDataBenchmark.java   | 221 +++++++++++++++++
 .../org/apache/ignite/IgniteDataStreamer.java   |  17 ++
 .../datastreamer/DataStreamerImpl.java          | 237 ++++++++++++-------
 .../DataStreamProcessorSelfTest.java            |   1 +
 .../datastreamer/DataStreamerImplSelfTest.java  |   6 +
 .../datastreamer/DataStreamerTimeoutTest.java   |   1 +
 .../ApiParity/StreamerParityTest.cs             |   3 +-
 7 files changed, 400 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java
----------------------------------------------------------------------
diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java
new file mode 100644
index 0000000..f9c42b1
--- /dev/null
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/streamer/JmhStreamerAddDataBenchmark.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.streamer;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.logger.NullLogger;
+import org.jetbrains.annotations.NotNull;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * DataStreamerImpl.addData(Collection) vs DataStreamerImpl.addData(Key, 
Value).
+ */
+@BenchmarkMode(Mode.AverageTime)
+@Fork(value = 1, jvmArgsAppend = {"-Xms1g", "-Xmx3g", "-server", 
"-XX:+AggressiveOpts", "-XX:MaxMetaspaceSize=256m"})
+@Measurement(iterations = 11)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Threads(1)
+@Warmup(iterations = 21)
+public class JmhStreamerAddDataBenchmark {
+    /** Data amount. */
+    private static final int DATA_AMOUNT = 512;
+
+    /** Ignite client instance name. */
+    private static final String IGNITE_CLIENT_INSTANCE_NAME = "client";
+
+    /** Ignite client cache name. */
+    private static final String IGNITE_CLIENT_CACHE_NAME = "clientCache";
+
+    /**
+     * Create Ignite configuration.
+     *
+     * @return Ignite configuration.
+     */
+    private IgniteConfiguration getConfiguration(String cfgName, boolean 
isClient) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridLogger(new NullLogger());
+
+        cfg.setIgniteInstanceName(cfgName);
+
+        if (isClient) {
+            cfg.setClientMode(true);
+
+            
cfg.setCacheConfiguration(defaultCacheConfiguration(IGNITE_CLIENT_CACHE_NAME));
+        }
+        else
+            cfg.setCacheConfiguration(defaultCacheConfiguration("server"));
+
+        return cfg;
+    }
+
+    /**
+     * Create cache configuration.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration defaultCacheConfiguration(String cacheName) {
+        CacheConfiguration cfg = new CacheConfiguration(cacheName);
+
+        cfg.setWriteSynchronizationMode(writeSynchronizationMode());
+
+        return cfg;
+    }
+
+    /**
+     * Stop all grids after tests.
+     */
+    @TearDown(Level.Trial)
+    public void tearDown() {
+        Ignition.stopAll(true);
+    }
+
+    /**
+     * Start server nodes (count given by {@link #gridCnt()}) and 1 client.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        int cnt = gridCnt();
+
+        for (int i = 0; i < cnt; i++)
+            Ignition.start(getConfiguration("srv" + i, false));
+
+        Ignition.start(getConfiguration(IGNITE_CLIENT_INSTANCE_NAME, true));
+    }
+
+    /**
+     * Prepare and clean collection with streaming data.
+     */
+    @State(Scope.Thread)
+    public static class StreamingData {
+        /**
+         * Collection that will be streamed from client.
+         */
+        Collection<AbstractMap.SimpleEntry<Integer, Integer>> streamingCol = 
new ArrayList<>(DATA_AMOUNT);
+
+        /**
+         * Prepare collection.
+         */
+        @Setup(Level.Iteration)
+        public void prepareCollection() {
+            for (int i = 0; i < DATA_AMOUNT; i++)
+                streamingCol.add(new 
HashMap.SimpleEntry<>(ThreadLocalRandom.current().nextInt(),
+                    ThreadLocalRandom.current().nextInt()));
+        }
+
+        /**
+         * Clean collection after each iteration.
+         */
+        @TearDown(Level.Iteration)
+        public void cleanCollection() {
+            streamingCol.clear();
+        }
+    }
+
+    /**
+     * Create streamer on client cache.
+     */
+    @State(Scope.Benchmark)
+    public static class DataStreamer {
+        /** Data loader. */
+        final IgniteDataStreamer<Integer, Integer> dataLdr =
+            
Ignition.ignite(IGNITE_CLIENT_INSTANCE_NAME).dataStreamer(IGNITE_CLIENT_CACHE_NAME);
+    }
+
+    /**
+     * Test addData(Collection).
+     *
+     * @param data Data that will be streamed.
+     * @param streamer Data loader.
+     */
+    @Benchmark
+    public void addDataCollection(StreamingData data, DataStreamer streamer) {
+        streamer.dataLdr.addData(data.streamingCol);
+    }
+
+    /**
+     * Test addData(Key, Value).
+     *
+     * @param data Data that will be streamed.
+     * @param streamer Data loader.
+     */
+    @Benchmark
+    public void addDataKeyValue(StreamingData data, DataStreamer streamer) {
+        for (Map.Entry<Integer, Integer> entry : data.streamingCol)
+            streamer.dataLdr.addData(entry.getKey(), entry.getValue());
+    }
+
+    /**
+     * @return Synchronization mode.
+     */
+    @NotNull protected CacheWriteSynchronizationMode 
writeSynchronizationMode() {
+        return FULL_SYNC;
+    }
+
+    /**
+     * @return Number of server nodes to start.
+     */
+    protected int gridCnt() {
+        return 3;
+    }
+
+    /**
+     * Run benchmark.
+     *
+     * @param args Args.
+     */
+    public static void main(String[] args) throws RunnerException {
+        final Options options = new OptionsBuilder()
+            .include(JmhStreamerAddDataBenchmark.class.getSimpleName())
+            .build();
+
+        new Runner(options).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index b1f5851..8b40bfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -119,6 +119,9 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     /** Default operations batch size to sent to remote node for loading. */
     public static final int DFLT_PER_NODE_BUFFER_SIZE = 512;
 
+    /** Default batch size per thread to send to buffer on node. */
+    public static final int DFLT_PER_THREAD_BUFFER_SIZE = 4096;
+
     /** Default timeout for streamer's operations. */
     public static final long DFLT_UNLIMIT_TIMEOUT = -1;
 
@@ -225,6 +228,20 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     public void perNodeParallelOperations(int parallelOps);
 
     /**
+     * Sets the per-thread buffer size used when streaming via {@link 
#addData(Object, Object)}.
+     *
+     * @param size Size of buffer.
+     */
+    public void perThreadBufferSize(int size);
+
+    /**
+     * Gets buffer size set by {@link #perThreadBufferSize(int)}.
+     *
+     * @return Buffer size.
+     */
+    public int perThreadBufferSize();
+
+    /**
      * Sets the timeout that is used in the following cases:
      * <ul>
      * <li>any data addition method can be blocked when all per node parallel 
operations are exhausted.

http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 8cad342..070a0da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -93,7 +93,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
@@ -102,6 +102,7 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -125,6 +126,16 @@ import static 
org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, 
Delayed {
+    /** Per thread buffer size. */
+    private int bufLdrSzPerThread = DFLT_PER_THREAD_BUFFER_SIZE;
+
+    /**
+     * Thread buffer map: on each thread there are future and list of entries 
which will be streamed after filling
+     * thread batch.
+     */
+    private final Map<Long, T2<IgniteCacheFutureImpl, 
List<DataStreamerEntry>>> threadBufMap =
+        new ConcurrentHashMap<>();
+
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new 
IsolatedUpdater();
 
@@ -238,7 +249,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     private final IgniteFuture<?> publicFut;
 
     /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
 
     /** */
     private CacheException disconnectErr;
@@ -371,6 +382,16 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         ensureCacheStarted();
     }
 
+    /** {@inheritDoc} */
+    public void perThreadBufferSize(int size) {
+        bufLdrSzPerThread = size;
+    }
+
+    /** {@inheritDoc} */
+    public int perThreadBufferSize() {
+        return bufLdrSzPerThread;
+    }
+
     /**
      * @param c Closure to run.
      * @param topVer Topology version to wait for.
@@ -404,27 +425,35 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
-     * Enters busy lock.
+     * Acquires read or write lock.
+     * 
+     * @param writeLock {@code True} to acquire the write lock, {@code false} to acquire the read lock.
      */
-    private void enterBusy() {
-        if (!busyLock.enterBusy()) {
+    private void lock(boolean writeLock) {
+        if (writeLock)
+            busyLock.writeLock();
+        else
+            busyLock.readLock();
+
+        if (closed.get() || cancelled) {
+            unlock(writeLock);
+
             if (disconnectErr != null)
                 throw disconnectErr;
 
             closedException();
         }
-        else if (cancelled) {
-            busyLock.leaveBusy();
-
-            closedException();
-        }
     }
 
     /**
-     * Leaves busy lock.
+     * Releases the read or write lock.
+     * @param writeLock {@code True} to release the write lock, {@code false} to release the read lock.
      */
-    private void leaveBusy() {
-        busyLock.leaveBusy();
+    private void unlock(boolean writeLock) {
+        if (writeLock)
+            busyLock.writeUnlock();
+        else
+            busyLock.readUnlock();
     }
 
     /** {@inheritDoc} */
@@ -563,39 +592,16 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
         checkSecurityPermission(SecurityPermission.CACHE_PUT);
 
-        enterBusy();
-
-        try {
-            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
-
-            resFut.listen(rmvActiveFut);
+        Collection<DataStreamerEntry> batch = new ArrayList<>(entries.size());
 
-            activeFuts.add(resFut);
+        for (Map.Entry<K, V> entry : entries) {
+            KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, 
null, entry.getKey(), true);
+            CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, 
entry.getValue(), true);
 
-            Collection<KeyCacheObjectWrapper> keys =
-                new GridConcurrentHashSet<>(entries.size());
-
-            Collection<DataStreamerEntry> entries0 = new 
ArrayList<>(entries.size());
-
-            for (Map.Entry<K, V> entry : entries) {
-                KeyCacheObject key = 
cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true);
-                CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, 
entry.getValue(), true);
-
-                keys.add(new KeyCacheObjectWrapper(key));
-
-                entries0.add(new DataStreamerEntry(key, val));
-            }
-
-            load0(entries0, resFut, keys, 0);
-
-            return new IgniteCacheFutureImpl<>(resFut);
-        }
-        catch (IgniteDataStreamerTimeoutException e) {
-            throw e;
-        }
-        finally {
-            leaveBusy();
+            batch.add(new DataStreamerEntry(key, val));
         }
+
+        return addDataInternal(batch);
     }
 
     /**
@@ -620,39 +626,79 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
      * @return Future.
      */
     public IgniteFuture<?> addDataInternal(Collection<? extends 
DataStreamerEntry> entries) {
-        enterBusy();
+        IgniteCacheFutureImpl fut = null;
+
+        GridFutureAdapter internalFut = null;
+
+        List entriesList;
 
-        GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+        lock(false);
 
         try {
-            resFut.listen(rmvActiveFut);
+            long threadId = Thread.currentThread().getId();
+
+            T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> futAndEntries = 
threadBufMap.get(threadId);
+
+            if (futAndEntries == null) {
+                internalFut = new GridFutureAdapter();
 
-            activeFuts.add(resFut);
+                fut = new IgniteCacheFutureImpl(internalFut);
 
-            Collection<KeyCacheObjectWrapper> keys = null;
+                internalFut.listen(rmvActiveFut);
 
-            if (entries.size() > 1) {
-                keys = new GridConcurrentHashSet<>(entries.size());
+                activeFuts.add(internalFut);
+
+                // Initial capacity should be more than batch by 12.5% in 
order to avoid resizing.
+                futAndEntries = new T2(fut, new ArrayList<>(bufLdrSzPerThread 
+ (bufLdrSzPerThread >> 3)));
+
+                threadBufMap.put(threadId, futAndEntries);
+            }
+            else {
+                fut = futAndEntries.get1();
 
-                for (DataStreamerEntry entry : entries)
-                    keys.add(new KeyCacheObjectWrapper(entry.getKey()));
+                internalFut = (GridFutureAdapter)fut.internalFuture();
             }
 
-            load0(entries, resFut, keys, 0);
+            entriesList = futAndEntries.get2();
 
-            return new IgniteCacheFutureImpl<>(resFut);
+            entriesList.addAll(entries);
+
+            if (entriesList.size() >= bufLdrSzPerThread) {
+                loadData(entriesList, internalFut);
+
+                threadBufMap.remove(threadId);
+            }
+
+            return fut;
         }
         catch (Throwable e) {
-            resFut.onDone(e);
+            if (internalFut != null)
+                internalFut.onDone(e);
 
             if (e instanceof Error || e instanceof 
IgniteDataStreamerTimeoutException)
                 throw e;
 
-            return new IgniteCacheFutureImpl<>(resFut);
+            return fut;
         }
         finally {
-            leaveBusy();
+            unlock(false);
+        }
+    }
+
+    /**
+     * Load thread batch of DataStreamerEntry.
+     */
+    private void loadData(Collection<? extends DataStreamerEntry> entries, 
GridFutureAdapter fut) {
+        Collection<KeyCacheObjectWrapper> keys = null;
+
+        if (entries.size() > 1) {
+            keys = new GridConcurrentHashSet<>(entries.size());
+
+            for (DataStreamerEntry e : entries)
+                keys.add(new KeyCacheObjectWrapper(e.getKey()));
         }
+
+        load0(entries, fut, keys, 0);
     }
 
     /** {@inheritDoc} */
@@ -1043,6 +1089,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
         int doneCnt = 0;
 
+        flushAllThreadsBufs();
+
         for (IgniteInternalFuture<?> f : activeFuts) {
             if (!f.isDone()) {
                 if (activeFuts0 == null)
@@ -1148,7 +1196,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     /** {@inheritDoc} */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     @Override public void flush() throws CacheException {
-        enterBusy();
+        lock(true);
 
         try {
             doFlush();
@@ -1157,7 +1205,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             throw CU.convertToCacheException(e);
         }
         finally {
-            leaveBusy();
+            unlock(true);
         }
     }
 
@@ -1169,10 +1217,12 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
      * should be called periodically.
      */
     @Override public void tryFlush() throws IgniteInterruptedException {
-        if (!busyLock.enterBusy())
+        if (!busyLock.tryWriteLock())
             return;
 
         try {
+            flushAllThreadsBufs();
+
             for (Buffer buf : bufMappings.values())
                 buf.flush();
 
@@ -1182,11 +1232,23 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             throw GridCacheUtils.convertToCacheException(e);
         }
         finally {
-            leaveBusy();
+            unlock(true);
         }
     }
 
     /**
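+     * Loads the buffered entries of all threads and clears the thread buffer map; requires the write lock.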
+     */
+    private void flushAllThreadsBufs() {
+        assert busyLock.writeLockedByCurrentThread();
+
+        for (T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> val : 
threadBufMap.values())
+            loadData(val.get2(), 
(GridFutureAdapter)val.get1().internalFuture());
+
+        threadBufMap.clear();
+    }
+
+    /**
      * @param cancel {@code True} to close with cancellation.
      * @throws CacheException If failed.
      */
@@ -1219,39 +1281,44 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         if (!closed.compareAndSet(false, true))
             return null;
 
-        busyLock.block();
-
-        if (log.isDebugEnabled())
-            log.debug("Closing data streamer [ldr=" + this + ", cancel=" + 
cancel + ']');
+        busyLock.writeLock();
 
         try {
-            // Assuming that no methods are called on this loader after this 
method is called.
-            if (cancel) {
-                cancelled = true;
+            if (log.isDebugEnabled())
+                log.debug("Closing data streamer [ldr=" + this + ", cancel=" + 
cancel + ']');
 
-                for (Buffer buf : bufMappings.values())
-                    buf.cancelAll(err);
-            }
-            else
-                doFlush();
+            try {
+                // Assuming that no methods are called on this loader after 
this method is called.
+                if (cancel) {
+                    cancelled = true;
 
-            ctx.event().removeLocalEventListener(discoLsnr);
+                    for (Buffer buf : bufMappings.values())
+                        buf.cancelAll(err);
+                }
+                else
+                    doFlush();
 
-            ctx.io().removeMessageListener(topic);
-        }
-        catch (IgniteCheckedException | IgniteDataStreamerTimeoutException e) {
-            fut.onDone(e);
-            throw e;
-        }
+                ctx.event().removeLocalEventListener(discoLsnr);
 
-        long failed = failCntr.longValue();
+                ctx.io().removeMessageListener(topic);
+            }
+            catch (IgniteCheckedException | IgniteDataStreamerTimeoutException 
e) {
+                fut.onDone(e);
+                throw e;
+            }
 
-        if (failed > 0 && err == null)
-            err = new IgniteCheckedException("Some of DataStreamer operations 
failed [failedCount=" + failed + "]");
+            long failed = failCntr.longValue();
 
-        fut.onDone(err);
+            if (failed > 0 && err == null)
+                err = new IgniteCheckedException("Some of DataStreamer 
operations failed [failedCount=" + failed + "]");
 
-        return err;
+            fut.onDone(err);
+
+            return err;
+        }
+        finally {
+            busyLock.writeUnlock();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ac89021..91345fe 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -466,6 +466,7 @@ public class DataStreamProcessorSelfTest extends 
GridCommonAbstractTest {
 
             ldr.receiver(DataStreamerCacheUpdaters.<Integer, 
Integer>individual());
             ldr.perNodeBufferSize(2);
+            ldr.perThreadBufferSize(1);
 
             // Define count of puts.
             final AtomicInteger idxGen = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index d277b2e..d02f466 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -238,6 +238,8 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
             IgniteFuture fut = null;
 
             try (IgniteDataStreamer<Integer, String> streamer = 
ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+                streamer.perThreadBufferSize(1);
+
                 fut = streamer.addData(1, "1");
 
                 streamer.flush();
@@ -334,6 +336,8 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
 
         final IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
 
+        ldr.perThreadBufferSize(1);
+
         final IgniteInternalFuture<Long> fut = 
GridTestUtils.runMultiThreadedAsync(new Runnable() {
             @Override public void run() {
                 try {
@@ -467,6 +471,8 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
 
         IgniteDataStreamer<Object, Object> streamer = 
ignite.dataStreamer(DEFAULT_CACHE_NAME);
 
+        streamer.perThreadBufferSize(1);
+
         ((DataStreamerImpl)streamer).maxRemapCount(0);
 
         streamer.addData(1, 1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
index 314855e..6e88adf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
@@ -142,6 +142,7 @@ public class DataStreamerTimeoutTest extends 
GridCommonAbstractTest {
             try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) {
                 ldr.timeout(TIMEOUT);
                 ldr.receiver(new TestDataReceiver());
+                ldr.perThreadBufferSize(1);
                 ldr.perNodeBufferSize(1);
                 ldr.perNodeParallelOperations(1);
                 ((DataStreamerImpl)ldr).maxRemapCount(0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/45698810/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
index 8e795e5..9af0561 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
@@ -29,7 +29,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity
         /** Members that are not needed on .NET side. */
         private static readonly string[] UnneededMembers =
         {
-            "deployClass"
+            "deployClass",
+            "perThreadBufferSize"
         };
 
         /** Known name mappings. */

Reply via email to