IGNITE-3055: IgniteDataStreamer can't be timed out (cherry picked from commit 10224df)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd1d618e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd1d618e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd1d618e Branch: refs/heads/ignite-1232-1 Commit: cd1d618ea5f80f93c378a7dc4c1bec0dcd56321e Parents: c38b3ba Author: Vladislav Pyatkov <vldpyat...@gmail.com> Authored: Wed Jul 13 15:24:53 2016 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 13 15:28:20 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteDataStreamer.java | 52 +++++- .../IgniteDataStreamerTimeoutException.java | 45 +++++ .../datastreamer/DataStreamerImpl.java | 71 ++++++-- .../ignite/internal/util/IgniteUtils.java | 21 +++ .../datastreamer/DataStreamerTimeoutTest.java | 163 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 6 files changed, 335 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cd1d618e/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 2a79143..aa2e1fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -96,6 +96,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { /** Default per node buffer size. */ public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; + /** Default timeout for streamer's operations. */ + public static final long DFLT_UNLIMIT_TIMEOUT = -1; + /** * Name of cache to stream data to. 
* @@ -197,6 +200,29 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public void perNodeParallelOperations(int parallelOps); /** + * Sets the timeout that is used in the following cases: + * <ul> + * <li>any data addition method can be blocked when all per node parallel operations are exhausted. + * The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data + * into the streamer;</li> + * <li>Total timeout time for {@link #flush()} operation;</li> + * <li>Total timeout time for {@link #close()} operation.</li> + * </ul> + * By default the timeout is disabled. + * + * @param timeout Timeout in milliseconds. + * @throws IllegalArgumentException If {@param timeout} is zero or less than {@code -1}. + */ + public void timeout(long timeout); + + /** + * Gets timeout set by {@link #timeout(long)}. + * + * @return Timeout in milliseconds. + */ + public long timeout(); + + /** * Gets automatic flush frequency. Essentially, this is the time after which the * streamer will make an attempt to submit all data added so far to remote nodes. * Note that there is no guarantee that data will be delivered after this concrete @@ -286,10 +312,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on streamer. + * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded. * @see #allowOverwrite() */ public IgniteFuture<?> addData(K key, @Nullable V val) throws CacheException, IgniteInterruptedException, - IllegalStateException; + IllegalStateException, IgniteDataStreamerTimeoutException; /** * Adds data for streaming on remote node. 
This method can be called from multiple @@ -309,10 +336,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on streamer. + * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded. * @see #allowOverwrite() */ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws CacheException, IgniteInterruptedException, - IllegalStateException; + IllegalStateException, IgniteDataStreamerTimeoutException; /** * Adds data for streaming on remote node. This method can be called from multiple @@ -329,10 +357,12 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @param entries Collection of entries to be streamed. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on streamer. + * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded. * @return Future for this stream operation. * @see #allowOverwrite() */ - public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; + public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException, + IgniteDataStreamerTimeoutException; /** * Adds data for streaming on remote node. This method can be called from multiple @@ -349,10 +379,12 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @param entries Map to be streamed. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on streamer. + * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded. * @return Future for this stream operation. 
* @see #allowOverwrite() */ - public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException; + public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException, + IgniteDataStreamerTimeoutException; /** * Streams any remaining data, but doesn't close the streamer. Data can be still added after @@ -367,9 +399,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on streamer. + * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded. * @see #tryFlush() */ - public void flush() throws CacheException, IgniteInterruptedException, IllegalStateException; + public void flush() throws CacheException, IgniteInterruptedException, IllegalStateException, + IgniteDataStreamerTimeoutException; /** * Makes an attempt to stream remaining data. This method is mostly similar to {@link #flush}, @@ -389,8 +423,10 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * @param cancel {@code True} to cancel ongoing streaming operations. * @throws CacheException If failed to map key to node. * @throws IgniteInterruptedException If thread has been interrupted. + * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded, only if {@param cancel} is {@code false}. */ - public void close(boolean cancel) throws CacheException, IgniteInterruptedException; + public void close(boolean cancel) throws CacheException, IgniteInterruptedException, + IgniteDataStreamerTimeoutException; /** * Closes data streamer. This method is identical to calling {@link #close(boolean) close(false)} method. @@ -400,7 +436,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * * @throws CacheException If failed to close data streamer. * @throws IgniteInterruptedException If thread has been interrupted. 
+ * @throws IgniteDataStreamerTimeoutException If {@code timeout} is exceeded. */ - @Override public void close() throws CacheException, IgniteInterruptedException; + @Override public void close() throws CacheException, IgniteInterruptedException, + IgniteDataStreamerTimeoutException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cd1d618e/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java new file mode 100644 index 0000000..c6c7367 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamerTimeoutException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +/** + * Exception is thrown when timeout of some {@link IgniteDataStreamer} operations occurs. 
+ */ +public class IgniteDataStreamerTimeoutException extends IgniteException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new timeout exception with given error message. + * + * @param msg Error message. + */ + public IgniteDataStreamerTimeoutException(String msg) { + super(msg); + } + + /** + * Creates new timeout exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public IgniteDataStreamerTimeoutException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cd1d618e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 9dc6a7f..21df559 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -46,6 +46,7 @@ import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteDataStreamerTimeoutException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; @@ -53,6 +54,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import 
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -147,6 +149,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private int parallelOps = DFLT_MAX_PARALLEL_OPS; /** */ + private long timeout = DFLT_UNLIMIT_TIMEOUT; + + /** */ private long autoFlushFreq; /** Mapping. */ @@ -453,6 +458,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** {@inheritDoc} */ + @Override public void timeout(long timeout) { + if (timeout < -1 || timeout == 0) + throw new IllegalArgumentException(); + + this.timeout = timeout; + } + + /** {@inheritDoc} */ + @Override public long timeout() { + return this.timeout; + } + + /** {@inheritDoc} */ @Override public long autoFlushFrequency() { return autoFlushFreq; } @@ -517,6 +535,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed return new IgniteCacheFutureImpl<>(resFut); } + catch (IgniteDataStreamerTimeoutException e) { + throw e; + } catch (IgniteException e) { return new IgniteFinishedFutureImpl<>(e); } @@ -572,7 +593,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed catch (Throwable e) { resFut.onDone(e); - if (e instanceof Error) + if (e instanceof Error || e instanceof IgniteDataStreamerTimeoutException) throw e; return new IgniteFinishedFutureImpl<>(e); @@ -854,9 +875,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed boolean err = false; + long startTimeMillis = U.currentTimeMillis(); + for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) { try { - fut.get(); + if (timeout == DFLT_UNLIMIT_TIMEOUT) + fut.get(); + else { + long timeRemain = timeout - U.currentTimeMillis() + startTimeMillis; + + if (timeRemain <= 0) + throw new 
IgniteDataStreamerTimeoutException("Data streamer exceeded timeout on flush."); + + fut.get(timeRemain); + } } catch (IgniteClientDisconnectedCheckedException e) { if (log.isDebugEnabled()) @@ -864,6 +896,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed throw CU.convertToCacheException(e); } + catch (IgniteFutureTimeoutCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to flush buffer: " + e); + + throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout on flush.", e); + } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to flush buffer: " + e); @@ -976,8 +1014,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (log.isDebugEnabled()) log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']'); - IgniteCheckedException e = null; - try { // Assuming that no methods are called on this loader after this method is called. if (cancel) { @@ -993,14 +1029,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed ctx.io().removeMessageListener(topic); } - catch (IgniteCheckedException e0) { - e = e0; + catch (IgniteCheckedException | IgniteDataStreamerTimeoutException e) { + fut.onDone(e); + throw e; } - fut.onDone(null, e != null ? e : err); - - if (e != null) - throw e; + fut.onDone(err); } /** @@ -1242,7 +1276,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @throws IgniteInterruptedCheckedException If thread has been interrupted. 
*/ private void incrementActiveTasks() throws IgniteInterruptedCheckedException { - U.acquire(sem); + if (timeout == DFLT_UNLIMIT_TIMEOUT) + U.acquire(sem); + else + if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) { + if (log.isDebugEnabled()) + log.debug("Failed to add parallel operation."); + + throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout when starts parallel operation."); + } } /** @@ -1268,7 +1310,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed assert !entries.isEmpty(); assert curFut != null; - incrementActiveTasks(); + try { + incrementActiveTasks(); + } catch (IgniteDataStreamerTimeoutException e) { + curFut.onDone(e); + throw e; + } IgniteInternalFuture<Object> fut; http://git-wip-us.apache.org/repos/asf/ignite/blob/cd1d618e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index fee4f378..269795b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -7462,6 +7462,27 @@ public abstract class IgniteUtils { } /** + * Tries to acquire a permit from the provided semaphore, waiting up to {@code timeout}. + * + * @param sem Semaphore. + * @param timeout The maximum time to wait. + * @param unit The unit of the {@code timeout} argument. + * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException Wrapped {@link InterruptedException}. + * @return {@code True} if a permit was acquired, {@code false} otherwise. 
+ */ + public static boolean tryAcquire(Semaphore sem, long timeout, TimeUnit unit) + throws IgniteInterruptedCheckedException { + try { + return sem.tryAcquire(timeout, unit); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } + + /** * Gets cache attributes for the node. * * @param n Node to get cache attributes for. http://git-wip-us.apache.org/repos/asf/ignite/blob/cd1d618e/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java new file mode 100644 index 0000000..4e981b7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.datastreamer; + +import java.util.Collection; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteDataStreamerTimeoutException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.stream.StreamReceiver; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Test timeout for Data streamer. + */ +public class DataStreamerTimeoutTest extends GridCommonAbstractTest { + + /** Cache name. */ + public static final String CACHE_NAME = "cacheName"; + + /** Timeout. */ + public static final int TIMEOUT = 1_000; + + /** Amount of entries. */ + public static final int ENTRY_AMOUNT = 100; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setName(CACHE_NAME); + + return cacheCfg; + } + + /** + * Test timeout on {@code DataStreamer.close()} method + * @throws Exception If failed. 
+ */ + public void testTimeoutOnCloseMethod() throws Exception { + Ignite ignite = startGrid(1); + + boolean thrown = false; + + try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) { + ldr.timeout(TIMEOUT); + ldr.receiver(new TestDataReceiver()); + ldr.perNodeBufferSize(ENTRY_AMOUNT); + + for (int i=0; i < ENTRY_AMOUNT; i++) + ldr.addData(i, i); + + } + catch (IgniteDataStreamerTimeoutException e) { + assertEquals(e.getMessage(), "Data streamer exceeded timeout on flush."); + thrown = true; + } + finally { + stopAllGrids(); + } + + assertTrue(thrown); + } + + /** + * Test timeout on {@code DataStreamer.addData()} method + * @throws Exception If failed. + */ + public void testTimeoutOnAddDataMethod() throws Exception { + Ignite ignite = startGrid(1); + + boolean thrown = false; + + IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME); + + try { + ldr.timeout(TIMEOUT); + ldr.receiver(new TestDataReceiver()); + ldr.perNodeBufferSize(ENTRY_AMOUNT/2); + ldr.perNodeParallelOperations(1); + + try { + for (int i=0; i < ENTRY_AMOUNT; i++) + ldr.addData(i, i); + } + catch (IgniteDataStreamerTimeoutException e) { + assertEquals(e.getMessage(), "Data streamer exceeded timeout when starts parallel operation."); + + thrown = true; + } + + } + finally { + if (thrown) + ldr.close(true); + + stopAllGrids(); + } + + assertTrue(thrown); + } + + /** + * Test receiver for timeout expiration emulation. + */ + private static class TestDataReceiver implements StreamReceiver { + + /** Is first. 
*/ + boolean isFirst = true; + + /** {@inheritDoc} */ + @Override public void receive(IgniteCache cache, Collection collection) throws IgniteException { + try { + if (isFirst) + U.sleep(2 * TIMEOUT); + + isFirst = false; + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cd1d618e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 2e45faa..8c3f4de 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -141,6 +141,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSel import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerTimeoutTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateAfterLoadTest; import org.apache.ignite.testframework.GridTestUtils; @@ -249,6 +250,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class); suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class); suite.addTestSuite(DataStreamerImplSelfTest.class); + suite.addTestSuite(DataStreamerTimeoutTest.class); GridTestUtils.addTestIfNeeded(suite, GridCacheEntryMemorySizeSelfTest.class, ignoredTests); suite.addTestSuite(GridCacheClearAllSelfTest.class); 
suite.addTestSuite(GridCacheObjectToStringSelfTest.class);