IGNITE-4901 Decrease logging level for DataStremer retry
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3401cbd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3401cbd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3401cbd Branch: refs/heads/master Commit: c3401cbd009d7b9cfc8aed0cc9c3f34fb5f433db Parents: 740b0b2 Author: Alexey Kukushkin <[email protected]> Authored: Thu Jul 6 12:44:27 2017 +0300 Committer: Alexey Kukushkin <[email protected]> Committed: Thu Jul 6 12:44:27 2017 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamProcessor.java | 3 +- .../datastreamer/DataStreamerImpl.java | 14 ++- .../datastreamer/DataStreamerImplSelfTest.java | 123 ++++++++++++++++++- 3 files changed, 132 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 789f0d2..84d536f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -340,7 +341,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { AffinityTopologyVersion topVer = fut.topologyVersion(); if (!allowOverwrite && !topVer.equals(req.topologyVersion())) { - Exception err = new IgniteCheckedException( + Exception err = new ClusterTopologyCheckedException( "DataStreamer will retry data transfer at stable topology " + "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index ae441de..df51fac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1782,10 +1782,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed try { GridPeerDeployAware jobPda0 = jobPda; - err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]", - (Throwable)U.unmarshal(ctx, - errBytes, - U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()))); + final Throwable cause = U.unmarshal( + ctx, + errBytes, + U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())); + + final String msg = "DataStreamer request failed [node=" + nodeId + "]"; + + err = cause instanceof ClusterTopologyCheckedException ? + new ClusterTopologyCheckedException(msg, cause) : + new IgniteCheckedException(msg, cause); } catch (IgniteCheckedException e) { f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); http://git-wip-us.apache.org/repos/asf/ignite/blob/c3401cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index 6d10312..e72a9b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.datastreamer; +import java.io.StringWriter; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -25,17 +26,29 @@ import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheServerNotFoundException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.log4j.Appender; +import org.apache.log4j.Logger; +import org.apache.log4j.SimpleLayout; +import org.apache.log4j.WriterAppender; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -50,12 +63,18 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { /** Number of keys to load via data streamer. */ private static final int KEYS_COUNT = 1000; + /** Next nodes after MAX_CACHE_COUNT start without cache */ + private static final int MAX_CACHE_COUNT = 4; + /** Started grid counter. */ private static int cnt; /** No nodes filter. */ private static volatile boolean noNodesFilter; + /** Indicates whether we need to make the topology stale */ + private static boolean needStaleTop = false; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -72,8 +91,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(discoSpi); - // Forth node goes without cache. - if (cnt < 4) + cfg.setCommunicationSpi(new StaleTopologyCommunicationSpi()); + + if (cnt < MAX_CACHE_COUNT) cfg.setCacheConfiguration(cacheConfiguration()); cnt++; @@ -232,6 +252,44 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { } /** + * Cluster topology mismatch shall result in DataStreamer retrying cache update with the latest topology and + * no error logged to the console. + * + * @throws Exception if failed + */ + public void testRetryWhenTopologyMismatch() throws Exception { + final int KEY = 1; + final String VAL = "1"; + + cnt = 0; + + StringWriter logWriter = new StringWriter(); + Appender logAppender = new WriterAppender(new SimpleLayout(), logWriter); + + Logger.getRootLogger().addAppender(logAppender); + + startGrids(MAX_CACHE_COUNT - 1); // cache-enabled nodes + + try (Ignite ignite = startGrid(MAX_CACHE_COUNT); + IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { + + needStaleTop = true; // simulate stale topology for the next action + + streamer.addData(KEY, VAL); + } finally { + needStaleTop = false; + + logWriter.flush(); + + Logger.getRootLogger().removeAppender(logAppender); + + logAppender.close(); + } + + assertFalse(logWriter.toString().contains("DataStreamer will retry data transfer at stable topology")); + } + + /** * Gets cache configuration. * * @return Cache configuration. @@ -248,4 +306,63 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { return cacheCfg; } -} + + /** + * Simulate stale (not up-to-date) topology + */ + private static class StaleTopologyCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + // Send stale topology only in the first request to avoid indefinitely getting failures. + if (needStaleTop) { + if (msg instanceof GridIoMessage) { + GridIoMessage ioMsg = (GridIoMessage)msg; + + Message appMsg = ioMsg.message(); + + if (appMsg != null && appMsg instanceof DataStreamerRequest) { + DataStreamerRequest req = (DataStreamerRequest)appMsg; + + AffinityTopologyVersion validTop = req.topologyVersion(); + + // Simulate situation when a node did not receive the latest "node joined" topology update causing + // topology mismatch + AffinityTopologyVersion staleTop = new AffinityTopologyVersion( + validTop.topologyVersion() - 1, + validTop.minorTopologyVersion()); + + appMsg = new DataStreamerRequest( + req.requestId(), + req.responseTopicBytes(), + req.cacheName(), + req.updaterBytes(), + req.entries(), + req.ignoreDeploymentOwnership(), + req.skipStore(), + req.keepBinary(), + req.deploymentMode(), + req.sampleClassName(), + req.userVersion(), + req.participants(), + req.classLoaderId(), + req.forceLocalDeployment(), + staleTop); + + msg = new GridIoMessage( + GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"), + GridTestUtils.getFieldValue(ioMsg, "topic"), + GridTestUtils.<Integer>getFieldValue(ioMsg, "topicOrd"), + appMsg, + GridTestUtils.<Boolean>getFieldValue(ioMsg, "ordered"), + ioMsg.timeout(), + ioMsg.skipOnTimeout()); + + needStaleTop = false; + } + } + } + + super.sendMessage(node, msg, ackC); + } + } +} \ No newline at end of file
