Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc-debug [created] 8adfe7d7f


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8adfe7d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8adfe7d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8adfe7d7

Branch: refs/heads/ignite-5075-cc-debug
Commit: 8adfe7d7f6c29ccebae2ee7459bf9edb51b5a520
Parents: c2c9277
Author: sboikov <[email protected]>
Authored: Wed May 24 18:07:59 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed May 24 18:07:59 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   6 +
 .../continuous/CacheContinuousQueryEntry.java   |  11 +-
 .../continuous/CacheContinuousQueryHandler.java |  46 +++++-
 .../communication/tcp/TcpCommunicationSpi.java  |  36 ++--
 .../spi/communication/tcp/TestDebugLog.java     | 164 +++++++++++++++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java | 133 ++++++++-------
 6 files changed, 313 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 30c2a33..569d638 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -1843,6 +1844,11 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 CacheObject evtVal = cctx.unwrapTemporary(updateVal);
                 CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
 
+                if (primary)
+                    TestDebugLog.addEntryMessage(partition(), 
evtVal.value(cctx.cacheObjectContext(), false), "primary notify cntr=" + 
c.updateRes.updateCounter() + " k=" + key.value(null, false));
+                else
+                    TestDebugLog.addEntryMessage(key.value(null, false), 
evtVal.value(cctx.cacheObjectContext(), false), "backup notify cntr=" + 
c.updateRes.updateCounter() + " k=" + key.value(null, false));
+
                 cctx.continuousQueries().onEntryUpdated(lsnrs,
                     key,
                     evtVal,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 9db92b2..25f6e26 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -231,9 +231,18 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
             return this;
 
         CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(
-                cacheId, null, null, null, null, keepBinary, part, updateCntr, 
topVer);
+            cacheId,
+            null,
+            null,
+            null,
+            null,
+            keepBinary,
+            part,
+            updateCntr,
+            topVer);
 
         e.flags = flags;
+        e.filteredCnt = filteredCnt;
 
         return e;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index c046095..01fe5e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -1050,6 +1051,10 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                 if (curTop == AffinityTopologyVersion.NONE) {
                     lastFiredEvt = entry.updateCounter();
 
+                    TestDebugLog.addEntryMessage(entry.partition(),
+                        entry.updateCounter(),
+                        "collect first cntr=" + entry.updateCounter() + " 
topVer=" + entry.topologyVersion());
+
                     curTop = entry.topologyVersion();
 
                     if (log.isDebugEnabled()) {
@@ -1080,6 +1085,10 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
 
                         lastFiredEvt = entry.updateCounter();
 
+                        TestDebugLog.addEntryMessage(entry.partition(),
+                            entry.updateCounter(),
+                            "collect for lost topVer cntr=" + 
entry.updateCounter() + " topVer=" + entry.topologyVersion());
+
                         if (!entry.isFiltered())
                             entries.add(new CacheContinuousQueryEvent<K, 
V>(cache, cctx, entry));
 
@@ -1097,12 +1106,29 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                 }
 
                 // Check duplicate.
-                if (entry.updateCounter() > lastFiredEvt)
+                if (entry.updateCounter() > lastFiredEvt) {
+                    TestDebugLog.addEntryMessage(entry.partition(),
+                        entry.updateCounter(),
+                        "add event last=" + lastFiredEvt +
+                        " cntr=" + entry.updateCounter() +
+                        " key=" + (entry.isFiltered() ? "filtered" : 
entry.key().value(cctx.cacheObjectContext(), false))  +
+                        " val=" + (entry.isFiltered() ? "filtered" : 
entry.value().value(cctx.cacheObjectContext(), false))  +
+                        " topVer=" + entry.topologyVersion());
+
                     pendingEvts.put(entry.updateCounter(), entry);
+                }
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Skip duplicate continuous query message: " 
+ entry);
 
+                    TestDebugLog.addEntryMessage(entry.partition(),
+                        entry.updateCounter(),
+                        "skip duplicate last=" + lastFiredEvt +
+                        " cntr=" + entry.updateCounter() +
+                        " key=" + (entry.isFiltered() ? "filtered" : 
entry.key().value(cctx.cacheObjectContext(), false))  +
+                        " val=" + (entry.isFiltered() ? "filtered" : 
entry.value().value(cctx.cacheObjectContext(), false))  +
+                        " topVer=" + entry.topologyVersion());
+
                     return Collections.emptyList();
                 }
 
@@ -1156,9 +1182,18 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                         boolean fire = e.getKey() == lastFiredEvt + 1;;
 
                         if (!fire && filtered > 0)
-                            fire = e.getKey() - filtered == lastFiredEvt + 1;
+                            fire = e.getKey() - filtered <= lastFiredEvt;
 
                         if (fire) {
+                            TestDebugLog.addEntryMessage(entry.partition(),
+                                entry.updateCounter(),
+                                "process last=" + lastFiredEvt +
+                                " cntr=" + e.getKey() +
+                                " key=" + (pending.isFiltered() ? "filtered" : 
pending.key().value(cctx.cacheObjectContext(), false))  +
+                                " val=" + (pending.isFiltered() ? "filtered" : 
pending.value().value(cctx.cacheObjectContext(), false))  +
+                                " topVer=" + e.getValue().topologyVersion() +
+                                " f=" + pending.filteredCount());
+
                             lastFiredEvt = e.getKey();
 
                             if (e.getValue() != HOLE && 
!e.getValue().isFiltered())
@@ -1166,8 +1201,13 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
 
                             iter.remove();
                         }
-                        else
+                        else {
+                            TestDebugLog.addEntryMessage(entry.partition(),
+                                entry.updateCounter(),
+                                "stop process last=" + lastFiredEvt + " cntr=" 
+ e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + 
pending.filteredCount());
+
                             break;
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5ec9a6e..1462aae 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2891,24 +2891,24 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
                 allInetAddrs.add(addr.getAddress());
         }
 
-        List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
-
-        if (reachableInetAddrs.size() < allInetAddrs.size()) {
-            LinkedHashSet<InetSocketAddress> addrs0 = 
U.newLinkedHashSet(addrs.size());
-
-            List<InetSocketAddress> unreachableInetAddr = new 
ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());
-
-            for (InetSocketAddress addr : addrs) {
-                if (reachableInetAddrs.contains(addr.getAddress()))
-                    addrs0.add(addr);
-                else
-                    unreachableInetAddr.add(addr);
-            }
-
-            addrs0.addAll(unreachableInetAddr);
-
-            addrs = addrs0;
-        }
+//        List<InetAddress> reachableInetAddrs = 
U.filterReachable(allInetAddrs);
+//
+//        if (reachableInetAddrs.size() < allInetAddrs.size()) {
+//            LinkedHashSet<InetSocketAddress> addrs0 = 
U.newLinkedHashSet(addrs.size());
+//
+//            List<InetSocketAddress> unreachableInetAddr = new 
ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());
+//
+//            for (InetSocketAddress addr : addrs) {
+//                if (reachableInetAddrs.contains(addr.getAddress()))
+//                    addrs0.add(addr);
+//                else
+//                    unreachableInetAddr.add(addr);
+//            }
+//
+//            addrs0.addAll(unreachableInetAddr);
+//
+//            addrs = addrs0;
+//        }
 
         if (log.isDebugEnabled())
             log.debug("Addresses to connect for node [rmtNode=" + node.id() + 
", addrs=" + addrs.toString() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java
new file mode 100644
index 0000000..038ea0c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.text.*;
+import java.util.*;
+
+/**
+ * TODO
+ */
+public class TestDebugLog {
+    /** */
+    private static final List<Object> msgs = Collections.synchronizedList(new 
ArrayList<>(100_000));
+
+    /** */
+    private static final SimpleDateFormat DEBUG_DATE_FMT = new 
SimpleDateFormat("HH:mm:ss,SSS");
+
+    static class Message {
+        String thread = Thread.currentThread().getName();
+
+        String msg;
+
+        long ts = U.currentTimeMillis();
+
+        public Message(String msg) {
+            this.msg = msg;
+        }
+
+        public String toString() {
+            return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + 
DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+        }
+    }
+
+    static class EntryMessage extends Message {
+        Object key;
+        Object val;
+
+        public EntryMessage(Object key, Object val, String msg) {
+            super(msg);
+
+            this.key = key;
+            this.val = val;
+        }
+
+        public String toString() {
+            return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + 
", thread" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+        }
+    }
+
+    static final boolean out = false;
+
+    public static void addMessage(String msg) {
+        msgs.add(new Message(msg));
+
+        if (out)
+            System.out.println(msg);
+    }
+
+    public static void addEntryMessage(Object key, Object val, String msg) {
+        EntryMessage msg0 = new EntryMessage(key, val, msg);
+
+        msgs.add(msg0);
+
+        if (out)
+            System.out.println(msg0.toString());
+    }
+
+    public static void printMessages(boolean file) {
+        List<Object> msgs0;
+
+        synchronized (msgs) {
+            msgs0 = new ArrayList<>(msgs);
+
+            msgs.clear();
+        }
+
+        if (file) {
+            try {
+                FileOutputStream out = new FileOutputStream("test_debug.log");
+
+                PrintWriter w = new PrintWriter(out);
+
+                for (Object msg : msgs0)
+                    w.println(msg.toString());
+
+                w.close();
+
+                out.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            for (Object msg : msgs0)
+                System.out.println(msg);
+        }
+    }
+
+    public static void printKeyMessages(boolean file, Object key) {
+        List<Object> msgs0;
+
+        synchronized (msgs) {
+            msgs0 = new ArrayList<>(msgs);
+
+            msgs.clear();
+        }
+
+        if (file) {
+            try {
+                FileOutputStream out = new FileOutputStream("test_debug.log");
+
+                PrintWriter w = new PrintWriter(out);
+
+                for (Object msg : msgs0) {
+                    if (msg instanceof EntryMessage && 
!((EntryMessage)msg).key.equals(key))
+                        continue;
+
+                    w.println(msg.toString());
+                }
+
+                w.close();
+
+                out.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            for (Object msg : msgs0) {
+                if (msg instanceof EntryMessage && 
!((EntryMessage)msg).key.equals(key))
+                    continue;
+
+                System.out.println(msg);
+            }
+        }
+    }
+
+    public static void clear() {
+        msgs.clear();
+    }
+
+    public static void main(String[] args) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index befd1d7..050af5d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -95,6 +95,7 @@ import 
org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TestDebugLog;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1222,8 +1223,15 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             if (!lostAllow && lostEvts.size() > 100) {
                 log.error("Lost event cnt: " + lostEvts.size());
 
-                for (T3<Object, Object, Object> e : lostEvts)
-                    log.error("Lost event: " + e);
+                for (T3<Object, Object, Object> e : lostEvts) {
+                    log.error("Lost event: " + 
ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e);
+
+                    
TestDebugLog.addEntryMessage(ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()),
 e.get2(), "lost event " + e.get1() + " " + e.get2());
+
+                    TestDebugLog.printKeyMessages(true, 
ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()));
+
+                    System.exit(1);
+                }
 
                 fail("Lose events, see log for details.");
             }
@@ -1645,10 +1653,6 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         QueryCursor<?> cur = qryClnCache.query(qry);
 
-        CacheEventListener2 dinLsnr = null;
-
-        QueryCursor<?> dinQry = null;
-
         final AtomicBoolean stop = new AtomicBoolean();
 
         final AtomicReference<CountDownLatch> checkLatch = new 
AtomicReference<>();
@@ -1659,22 +1663,24 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                     final int idx = 
ThreadLocalRandom.current().nextInt(SRV_NODES - 1);
 
                     log.info("Stop node: " + idx);
+                    TestDebugLog.addMessage("Stop node: " + idx);
 
                     awaitPartitionMapExchange();
 
-                    Thread.sleep(400);
+                    Thread.sleep(100);
 
                     stopGrid(idx);
 
                     awaitPartitionMapExchange();
 
-                    Thread.sleep(400);
+                    Thread.sleep(100);
 
                     log.info("Start node: " + idx);
+                    TestDebugLog.addMessage("Start node: " + idx);
 
                     startGrid(idx);
 
-                    Thread.sleep(200);
+                    Thread.sleep(100);
 
                     CountDownLatch latch = new CountDownLatch(1);
 
@@ -1695,7 +1701,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         final Map<Integer, List<T2<Integer, Integer>>> expEvts = new 
HashMap<>();
 
-        final List<T3<Object, Object, Object>> expEvtsNewLsnr = new 
ArrayList<>();
+        //final List<T3<Object, Object, Object>> expEvtsNewLsnr = new 
ArrayList<>();
 
         final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
 
@@ -1703,7 +1709,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             long stopTime = System.currentTimeMillis() + 60_000;
 
             // Start new filter each 5 sec.
-            long startFilterTime = System.currentTimeMillis() + 5_000;
+            //long startFilterTime = System.currentTimeMillis() + 5_000;
 
             final int PARTS = 
qryClient.affinity(DEFAULT_CACHE_NAME).partitions();
 
@@ -1719,30 +1725,30 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 Integer prevVal = vals.get(key);
                 Integer val = vals.get(key);
 
-                if (System.currentTimeMillis() > startFilterTime) {
-                    // Stop filter and check events.
-                    if (dinQry != null) {
-                        dinQry.close();
-
-                        log.info("Continuous query listener closed. Await 
events: " + expEvtsNewLsnr.size());
-
-                        checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
-                    }
-
-                    dinLsnr = new CacheEventListener2();
-
-                    ContinuousQuery<Object, Object> newQry = new 
ContinuousQuery<>();
-
-                    newQry.setLocalListener(dinLsnr);
-
-                    newQry.setRemoteFilter(asyncCallback() ? new 
CacheEventAsyncFilter() : new CacheEventFilter());
-
-                    dinQry = qryClnCache.query(newQry);
-
-                    log.info("Continuous query listener started.");
-
-                    startFilterTime = System.currentTimeMillis() + 5_000;
-                }
+//                if (System.currentTimeMillis() > startFilterTime) {
+//                    // Stop filter and check events.
+//                    if (dinQry != null) {
+//                        dinQry.close();
+//
+//                        log.info("Continuous query listener closed. Await 
events: " + expEvtsNewLsnr.size());
+//
+//                        checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
+//                    }
+//
+//                    dinLsnr = new CacheEventListener2();
+//
+//                    ContinuousQuery<Object, Object> newQry = new 
ContinuousQuery<>();
+//
+//                    newQry.setLocalListener(dinLsnr);
+//
+//                    newQry.setRemoteFilter(asyncCallback() ? new 
CacheEventAsyncFilter() : new CacheEventFilter());
+//
+//                    dinQry = qryClnCache.query(newQry);
+//
+//                    log.info("Continuous query listener started.");
+//
+//                    startFilterTime = System.currentTimeMillis() + 5_000;
+//                }
 
                 if (val == null)
                     val = 0;
@@ -1752,20 +1758,23 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 if (filtered)
                     val = -val;
 
-                if (processorPut && prevVal != null) {
-                    qryClnCache.invoke(key, new CacheEntryProcessor<Object, 
Object, Void>() {
-                        @Override public Void process(MutableEntry<Object, 
Object> entry,
-                            Object... arguments) throws 
EntryProcessorException {
-                            entry.setValue(arguments[0]);
+//                if (processorPut && prevVal != null) {
+//                    qryClnCache.invoke(key, new CacheEntryProcessor<Object, 
Object, Void>() {
+//                        @Override public Void process(MutableEntry<Object, 
Object> entry,
+//                            Object... arguments) throws 
EntryProcessorException {
+//                            entry.setValue(arguments[0]);
+//
+//                            return null;
+//                        }
+//                    }, val);
+//                }
+//                else
 
-                            return null;
-                        }
-                    }, val);
-                }
-                else
-                    qryClnCache.put(key, val);
+                
TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(key),
 val, "do put " + key + " " + val);
 
-                processorPut = !processorPut;
+                qryClnCache.put(key, val);
+
+                //processorPut = !processorPut;
 
                 vals.put(key, val);
 
@@ -1784,8 +1793,8 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
                     expEvtsLsnr.add(tupVal);
 
-                    if (dinQry != null)
-                        expEvtsNewLsnr.add(tupVal);
+//                    if (dinQry != null)
+//                        expEvtsNewLsnr.add(tupVal);
                 }
 
                 filtered = !filtered;
@@ -1793,7 +1802,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                 CountDownLatch latch = checkLatch.get();
 
                 if (latch != null) {
-                    log.info("Check events.");
+                    log.info("Check events " + expEvtsLsnr.size());
 
                     checkLatch.set(null);
 
@@ -1808,6 +1817,8 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
                         success = true;
 
                         log.info("Events checked.");
+
+                        //TestDebugLog.clear();
                     }
                     finally {
                         if (!success)
@@ -1834,12 +1845,12 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         lsnr.evts.clear();
         lsnr.vals.clear();
 
-        if (dinQry != null) {
-            checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
-
-            dinLsnr.evts.clear();
-            dinLsnr.vals.clear();
-        }
+//        if (dinQry != null) {
+//            checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
+//
+//            dinLsnr.evts.clear();
+//            dinLsnr.vals.clear();
+//        }
 
         List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>();
 
@@ -1855,11 +1866,11 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         cur.close();
 
-        if (dinQry != null) {
-            checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false);
-
-            dinQry.close();
-        }
+//        if (dinQry != null) {
+//            checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false);
+//
+//            dinQry.close();
+//        }
 
         assertFalse("Unexpected error during test, see log for details.", err);
     }

Reply via email to