Repository: ignite Updated Branches: refs/heads/ignite-426-2-reb 99a5f20cd -> 805997714
IGNITE-426 Fixed tests. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80599771 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80599771 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80599771 Branch: refs/heads/ignite-426-2-reb Commit: 805997714759c885b0b539e168ed94f34232a127 Parents: 99a5f20 Author: Tikhonov Nikolay <[email protected]> Authored: Wed Nov 4 15:10:13 2015 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Wed Nov 4 15:10:13 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 32 ++++ .../dht/GridDhtPartitionTopologyImpl.java | 2 - ...ContinuousQueryFailoverAbstractSelfTest.java | 186 ------------------- 3 files changed, 32 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/80599771/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9273f5b..86051ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1944,6 +1944,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme "[entry=" + this + ", newVer=" + newVer + ']'); } + if (!cctx.isNear()) { + CacheObject evtVal; + + if (op == GridCacheOperation.TRANSFORM) { + EntryProcessor<Object, Object, ?> entryProcessor = + (EntryProcessor<Object, Object, ?>)writeObj; + + CacheInvokeEntry<Object, Object> entry = + new CacheInvokeEntry<>(cctx, key, prevVal, version()); + + try { + entryProcessor.process(entry, invokeArgs); + + evtVal = entry.modified() ? + cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; + } + catch (Exception e) { + evtVal = prevVal; + } + } + else + evtVal = (CacheObject)writeObj; + + updateIdx0 = nextPartIndex(topVer); + + if (updateIdx != null) + updateIdx0 = updateIdx; + + cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal, + prevVal, primary, false, updateIdx0, topVer); + } + return new GridCacheUpdateAtomicResult(false, retval ? rawGetOrUnmarshalUnlocked(false) : null, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/80599771/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index d30cc88..5186589 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1258,8 +1258,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part.own()) { updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet()); - updateRebalanceVersion(); - consistencyCheck(); return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/80599771/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index f866424..c0d22e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -1470,192 +1470,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @throws Exception If failed. */ - public void testFailoverFilter() throws Exception { - this.backups = 2; - - final int SRV_NODES = 4; - - startGridsMultiThreaded(SRV_NODES); - - client = true; - - Ignite qryClient = startGrid(SRV_NODES); - - client = false; - - IgniteCache<Object, Object> qryClientCache = qryClient.cache(null); - - final CacheEventListener2 lsnr = new CacheEventListener2(); - - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - - qry.setLocalListener(lsnr); - - qry.setRemoteFilter(new CacheEventFilter()); - - QueryCursor<?> cur = qryClientCache.query(qry); - - final AtomicBoolean stop = new AtomicBoolean(); - - final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>(); - - IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - final int idx = SRV_NODES + 1; - - while (!stop.get() && !err) { - log.info("Start node: " + idx); - - startGrid(idx); - - Thread.sleep(200); - - log.info("Stop node: " + idx); - - stopGrid(idx); - - CountDownLatch latch = new CountDownLatch(1); - - assertTrue(checkLatch.compareAndSet(null, latch)); - - if (!stop.get()) { - log.info("Wait for event check."); - - assertTrue(latch.await(1, MINUTES)); - } - } - - return null; - } - }); - - final Map<Integer, Integer> vals = new HashMap<>(); - - final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); - - try { - long stopTime = System.currentTimeMillis() + 60_000; - - final int PARTS = qryClient.affinity(null).partitions(); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - boolean filtered = false; - - boolean processorPut = false; - - while (System.currentTimeMillis() < stopTime) { - Integer key = rnd.nextInt(PARTS); - - Integer prevVal = vals.get(key); - Integer val = vals.get(key); - - if (val == null) - val = 0; - else - val = Math.abs(val) + 1; - - if (filtered) - val = -val; - - if (processorPut && prevVal != null) { - qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { - @Override public Void process(MutableEntry<Object, Object> entry, - Object... arguments) throws EntryProcessorException { - entry.setValue(arguments[0]); - - return null; - } - }, val); - } - else - qryClientCache.put(key, val); - - processorPut = !processorPut; - - vals.put(key, val); - - if (val >= 0) { - List<T2<Integer, Integer>> keyEvts = expEvts.get(key); - - if (keyEvts == null) { - keyEvts = new ArrayList<>(); - - expEvts.put(key, keyEvts); - } - - keyEvts.add(new T2<>(val, prevVal)); - } - - filtered = !filtered; - - CountDownLatch latch = checkLatch.get(); - - if (latch != null) { - log.info("Check events."); - - checkLatch.set(null); - - boolean success = false; - - try { - if (err) - break; - - boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return checkEvents(false, expEvts, lsnr); - } - }, 10_000); - - if (!check) - assertTrue(checkEvents(true, expEvts, lsnr)); - - success = true; - - log.info("Events checked."); - } - finally { - if (!success) - err = true; - - latch.countDown(); - } - } - } - } - finally { - stop.set(true); - } - - CountDownLatch latch = checkLatch.get(); - - if (latch != null) - latch.countDown(); - - restartFut.get(); - - boolean check = true; - - if (!expEvts.isEmpty()) { - check = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return checkEvents(false, expEvts, lsnr); - } - }, 10_000); - } - - if (!check) - assertTrue(checkEvents(true, expEvts, lsnr)); - - cur.close(); - - assertFalse("Unexpected error during test, see log for details.", err); - } - - /** - * @throws Exception If failed. - */ public void testFailoverStartStopBackup() throws Exception { failoverStartStopFilter(2); }
