IGNITE-7389 DataStreamer hangs if an exception that isn't an IgniteException is thrown during addData
Signed-off-by: Anton Vinogradov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/283ab0c4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/283ab0c4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/283ab0c4 Branch: refs/heads/ignite-7485-2 Commit: 283ab0c423c67cc85576cb05c37b2d7be39b088a Parents: d88af9b Author: ilantukh <[email protected]> Authored: Thu Feb 1 18:42:42 2018 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Thu Feb 1 18:42:42 2018 +0300 ---------------------------------------------------------------------- .../cache/DynamicCacheDescriptor.java | 2 +- .../datastreamer/DataStreamerImpl.java | 43 +++++++++++++------- .../datastreamer/DataStreamerImplSelfTest.java | 22 ++++++++-- 3 files changed, 49 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/283ab0c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 18abcd8..4771c3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -286,7 +286,7 @@ public class DynamicCacheDescriptor { /** - * @return Start topology version. + * @return Start topology version or {@code null} if cache configured statically. 
*/ @Nullable public AffinityTopologyVersion startTopologyVersion() { return startTopVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/283ab0c4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index c71d129..7d6a8d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -81,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheGateway; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; -import org.apache.ignite.internal.processors.cache.IgniteFinishedCacheFutureImpl; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -365,6 +365,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed ctx.grid().getOrCreateCache(ccfg); } + + ensureCacheStarted(); } /** @@ -589,9 +591,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed catch (IgniteDataStreamerTimeoutException e) { throw e; } - catch (IgniteException e) { - return new IgniteFinishedCacheFutureImpl<>(e); - } finally { leaveBusy(); } @@ -670,16 +669,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed else checkSecurityPermission(SecurityPermission.CACHE_PUT); - KeyCacheObject key0; - CacheObject val0; - - try { - key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true); - val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true); - } - catch (Exception e) { - return new IgniteFinishedCacheFutureImpl<>(e); - } + KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true); + CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true); return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0))); } @@ -1337,6 +1328,30 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * Ensures that cache has been started and is ready to store streamed data. 
+ */ + private void ensureCacheStarted() { + DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName); + + assert desc != null; + + if (desc.startTopologyVersion() == null) + return; + + IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange() + .affinityReadyFuture(desc.startTopologyVersion()); + + if (affReadyFut != null) { + try { + affReadyFut.get(); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } + } + } + + /** * */ private class Buffer { http://git-wip-us.apache.org/repos/asf/ignite/blob/283ab0c4/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index 940f8ce..d277b2e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -20,14 +20,12 @@ package org.apache.ignite.internal.processors.datastreamer; import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; @@ -56,7 +54,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import 
org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.TransactionException; import org.apache.log4j.Appender; import org.apache.log4j.Logger; import org.apache.log4j.SimpleLayout; @@ -491,6 +488,25 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception { + final Ignite ignite0 = startGrids(2); + final Ignite ignite1 = grid(1); + + final String cacheName = "testCache"; + + IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache( + new CacheConfiguration<Integer, Integer>().setName(cacheName)); + + try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) { + ldr.addData(0, 0); + } + + assertEquals(Integer.valueOf(0), cache.get(0)); + } + + /** * Gets cache configuration. * * @return Cache configuration.
