IGNITE-3972: Fixed a bug causing continuous query events to be lost for ATOMIC caches when a key's primary node leaves the topology. This closes #1133.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0659bebe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0659bebe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0659bebe Branch: refs/heads/ignite-ssl-hotfix Commit: 0659bebe04dc9c0b0a163bc95061519c553e678c Parents: 01ca6db Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Authored: Wed Oct 12 14:49:36 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Oct 12 14:49:36 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 8 +- .../continuous/GridContinuousProcessor.java | 6 + ...eContinuousQueryAsyncFailoverTxSelfTest.java | 5 + ...ContinuousQueryFailoverAbstractSelfTest.java | 225 ++++++++++++++++++- .../CacheContinuousQueryFailoverTxSelfTest.java | 5 + 5 files changed, 237 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 4b5074c..304d031 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -595,12 +595,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert objs != null; assert ctx != null; - final 
List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs; - - if (entries.isEmpty()) + if (objs.isEmpty()) return; if (asyncCallback) { + final List<CacheContinuousQueryEntry> entries = objs instanceof List ? (List)objs : new ArrayList(objs); + IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool(); int threadId = asyncPool.threadId(entries.get(0).partition()); @@ -639,7 +639,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler }, threadId); } else - notifyCallback0(nodeId, ctx, entries); + notifyCallback0(nodeId, ctx, (Collection)objs); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index f078b1b..3a559e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -820,6 +820,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (!toSnd.isEmpty()) sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null); } + else { + LocalRoutineInfo localRoutineInfo = locInfos.get(routineId); + + if (localRoutineInfo != null) + localRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java index 8f0bd0e..900abc8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java @@ -41,4 +41,9 @@ public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuous @Override protected boolean asyncCallback() { return true; } + + /** {@inheritDoc} */ + @Override public void testNoEventLossOnTopologyChange() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-4015"); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 1376be1..1b7fe2b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.CacheException; @@ -51,6 +52,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; @@ -69,6 +71,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -84,9 +87,11 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.logger.NullLogger; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; @@ -141,10 +146,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC cfg.setCommunicationSpi(commSpi); - MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi(); - eventSpi.setExpireCount(50); + MemoryEventStorageSpi evtSpi = new 
MemoryEventStorageSpi(); + evtSpi.setExpireCount(50); - cfg.setEventStorageSpi(eventSpi); + cfg.setEventStorageSpi(evtSpi); CacheConfiguration ccfg = new CacheConfiguration(); @@ -1180,7 +1185,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC CacheEntryEvent<?, ?> e = iter.next(); if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) - && equalOldValue(e, exp)) { + && equalOldValue(e, exp)) { found = true; iter.remove(); @@ -1254,12 +1259,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param e Event * @param expVals expected value * @return {@code True} if entries has the same key, value and oldValue. If cache start without backups - * than oldValue ignoring in comparison. + * than oldValue ignoring in comparison. */ private boolean equalOldValue(CacheEntryEvent<?, ?> e, T3<Object, Object, Object> expVals) { return (e.getOldValue() == null && expVals.get3() == null) // Both null || (e.getOldValue() != null && expVals.get3() != null // Equals - && e.getOldValue().equals(expVals.get3())) + && e.getOldValue().equals(expVals.get3())) || (backups == 0); // If we start without backup than oldValue might be lose. } @@ -2040,7 +2045,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC bar.await(1, MINUTES); } } - catch (Exception e){ + catch (Exception e) { log.error("Failed.", e); err = true; @@ -2251,6 +2256,164 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * Failover test that detects continuous query (CQ) event loss while the cluster topology is changing. + * + * @throws Exception If failed.
+ */ + public void testNoEventLossOnTopologyChange() throws Exception { + final int stableNodeCnt = 1; + + final int batchLoadSize = 2000; + + final int restartCycles = 5; + + Ignite qryClient = startGridsMultiThreaded(stableNodeCnt); + + final CacheEventListener4 lsnr = new CacheEventListener4(atomicityMode() == CacheAtomicityMode.ATOMIC); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + IgniteCache<Integer, Integer> cache = qryClient.cache(null); + + QueryCursor<?> cur = cache.query(qry); + + int iteration = 0; + + int putCnt = 0; + + int ignoredDupEvts = 0; + + Thread nodeRestartThread = nodeRestartThread(restartCycles, 2_000, 1_000); + + try { + nodeRestartThread.start(); + + while (!Thread.interrupted() && nodeRestartThread.isAlive()) { + iteration++; + + for (int i = 0; i < batchLoadSize; i++) + cache.put(i, iteration); + + putCnt += batchLoadSize; + + log.info("Batch loaded. Iteration: " + iteration); + + final long expCnt = putCnt * stableNodeCnt + ignoredDupEvts; + + // Poll the listener's live counter: a count captured before the wait could never change. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr.count() == expCnt; + } + }, 6_000); + + final long cnt = lsnr.count(); + + if (cnt != expCnt) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < batchLoadSize; i++) { + Integer key = i; + Integer val = cache.get(key); + + if (!F.eq(val, iteration)) + sb.append("\n\t").append(">>> WRONG CACHE VALUE (lost data?) [key=").append(key) + .append(", val=").append(val).append(']'); + } + + for (Map.Entry<Integer, Integer> entry : lsnr.eventMap().entrySet()) { + Integer key = entry.getKey(); + Integer val = entry.getValue(); + + if (!F.eq(val, iteration)) + sb.append("\n\t").append(">>> WRONG LISTENER VALUE (lost event?) [key=").append(key) + .append(", val=").append(val).append(']'); + } + + String msg = sb.toString(); + + // In ATOMIC mode the CQ may receive duplicate update events when a failed update is retried, + // e.g. after a topology change.
+ if (atomicityMode() == CacheAtomicityMode.ATOMIC && msg.isEmpty() && cnt > expCnt) + ignoredDupEvts += cnt - expCnt; + else + fail("Unexpected event updates count: EXPECTED=" + expCnt + ", ACTUAL=" + cnt + ", " + + "ITERATION=" + iteration + msg); + } + + sleep(500); + } + } + finally { + nodeRestartThread.interrupt(); + + cur.close(); + + nodeRestartThread.join(3_000); + } + } + + /** + * Creates (but does not start) a daemon thread that repeatedly starts and stops an extra node. + * + * @param restartCycles Number of start/stop cycles to perform. + * @param initDelay Delay before the first cycle, passed to {@link #sleep(long)}. + * @param restartDelay Delay after the node is started and again after it is stopped, passed to {@link #sleep(long)}. + * @return Created (not yet started) thread. + */ + private Thread nodeRestartThread(final int restartCycles, final long initDelay, final long restartDelay) { + Thread t = new Thread(new Runnable() { + @Override public void run() { + sleep(initDelay); + + try { + for (int i = 1; i <= restartCycles && !Thread.interrupted(); i++) { + IgniteConfiguration cfg = optimize(getConfiguration("restartNode")). + setGridLogger(new NullLogger()); + + log.info("Node restart cycle started: " + i); + + try (Ignite ignored = Ignition.start(cfg)) { + awaitPartitionMapExchange(); + + sleep(restartDelay); + } + + log.info("Node restart cycle finished: " + i); + + awaitPartitionMapExchange(); + + sleep(restartDelay); + } + } + catch (Exception e) { + log.error("Unexpected error.", e); + } + } + }); + + t.setName("flapping-node-thread"); + + t.setDaemon(true); + + return t; + } + + /** + * Sleeps quietly: returns immediately if the current thread is already interrupted and does not throw + * when interrupted while sleeping. + * + * @param sleepTime Sleep time.
+ */ + private void sleep(long sleepTime) { + try { + if (Thread.currentThread().isInterrupted()) + return; + + U.sleep(sleepTime); + } + catch (IgniteInterruptedCheckedException e) { + // Restore (rather than clear) the interrupt status so callers polling it can stop, + // e.g. a loop guarded by Thread.interrupted(). + Thread.currentThread().interrupt(); + } + } + + /** * */ @IgniteAsyncCallback @@ -2363,7 +2526,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** {@inheritDoc} */ @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) - throws CacheEntryListenerException { + throws CacheEntryListenerException { try { for (CacheEntryEvent<?, ?> evt : evts) { Integer key = (Integer)evt.getKey(); @@ -2439,6 +2602,52 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * Listener that counts received update events and keeps the last received value per key. + * When constructed for an ATOMIC cache, an event whose value equals the previous value for + * the same key is treated as a duplicate and is not counted. + */ + private static class CacheEventListener4 implements CacheEntryUpdatedListener<Integer, Integer> { + /** Number of counted events. */ + private final AtomicLong cntr = new AtomicLong(); + + /** Last received value per key. */ + private final Map<Integer, Integer> evtMap = new ConcurrentHashMap<>(); + + /** Whether duplicate events are skipped (set for ATOMIC caches). */ + private final boolean atomicModeFlag; + + /** + * @param atomicModeFlag {@code True} to skip counting duplicate events (ATOMIC cache). + */ + public CacheEventListener4(boolean atomicModeFlag) { + this.atomicModeFlag = atomicModeFlag; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + Integer prev = evtMap.put(evt.getKey(), evt.getValue()); + + // Atomic cache allows duplicate events if a cache update operation is retried, e.g. due to topology change. + // Compare the previous value with the event's value (not the event object) so duplicates are really skipped. + if (!atomicModeFlag || prev == null || !prev.equals(evt.getValue())) + cntr.incrementAndGet(); + } + } + + /** + * @return Events count. + */ + public long count() { + return cntr.get(); + } + + /** + * @return Event map.
+ */ + Map<Integer, Integer> eventMap() { + return evtMap; + } + } + + /** * */ @IgniteAsyncCallback http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java index 789a105..c5240da 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java @@ -36,4 +36,9 @@ public class CacheContinuousQueryFailoverTxSelfTest extends CacheContinuousQuery @Override protected CacheAtomicityMode atomicityMode() { return TRANSACTIONAL; } + + /** {@inheritDoc} */ + @Override public void testNoEventLossOnTopologyChange() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-4015"); + } }