IGNITE-2468
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/725d6cb5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/725d6cb5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/725d6cb5 Branch: refs/heads/ignite-2542 Commit: 725d6cb557684ac8f31dfde8f5fcb4ddb95a18dd Parents: 763bf57 Author: Anton Vinogradov <[email protected]> Authored: Fri Feb 12 14:08:25 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Feb 12 14:08:25 2016 +0300 ---------------------------------------------------------------------- .../internal/GridMessageListenHandler.java | 16 ++ .../continuous/GridContinuousProcessor.java | 50 +++-- ...eClientReconnectContinuousProcessorTest.java | 32 +++- ...IgniteCacheContinuousQueryReconnectTest.java | 192 +++++++++++++++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 2 +- .../IgniteCacheQuerySelfTestSuite.java | 2 + 6 files changed, 279 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 13aeb54..bf81944 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler { this.pred = pred; } + /** + * + * @param orig Handler to be copied. + */ + public GridMessageListenHandler(GridMessageListenHandler orig) { + assert orig != null; + + this.clsName = orig.clsName; + this.depInfo = orig.depInfo; + this.pred = orig.pred; + this.predBytes = orig.predBytes; + this.topic = orig.topic; + this.topicBytes = orig.topicBytes; + this.depEnabled = false; + } + /** {@inheritDoc} */ @Override public boolean isEvents() { return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 0218897..496f820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridMessageListenHandler; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -428,11 +429,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.resource().injectGeneric(item.prjPred); // Register handler only if local node passes projection predicate. - if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) { + if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) && + !locInfos.containsKey(item.routineId)) { if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe, false)) item.hnd.onListenerRegistered(item.routineId, ctx); } + + if (!item.autoUnsubscribe) + // Register routine locally. + locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo( + item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe)); } catch (IgniteCheckedException e) { U.error(log, "Failed to register continuous handler.", e); @@ -854,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); } + GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? + new GridMessageListenHandler((GridMessageListenHandler)hnd) : + hnd; + if (node.isClient()) { Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id()); @@ -866,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), - hnd, + hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe())); @@ -881,10 +892,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (prjPred != null) ctx.resource().injectGeneric(prjPred); - if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { - registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), + if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && + !locInfos.containsKey(routineId)) { + registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe(), false); } + + if (!data.autoUnsubscribe()) + // Register routine locally. + locInfos.putIfAbsent(routineId, new LocalRoutineInfo( + prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe())); } catch (IgniteCheckedException e) { err = e; @@ -894,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } // Load partition counters. - if (hnd.isQuery()) { + if (hnd0.isQuery()) { GridCacheProcessor proc = ctx.cache(); if (proc != null) { - GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); + GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); if (cache != null && !cache.isLocal()) { Map<Integer, Long> cntrs = cache.context().topology().updateCounters(); @@ -912,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { req.addError(ctx.localNodeId(), err); if (registered) - hnd.onListenerRegistered(routineId, ctx); + hnd0.onListenerRegistered(routineId, ctx); } /** @@ -1095,22 +1112,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter { */ @SuppressWarnings("TooBroadScope") private void unregisterRemote(UUID routineId) { - RemoteRoutineInfo info; + RemoteRoutineInfo remote; + LocalRoutineInfo loc; stopLock.lock(); try { - info = rmtInfos.remove(routineId); + remote = rmtInfos.remove(routineId); - if (info == null) + loc = locInfos.remove(routineId); + + if (remote == null) stopped.add(routineId); } finally { stopLock.unlock(); } - if (info != null) - unregisterHandler(routineId, info.hnd, false); + if (remote != null) + unregisterHandler(routineId, remote.hnd, false); + else { + assert loc != null; + + // Removes routine at node started it when stopRoutine called from another node. + unregisterHandler(routineId, loc.hnd, false); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java index dc94c96..4c44adc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java @@ -113,7 +113,21 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe /** * @throws Exception If failed. */ - public void testMessageListenerReconnect() throws Exception { + public void testMessageListenerReconnectAndStopFromServer() throws Exception { + testMessageListenerReconnect(false); + } + + /** + * @throws Exception If failed. + */ + public void testMessageListenerReconnectAndStopFromClient() throws Exception { + testMessageListenerReconnect(true); + } + + /** + * @throws Exception If failed. + */ + private void testMessageListenerReconnect(boolean stopFromClient) throws Exception { Ignite client = grid(serverCount()); assertTrue(client.cluster().localNode().isClient()); @@ -166,7 +180,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe log.info("Stop listen, should not get remote messages anymore."); - client.message().stopRemoteListen(opId); + (stopFromClient ? client : srv).message().stopRemoteListen(opId); srv.message().send(topic, "msg3"); @@ -175,6 +189,20 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); assertFalse(latch.await(3000, MILLISECONDS)); + + log.info("New nodes should not register stopped listeners."); + + startGrid(serverCount() + 1); + + srv.message().send(topic, "msg4"); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(1); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertFalse(latch.await(3000, MILLISECONDS)); + + stopGrid(serverCount() + 1); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java new file mode 100644 index 0000000..b1d8a49 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheContinuousQueryReconnectTest extends GridCommonAbstractTest implements Serializable { + /** */ + final private static AtomicInteger cnt = new AtomicInteger(); + + /** */ + private volatile boolean isClient = false; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(atomicMode()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + if (isClient) + cfg.setClientMode(true); + + return cfg; + } + + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @return Atomic mode. + */ + protected CacheAtomicityMode atomicMode() { + return ATOMIC; + } + + /** + * @throws Exception If failed. + */ + public void testReconnectServer() throws Exception { + testReconnect(false); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClient() throws Exception { + testReconnect(true); + } + + /** + * + */ + private void putAndCheck(IgniteCache<Object, Object> cache, int diff) { + cnt.set(0); + + cache.put(1, "1"); + + assertEquals(diff, cnt.get()); + } + + /** + * @throws Exception If failed. + */ + private void testReconnect(boolean clientQuery) throws Exception { + Ignite srv1 = startGrid(0); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + // No-op. + } + }); + + qry.setAutoUnsubscribe(false); + + qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Object, Object>() { + @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException { + cnt.incrementAndGet(); + + return true; + } + }); + + isClient = true; + + Ignite client = startGrid(1); + + isClient = false; + + IgniteCache<Object, Object> cache1 = srv1.cache(null); + IgniteCache<Object, Object> clCache = client.cache(null); + + putAndCheck(clCache, 0); // 0 remote listeners. + + QueryCursor<Cache.Entry<Object, Object>> cur = (clientQuery ? clCache : cache1).query(qry); + + putAndCheck(clCache, 1); // 1 remote listener. + + final Ignite srv2 = startGrid(2); + + putAndCheck(clCache, 2); // 2 remote listeners. + + stopGrid(0); + + while (true) { + try { + clCache.get(1); + + break; + } + catch (IgniteClientDisconnectedException e) { + e.reconnectFuture().get(); // Wait for reconnect. + + } + catch (CacheException e) { + if (e.getCause() instanceof IgniteClientDisconnectedException) + ((IgniteClientDisconnectedException)e.getCause()).reconnectFuture().get(); // Wait for reconnect. + } + } + + putAndCheck(clCache, 1); // 1 remote listener. + + Ignite srv3 = startGrid(3); + + putAndCheck(clCache, 2); // 2 remote listeners. + + stopGrid(1); // Client node. + + isClient = true; + + client = startGrid(4); + + isClient = false; + + clCache = client.cache(null); + + putAndCheck(clCache, 2); // 2 remote listeners. + + Ignite srv4 = startGrid(5); + + putAndCheck(clCache, 3); // 3 remote listeners. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 030c653..7debb41 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -2050,7 +2050,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public boolean apply(UUID uuid, Object msg) { - X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']'); + X.println(">>> Received [node=" + ignite.name() + ", msg=" + msg + ']'); msgLatch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 3cd4579..cecb8ad 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest; +import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryReconnectTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest; import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest; @@ -200,6 +201,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class); suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class); suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryReconnectTest.class); suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class); suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class); suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class);
