IGNITE-7389 DataStreamer hangs if exception was thrown during addData which isn't IgniteException

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/283ab0c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/283ab0c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/283ab0c4

Branch: refs/heads/ignite-7485-2
Commit: 283ab0c423c67cc85576cb05c37b2d7be39b088a
Parents: d88af9b
Author: ilantukh <ilant...@gridgain.com>
Authored: Thu Feb 1 18:42:42 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Thu Feb 1 18:42:42 2018 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           |  2 +-
 .../datastreamer/DataStreamerImpl.java          | 43 +++++++++++++-------
 .../datastreamer/DataStreamerImplSelfTest.java  | 22 ++++++++--
 3 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/283ab0c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 18abcd8..4771c3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -286,7 +286,7 @@ public class DynamicCacheDescriptor {
 
 
     /**
-     * @return Start topology version.
+     * @return Start topology version or {@code null} if cache configured statically.
      */
     @Nullable public AffinityTopologyVersion startTopologyVersion() {
         return startTopVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/283ab0c4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index c71d129..7d6a8d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -81,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheGateway;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.IgniteFinishedCacheFutureImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -365,6 +365,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             ctx.grid().getOrCreateCache(ccfg);
         }
+
+        ensureCacheStarted();
     }
 
     /**
@@ -589,9 +591,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         catch (IgniteDataStreamerTimeoutException e) {
             throw e;
         }
-        catch (IgniteException e) {
-            return new IgniteFinishedCacheFutureImpl<>(e);
-        }
         finally {
             leaveBusy();
         }
@@ -670,16 +669,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         else
             checkSecurityPermission(SecurityPermission.CACHE_PUT);
 
-        KeyCacheObject key0;
-        CacheObject val0;
-
-        try {
-            key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
-            val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
-        }
-        catch (Exception e) {
-            return new IgniteFinishedCacheFutureImpl<>(e);
-        }
+        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
+        CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
 
         return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0)));
     }
@@ -1337,6 +1328,30 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * Ensures that cache has been started and is ready to store streamed data.
+     */
+    private void ensureCacheStarted() {
+        DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
+
+        assert desc != null;
+
+        if (desc.startTopologyVersion() == null)
+            return;
+
+        IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange()
+            .affinityReadyFuture(desc.startTopologyVersion());
+
+        if (affReadyFut != null) {
+            try {
+                affReadyFut.get();
+            }
+            catch (IgniteCheckedException ex) {
+                throw new IgniteException(ex);
+            }
+        }
+    }
+
+    /**
      *
      */
     private class Buffer {

http://git-wip-us.apache.org/repos/asf/ignite/blob/283ab0c4/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 940f8ce..d277b2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -20,14 +20,12 @@ package org.apache.ignite.internal.processors.datastreamer;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
@@ -56,7 +54,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.TransactionException;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
@@ -491,6 +488,25 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception {
+        final Ignite ignite0 = startGrids(2);
+        final Ignite ignite1 = grid(1);
+
+        final String cacheName = "testCache";
+
+        IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>().setName(cacheName));
+
+        try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) {
+            ldr.addData(0, 0);
+        }
+
+        assertEquals(Integer.valueOf(0), cache.get(0));
+    }
+
+    /**
      * Gets cache configuration.
      *
      * @return Cache configuration.

Reply via email to