http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java index c1251ae..3d3f146 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java @@ -125,8 +125,6 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { ignite.getOrCreateCache(cacheConfiguration()); try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) { - dataLdr.maxRemapCount(0); - Random rnd = new Random(); long endTime = U.currentTimeMillis() + 15_000;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java index 4e981b7..766aa84 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.processors.datastreamer; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteDataStreamerTimeoutException; +import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -36,7 +38,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; * Test timeout for Data streamer. */ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { - /** Cache name. */ public static final String CACHE_NAME = "cacheName"; @@ -46,6 +47,9 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { /** Amount of entries. */ public static final int ENTRY_AMOUNT = 100; + /** Fail on. */ + private static volatile int failOn; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -76,6 +80,8 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { * @throws Exception If fail. */ public void testTimeoutOnCloseMethod() throws Exception { + failOn = 1; + Ignite ignite = startGrid(1); boolean thrown = false; @@ -85,12 +91,10 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { ldr.receiver(new TestDataReceiver()); ldr.perNodeBufferSize(ENTRY_AMOUNT); - for (int i=0; i < ENTRY_AMOUNT; i++) + for (int i = 0; i < ENTRY_AMOUNT; i++) ldr.addData(i, i); - } - catch (IgniteDataStreamerTimeoutException e) { - assertEquals(e.getMessage(), "Data streamer exceeded timeout on flush."); + catch (CacheException | IgniteDataStreamerTimeoutException e) { thrown = true; } finally { @@ -102,40 +106,68 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { /** * Test timeout on {@code DataStreamer.close()} method + * * @throws Exception If fail. */ - public void testTimeoutOnAddDataMethod() throws Exception { - Ignite ignite = startGrid(1); + public void testTimeoutOnAddData() throws Exception { + failOn = 1; - boolean thrown = false; + int processed = timeoutOnAddData(); - IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME); + assertTrue(processed == (failOn + 1) || processed == failOn); - try { - ldr.timeout(TIMEOUT); - ldr.receiver(new TestDataReceiver()); - ldr.perNodeBufferSize(ENTRY_AMOUNT/2); - ldr.perNodeParallelOperations(1); + failOn = ENTRY_AMOUNT / 2; - try { - for (int i=0; i < ENTRY_AMOUNT; i++) - ldr.addData(i, i); - } - catch (IgniteDataStreamerTimeoutException e) { - assertEquals(e.getMessage(), "Data streamer exceeded timeout when starts parallel operation."); + processed = timeoutOnAddData(); + + assertTrue(processed == (failOn + 1) || processed == failOn); + failOn = ENTRY_AMOUNT; + + processed = timeoutOnAddData(); + + assertTrue(processed == (failOn + 1) || processed == failOn); + } + + /** + * + */ + private int timeoutOnAddData() throws Exception { + boolean thrown = false; + int processed = 0; + + try { + Ignite ignite = startGrid(1); + + try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) { + ldr.timeout(TIMEOUT); + ldr.receiver(new TestDataReceiver()); + ldr.perNodeBufferSize(1); + ldr.perNodeParallelOperations(1); + ((DataStreamerImpl)ldr).maxRemapCount(0); + + try { + for (int i = 0; i < ENTRY_AMOUNT; i++) { + ldr.addData(i, i); + + processed++; + } + } + catch (IllegalStateException e) { + // No-op. + } + } + catch (CacheException | IgniteDataStreamerTimeoutException e) { thrown = true; } - } finally { - if (thrown) - ldr.close(true); - stopAllGrids(); } assertTrue(thrown); + + return processed; } /** @@ -143,16 +175,14 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest { */ private static class TestDataReceiver implements StreamReceiver { - /** Is first. */ - boolean isFirst = true; + /** Count. */ + private final AtomicInteger cnt = new AtomicInteger(); /** {@inheritDoc} */ - @Override public void receive(IgniteCache cache, Collection collection) throws IgniteException { + @Override public void receive(IgniteCache cache, Collection col) throws IgniteException { try { - if (isFirst) + if (cnt.incrementAndGet() == failOn) U.sleep(2 * TIMEOUT); - - isFirst = false; } catch (IgniteInterruptedCheckedException e) { throw new IgniteException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index dc412a9..0663903 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest; import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop; import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite; import org.apache.ignite.internal.processors.cache.distributed.CacheLockReleaseNodeLeaveTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionNotLoadedEventSelfTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionedNearDisabledTxMultiThreadedSelfTest; @@ -214,6 +215,7 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(FairAffinityFunctionBackupFilterSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class)); suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTest.class)); + suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.class)); suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class)); suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedLoadCacheSelfTest.class));
