IGNITE-3972: Fixed a bug causing continuous queries to be lost for ATOMIC cache 
when key's primary node leaves topology. This closes #1133.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0659bebe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0659bebe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0659bebe

Branch: refs/heads/ignite-ssl-hotfix
Commit: 0659bebe04dc9c0b0a163bc95061519c553e678c
Parents: 01ca6db
Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Authored: Wed Oct 12 14:49:36 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Oct 12 14:49:36 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java |   8 +-
 .../continuous/GridContinuousProcessor.java     |   6 +
 ...eContinuousQueryAsyncFailoverTxSelfTest.java |   5 +
 ...ContinuousQueryFailoverAbstractSelfTest.java | 225 ++++++++++++++++++-
 .../CacheContinuousQueryFailoverTxSelfTest.java |   5 +
 5 files changed, 237 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4b5074c..304d031 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -595,12 +595,12 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         assert objs != null;
         assert ctx != null;
 
-        final List<CacheContinuousQueryEntry> entries = 
(List<CacheContinuousQueryEntry>)objs;
-
-        if (entries.isEmpty())
+        if (objs.isEmpty())
             return;
 
         if (asyncCallback) {
+            final List<CacheContinuousQueryEntry> entries = objs instanceof 
List ? (List)objs : new ArrayList(objs);
+
             IgniteStripedThreadPoolExecutor asyncPool = 
ctx.asyncCallbackPool();
 
             int threadId = asyncPool.threadId(entries.get(0).partition());
@@ -639,7 +639,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             }, threadId);
         }
         else
-            notifyCallback0(nodeId, ctx, entries);
+            notifyCallback0(nodeId, ctx, (Collection)objs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f078b1b..3a559e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -820,6 +820,12 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             if (!toSnd.isEmpty())
                 sendNotification(nodeId, routineId, null, toSnd, orderedTopic, 
true, null);
         }
+        else {
+            LocalRoutineInfo localRoutineInfo = locInfos.get(routineId);
+
+            if (localRoutineInfo != null)
+                localRoutineInfo.handler().notifyCallback(nodeId, routineId, 
objs, ctx);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
index 8f0bd0e..900abc8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
@@ -41,4 +41,9 @@ public class CacheContinuousQueryAsyncFailoverTxSelfTest 
extends CacheContinuous
     @Override protected boolean asyncCallback() {
         return true;
     }
+
+    /** {@inheritDoc} */
+    public void testNoEventLossOnTopologyChange() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-4015";);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1376be1..1b7fe2b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.CacheException;
@@ -51,6 +52,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -69,6 +71,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -84,9 +87,11 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
@@ -141,10 +146,10 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         cfg.setCommunicationSpi(commSpi);
 
-        MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
-        eventSpi.setExpireCount(50);
+        MemoryEventStorageSpi evtSpi = new MemoryEventStorageSpi();
+        evtSpi.setExpireCount(50);
 
-        cfg.setEventStorageSpi(eventSpi);
+        cfg.setEventStorageSpi(evtSpi);
 
         CacheConfiguration ccfg = new CacheConfiguration();
 
@@ -1180,7 +1185,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                     CacheEntryEvent<?, ?> e = iter.next();
 
                     if ((exp.get2() != null && e.getValue() != null && 
exp.get2().equals(e.getValue()))
-                            && equalOldValue(e, exp)) {
+                        && equalOldValue(e, exp)) {
                         found = true;
 
                         iter.remove();
@@ -1254,12 +1259,12 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
      * @param e Event
      * @param expVals expected value
      * @return {@code True} if entries has the same key, value and oldValue. 
If cache start without backups
-     *          than oldValue ignoring in comparison.
+     * than oldValue ignoring in comparison.
      */
     private boolean equalOldValue(CacheEntryEvent<?, ?> e, T3<Object, Object, 
Object> expVals) {
         return (e.getOldValue() == null && expVals.get3() == null) // Both null
             || (e.getOldValue() != null && expVals.get3() != null  // Equals
-                && e.getOldValue().equals(expVals.get3()))
+            && e.getOldValue().equals(expVals.get3()))
             || (backups == 0); // If we start without backup than oldValue 
might be lose.
     }
 
@@ -2040,7 +2045,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                             bar.await(1, MINUTES);
                     }
                 }
-                catch (Exception e){
+                catch (Exception e) {
                     log.error("Failed.", e);
 
                     err = true;
@@ -2251,6 +2256,164 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * This is failover test detecting CQ event loss while topology changing.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoEventLossOnTopologyChange() throws Exception {
+        final int stableNodeCnt = 1;
+
+        final int batchLoadSize = 2000;
+
+        final int restartCycles = 5;
+
+        Ignite qryClient = startGridsMultiThreaded(stableNodeCnt);
+
+        final CacheEventListener4 lsnr = new 
CacheEventListener4(atomicityMode() == CacheAtomicityMode.ATOMIC);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        IgniteCache<Integer, Integer> cache = qryClient.cache(null);
+
+        QueryCursor<?> cur = cache.query(qry);
+
+        int iteration = 0;
+
+        int putCnt = 0;
+
+        int ignoredDupEvts = 0;
+
+        Thread nodeRestartThread = nodeRestartThread(restartCycles, 2_000, 
1_000);
+
+        try {
+            nodeRestartThread.start();
+
+            while (!Thread.interrupted() && nodeRestartThread.isAlive()) {
+                iteration++;
+
+                for (int i = 0; i < batchLoadSize; i++)
+                    cache.put(i, iteration);
+
+                putCnt += batchLoadSize;
+
+                log.info("Batch loaded. Iteration: " + iteration);
+
+                final long cnt = lsnr.count();
+
+                final long expCnt = putCnt * stableNodeCnt + ignoredDupEvts;
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return cnt == expCnt;
+                    }
+                }, 6_000);
+
+                if (cnt != expCnt) {
+                    StringBuilder sb = new StringBuilder();
+
+                    for (int i = 0; i < batchLoadSize; i++) {
+                        Integer key = i;
+                        Integer val = cache.get(key);
+
+                        if (!F.eq(val, iteration))
+                            sb.append("\n\t").append(">>> WRONG CACHE VALUE 
(lost data?) [key=").append(key)
+                                .append(", val=").append(val).append(']');
+                    }
+
+                    for (Map.Entry<Integer, Integer> entry : 
lsnr.eventMap().entrySet()) {
+                        Integer key = entry.getKey();
+                        Integer val = entry.getValue();
+
+                        if (!F.eq(val, iteration))
+                            sb.append("\n\t").append(">>> WRONG LISTENER VALUE 
(lost event?) [key=").append(key)
+                                .append(", val=").append(val).append(']');
+                    }
+
+                    String msg = sb.toString();
+
+                    // In atomic mode CQ can receive duplicate update events 
if update retried after fails.
+                    // E.g. topology change
+                    if (atomicityMode() == CacheAtomicityMode.ATOMIC && 
msg.isEmpty() && cnt > expCnt)
+                        ignoredDupEvts += cnt - expCnt;
+                    else
+                        fail("Unexpected event updates count: EXPECTED=" + 
expCnt + ", ACTUAL=" + cnt + ", " +
+                            "ITERATION=" + iteration + msg);
+                }
+
+                sleep(500);
+            }
+        }
+        finally {
+            nodeRestartThread.interrupt();
+
+            cur.close();
+
+            nodeRestartThread.join(3_000);
+        }
+    }
+
+    /**
+     * Starts thread which restarts a node over and over again.
+     */
+    private Thread nodeRestartThread(final int restartCycles, final long 
initDelay, final long restartDelay) {
+        Thread t = new Thread(new Runnable() {
+            public void run() {
+                sleep(initDelay);
+
+                try {
+                    for (int i = 1; i <= restartCycles && 
!Thread.interrupted(); i++) {
+
+                        IgniteConfiguration cfg = 
optimize(getConfiguration("restartNode")).
+                            setGridLogger(new NullLogger());
+
+                        log.info("Node restart cycle started: " + i);
+
+                        try (Ignite ignored = Ignition.start(cfg)) {
+                            awaitPartitionMapExchange();
+
+                            sleep(restartDelay);
+                        }
+
+                        log.info("Node restart cycle finished: " + i);
+
+                        awaitPartitionMapExchange();
+
+                        sleep(restartDelay);
+                    }
+                }
+                catch (Exception e) {
+                    log.error("Unexpected error.", e);
+                }
+            }
+        });
+
+        t.setName("flapping-node-thread");
+
+        t.setDaemon(true);
+
+        return t;
+    }
+
+    /**
+     * Sleep quietly
+     *
+     * @param sleepTime Sleep time.
+     */
+    private void sleep(long sleepTime) {
+        try {
+            if (Thread.currentThread().isInterrupted())
+                return;
+
+            U.sleep(sleepTime);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            Thread.interrupted();
+        }
+    }
+
+    /**
      *
      */
     @IgniteAsyncCallback
@@ -2363,7 +2526,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         /** {@inheritDoc} */
         @Override public synchronized void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
-            throws CacheEntryListenerException  {
+            throws CacheEntryListenerException {
             try {
                 for (CacheEntryEvent<?, ?> evt : evts) {
                     Integer key = (Integer)evt.getKey();
@@ -2439,6 +2602,52 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * Listener.
+     */
+    private static class CacheEventListener4 implements 
CacheEntryUpdatedListener<Integer, Integer> {
+        /** Listener count. */
+        private final AtomicLong cntr = new AtomicLong();
+
+        /** Listener map. */
+        private final Map<Integer, Integer> evtMap = new ConcurrentHashMap<>();
+
+        /** Atomicity mode flag. */
+        private final boolean atomicModeFlag;
+
+        /** Constructor */
+        public CacheEventListener4(boolean atomicModeFlag) {
+            this.atomicModeFlag = atomicModeFlag;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("EqualsBetweenInconvertibleTypes")
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends 
Integer, ? extends Integer>> evts)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : 
evts) {
+                Integer prev = evtMap.put(evt.getKey(), evt.getValue());
+
+                //Atomic cache allows duplicate events if cache update 
operation fails, e.g. due to topology change.
+                if (!atomicModeFlag || prev == null || !prev.equals(evt))
+                    cntr.incrementAndGet();
+            }
+        }
+
+        /**
+         * @return Events count.
+         */
+        public long count() {
+            return cntr.get();
+        }
+
+        /**
+         * @return Event map.
+         */
+        Map<Integer, Integer> eventMap() {
+            return evtMap;
+        }
+    }
+
+    /**
      *
      */
     @IgniteAsyncCallback

http://git-wip-us.apache.org/repos/asf/ignite/blob/0659bebe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
index 789a105..c5240da 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
@@ -36,4 +36,9 @@ public class CacheContinuousQueryFailoverTxSelfTest extends 
CacheContinuousQuery
     @Override protected CacheAtomicityMode atomicityMode() {
         return TRANSACTIONAL;
     }
+
+    /** {@inheritDoc} */
+    public void testNoEventLossOnTopologyChange() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-4015";);
+    }
 }

Reply via email to