Repository: ignite Updated Branches: refs/heads/ignite-5075-cc-debug [created] 8adfe7d7f
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8adfe7d7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8adfe7d7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8adfe7d7 Branch: refs/heads/ignite-5075-cc-debug Commit: 8adfe7d7f6c29ccebae2ee7459bf9edb51b5a520 Parents: c2c9277 Author: sboikov <[email protected]> Authored: Wed May 24 18:07:59 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 24 18:07:59 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 6 + .../continuous/CacheContinuousQueryEntry.java | 11 +- .../continuous/CacheContinuousQueryHandler.java | 46 +++++- .../communication/tcp/TcpCommunicationSpi.java | 36 ++-- .../spi/communication/tcp/TestDebugLog.java | 164 +++++++++++++++++++ ...ContinuousQueryFailoverAbstractSelfTest.java | 133 ++++++++------- 6 files changed, 313 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 30c2a33..569d638 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.communication.tcp.TestDebugLog; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; @@ -1843,6 +1844,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject evtVal = cctx.unwrapTemporary(updateVal); CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); + if (primary) + TestDebugLog.addEntryMessage(partition(), evtVal.value(cctx.cacheObjectContext(), false), "primary notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false)); + else + TestDebugLog.addEntryMessage(key.value(null, false), evtVal.value(cctx.cacheObjectContext(), false), "backup notify cntr=" + c.updateRes.updateCounter() + " k=" + key.value(null, false)); + cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 9db92b2..25f6e26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -231,9 +231,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { return this; CacheContinuousQueryEntry e = new CacheContinuousQueryEntry( - cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer); + cacheId, + null, + null, + null, + null, + keepBinary, + part, + updateCntr, + topVer); e.flags = flags; + e.filteredCnt = filteredCnt; return e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index c046095..01fe5e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.communication.tcp.TestDebugLog; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -1050,6 +1051,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (curTop == AffinityTopologyVersion.NONE) { lastFiredEvt = entry.updateCounter(); + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "collect first cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion()); + curTop = entry.topologyVersion(); if (log.isDebugEnabled()) { @@ -1080,6 +1085,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler lastFiredEvt = entry.updateCounter(); + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "collect for lost topVer cntr=" + entry.updateCounter() + " topVer=" + entry.topologyVersion()); + if (!entry.isFiltered()) entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)); @@ -1097,12 +1106,29 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } // Check duplicate. - if (entry.updateCounter() > lastFiredEvt) + if (entry.updateCounter() > lastFiredEvt) { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "add event last=" + lastFiredEvt + + " cntr=" + entry.updateCounter() + + " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) + + " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) + + " topVer=" + entry.topologyVersion()); + pendingEvts.put(entry.updateCounter(), entry); + } else { if (log.isDebugEnabled()) log.debug("Skip duplicate continuous query message: " + entry); + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "skip duplicate last=" + lastFiredEvt + + " cntr=" + entry.updateCounter() + + " key=" + (entry.isFiltered() ? "filtered" : entry.key().value(cctx.cacheObjectContext(), false)) + + " val=" + (entry.isFiltered() ? "filtered" : entry.value().value(cctx.cacheObjectContext(), false)) + + " topVer=" + entry.topologyVersion()); + return Collections.emptyList(); } @@ -1156,9 +1182,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean fire = e.getKey() == lastFiredEvt + 1;; if (!fire && filtered > 0) - fire = e.getKey() - filtered == lastFiredEvt + 1; + fire = e.getKey() - filtered <= lastFiredEvt; if (fire) { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "process last=" + lastFiredEvt + + " cntr=" + e.getKey() + + " key=" + (pending.isFiltered() ? "filtered" : pending.key().value(cctx.cacheObjectContext(), false)) + + " val=" + (pending.isFiltered() ? "filtered" : pending.value().value(cctx.cacheObjectContext(), false)) + + " topVer=" + e.getValue().topologyVersion() + + " f=" + pending.filteredCount()); + lastFiredEvt = e.getKey(); if (e.getValue() != HOLE && !e.getValue().isFiltered()) @@ -1166,8 +1201,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler iter.remove(); } - else + else { + TestDebugLog.addEntryMessage(entry.partition(), + entry.updateCounter(), + "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount()); + break; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 5ec9a6e..1462aae 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2891,24 +2891,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati allInetAddrs.add(addr.getAddress()); } - List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs); - - if (reachableInetAddrs.size() < allInetAddrs.size()) { - LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size()); - - List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size()); - - for (InetSocketAddress addr : addrs) { - if (reachableInetAddrs.contains(addr.getAddress())) - addrs0.add(addr); - else - unreachableInetAddr.add(addr); - } - - addrs0.addAll(unreachableInetAddr); - - addrs = addrs0; - } +// List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs); +// +// if (reachableInetAddrs.size() < allInetAddrs.size()) { +// LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size()); +// +// List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size()); +// +// for (InetSocketAddress addr : addrs) { +// if (reachableInetAddrs.contains(addr.getAddress())) +// addrs0.add(addr); +// else +// unreachableInetAddr.add(addr); +// } +// +// addrs0.addAll(unreachableInetAddr); +// +// addrs = addrs0; +// } if (log.isDebugEnabled()) log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java new file mode 100644 index 0000000..038ea0c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TestDebugLog.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.text.*; +import java.util.*; + +/** + * TODO + */ +public class TestDebugLog { + /** */ + private static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(100_000)); + + /** */ + private static final SimpleDateFormat DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS"); + + static class Message { + String thread = Thread.currentThread().getName(); + + String msg; + + long ts = U.currentTimeMillis(); + + public Message(String msg) { + this.msg = msg; + } + + public String toString() { + return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class EntryMessage extends Message { + Object key; + Object val; + + public EntryMessage(Object key, Object val, String msg) { + super(msg); + + this.key = key; + this.val = val; + } + + public String toString() { + return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static final boolean out = false; + + public static void addMessage(String msg) { + msgs.add(new Message(msg)); + + if (out) + System.out.println(msg); + } + + public static void addEntryMessage(Object key, Object val, String msg) { + EntryMessage msg0 = new EntryMessage(key, val, msg); + + msgs.add(msg0); + + if (out) + System.out.println(msg0.toString()); + } + + public static void printMessages(boolean file) { + List<Object> msgs0; + + synchronized (msgs) { + msgs0 = new ArrayList<>(msgs); + + msgs.clear(); + } + + if (file) { + try { + FileOutputStream out = new FileOutputStream("test_debug.log"); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) + w.println(msg.toString()); + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + else { + for (Object msg : msgs0) + System.out.println(msg); + } + } + + public static void printKeyMessages(boolean file, Object key) { + List<Object> msgs0; + + synchronized (msgs) { + msgs0 = new ArrayList<>(msgs); + + msgs.clear(); + } + + if (file) { + try { + FileOutputStream out = new FileOutputStream("test_debug.log"); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) + continue; + + w.println(msg.toString()); + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + else { + for (Object msg : msgs0) { + if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) + continue; + + System.out.println(msg); + } + } + } + + public static void clear() { + msgs.clear(); + } + + public static void main(String[] args) { + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8adfe7d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index befd1d7..050af5d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -95,6 +95,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TestDebugLog; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -1222,8 +1223,15 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC if (!lostAllow && lostEvts.size() > 100) { log.error("Lost event cnt: " + lostEvts.size()); - for (T3<Object, Object, Object> e : lostEvts) - log.error("Lost event: " + e); + for (T3<Object, Object, Object> e : lostEvts) { + log.error("Lost event: " + ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()) + " " + e); + + TestDebugLog.addEntryMessage(ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1()), e.get2(), "lost event " + e.get1() + " " + e.get2()); + + TestDebugLog.printKeyMessages(true, ignite(0).affinity(DEFAULT_CACHE_NAME).partition(e.get1())); + + System.exit(1); + } fail("Lose events, see log for details."); } @@ -1645,10 +1653,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor<?> cur = qryClnCache.query(qry); - CacheEventListener2 dinLsnr = null; - - QueryCursor<?> dinQry = null; - final AtomicBoolean stop = new AtomicBoolean(); final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>(); @@ -1659,22 +1663,24 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final int idx = ThreadLocalRandom.current().nextInt(SRV_NODES - 1); log.info("Stop node: " + idx); + TestDebugLog.addMessage("Stop node: " + idx); awaitPartitionMapExchange(); - Thread.sleep(400); + Thread.sleep(100); stopGrid(idx); awaitPartitionMapExchange(); - Thread.sleep(400); + Thread.sleep(100); log.info("Start node: " + idx); + TestDebugLog.addMessage("Start node: " + idx); startGrid(idx); - Thread.sleep(200); + Thread.sleep(100); CountDownLatch latch = new CountDownLatch(1); @@ -1695,7 +1701,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); - final List<T3<Object, Object, Object>> expEvtsNewLsnr = new ArrayList<>(); + //final List<T3<Object, Object, Object>> expEvtsNewLsnr = new ArrayList<>(); final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>(); @@ -1703,7 +1709,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC long stopTime = System.currentTimeMillis() + 60_000; // Start new filter each 5 sec. - long startFilterTime = System.currentTimeMillis() + 5_000; + //long startFilterTime = System.currentTimeMillis() + 5_000; final int PARTS = qryClient.affinity(DEFAULT_CACHE_NAME).partitions(); @@ -1719,30 +1725,30 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Integer prevVal = vals.get(key); Integer val = vals.get(key); - if (System.currentTimeMillis() > startFilterTime) { - // Stop filter and check events. - if (dinQry != null) { - dinQry.close(); - - log.info("Continuous query listener closed. Await events: " + expEvtsNewLsnr.size()); - - checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); - } - - dinLsnr = new CacheEventListener2(); - - ContinuousQuery<Object, Object> newQry = new ContinuousQuery<>(); - - newQry.setLocalListener(dinLsnr); - - newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); - - dinQry = qryClnCache.query(newQry); - - log.info("Continuous query listener started."); - - startFilterTime = System.currentTimeMillis() + 5_000; - } +// if (System.currentTimeMillis() > startFilterTime) { +// // Stop filter and check events. +// if (dinQry != null) { +// dinQry.close(); +// +// log.info("Continuous query listener closed. Await events: " + expEvtsNewLsnr.size()); +// +// checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); +// } +// +// dinLsnr = new CacheEventListener2(); +// +// ContinuousQuery<Object, Object> newQry = new ContinuousQuery<>(); +// +// newQry.setLocalListener(dinLsnr); +// +// newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); +// +// dinQry = qryClnCache.query(newQry); +// +// log.info("Continuous query listener started."); +// +// startFilterTime = System.currentTimeMillis() + 5_000; +// } if (val == null) val = 0; @@ -1752,20 +1758,23 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC if (filtered) val = -val; - if (processorPut && prevVal != null) { - qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { - @Override public Void process(MutableEntry<Object, Object> entry, - Object... arguments) throws EntryProcessorException { - entry.setValue(arguments[0]); +// if (processorPut && prevVal != null) { +// qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { +// @Override public Void process(MutableEntry<Object, Object> entry, +// Object... arguments) throws EntryProcessorException { +// entry.setValue(arguments[0]); +// +// return null; +// } +// }, val); +// } +// else - return null; - } - }, val); - } - else - qryClnCache.put(key, val); + TestDebugLog.addEntryMessage(ignite(4).affinity(DEFAULT_CACHE_NAME).partition(key), val, "do put " + key + " " + val); - processorPut = !processorPut; + qryClnCache.put(key, val); + + //processorPut = !processorPut; vals.put(key, val); @@ -1784,8 +1793,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC expEvtsLsnr.add(tupVal); - if (dinQry != null) - expEvtsNewLsnr.add(tupVal); +// if (dinQry != null) +// expEvtsNewLsnr.add(tupVal); } filtered = !filtered; @@ -1793,7 +1802,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC CountDownLatch latch = checkLatch.get(); if (latch != null) { - log.info("Check events."); + log.info("Check events " + expEvtsLsnr.size()); checkLatch.set(null); @@ -1808,6 +1817,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC success = true; log.info("Events checked."); + + //TestDebugLog.clear(); } finally { if (!success) @@ -1834,12 +1845,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC lsnr.evts.clear(); lsnr.vals.clear(); - if (dinQry != null) { - checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); - - dinLsnr.evts.clear(); - dinLsnr.vals.clear(); - } +// if (dinQry != null) { +// checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); +// +// dinLsnr.evts.clear(); +// dinLsnr.vals.clear(); +// } List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>(); @@ -1855,11 +1866,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC cur.close(); - if (dinQry != null) { - checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false); - - dinQry.close(); - } +// if (dinQry != null) { +// checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false); +// +// dinQry.close(); +// } assertFalse("Unexpected error during test, see log for details.", err); }
