http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java new file mode 100644 index 0000000..82e29db --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.closure; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Tests for {@link GridClosureProcessor}. + */ +@GridCommonTest(group = "Closure Processor") +public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { + /** Number of grids started for tests. Should not be less than 2. */ + private static final int NODES_CNT = 2; + + /** Job sleep duration in order to initiate timeout exception. */ + private static final long JOB_SLEEP = 200; + + /** Timeout used in timed tests. */ + private static final long JOB_TIMEOUT = 100; + + /** IP finder. 
*/ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(); + + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected void beforeTestsStarted() throws Exception { + assert NODES_CNT >= 2; + + startGrids(NODES_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + execCntr.set(0); + } + + /** Execution counter for runnable and callable jobs. Static, so it is shared by all job instances and reset before each test. */ + private static final AtomicInteger execCntr = new AtomicInteger(0); + + /** + * Test runnable job that logs the local node ID and increments the execution counter. + */ + private static class TestRunnable implements IgniteRunnable { + /** Ignite instance (annotated for resource injection). */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger (annotated for resource injection). */ + @IgniteLoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void run() { + // Check the injected instance before its first dereference. + assert ignite != null; + + log.info("Runnable job executed on node: " + ignite.cluster().localNode().id()); + + execCntr.incrementAndGet(); + } + } + + /** + * Base class for test callables. + */ + private abstract static class AbstractTestCallable implements IgniteCallable<Integer> { + /** Ignite instance (annotated for resource injection). */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Logger (annotated for resource injection). */ + @IgniteLoggerResource + protected IgniteLogger log; + } + + /** + * Test callable job.
+ */ + private static class TestCallable extends AbstractTestCallable { + /** {@inheritDoc} */ + @Override public Integer call() { + log.info("Callable job executed on node: " + ignite.cluster().localNode().id()); + + assert ignite != null; + + return execCntr.incrementAndGet(); + } + } + + /** + * Test callable job which throws class not found exception. + */ + private static class TestCallableError extends AbstractTestCallable implements Externalizable { + /** + * + */ + public TestCallableError() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Integer call() { + log.info("Callable job executed on node: " + ignite.cluster().localNode().id()); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new ClassNotFoundException(); + } + } + + /** + * Test callable job which sleeps for some time. Is used in timeout tests. + */ + private static class TestCallableTimeout extends AbstractTestCallable { + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + Thread.sleep(JOB_SLEEP); + + return null; + } + } + + /** + * @param idx Node index. + * @param job Runnable job. + * @param p Optional node predicate. + * @return Future object. + * @throws IgniteCheckedException If failed. + */ + private IgniteFuture<?> runAsync(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) + throws IgniteCheckedException { + assert idx >= 0 && idx < NODES_CNT; + assert job != null; + + execCntr.set(0); + + IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); + + comp = comp.enableAsync(); + + comp.run(job); + + return comp.future(); + } + + /** + * @param idx Node index. + * @param job Runnable job. + * @param p Optional node predicate. + * @return Future object. 
+ * @throws IgniteCheckedException If failed. + */ + private IgniteFuture<?> broadcast(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) + throws IgniteCheckedException { + assert idx >= 0 && idx < NODES_CNT; + assert job != null; + + execCntr.set(0); + + ClusterGroup prj = grid(idx); + + if (p != null) + prj = prj.forPredicate(p); + + IgniteCompute comp = compute(prj).enableAsync(); + + comp.broadcast(job); + + return comp.future(); + } + + /** + * Resets the execution counter and asynchronously runs the given jobs from grid {@code idx}, + * restricted to nodes accepted by {@code p} when it is given. + * + * @param idx Node index. + * @param jobs Runnable jobs. + * @param p Optional node predicate. + * @return Future object. + * @throws IgniteCheckedException If failed. + */ + private IgniteFuture<?> runAsync(int idx, Collection<TestRunnable> jobs, @Nullable IgnitePredicate<ClusterNode> p) + throws IgniteCheckedException { + assert idx >= 0 && idx < NODES_CNT; + assert !F.isEmpty(jobs); + + execCntr.set(0); + + IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); + + comp = comp.enableAsync(); + + comp.run(jobs); + + return comp.future(); + } + + /** + * Resets the execution counter and asynchronously calls the job from grid {@code idx}, + * restricted to nodes accepted by {@code p} when it is given. + * + * @param idx Node index. + * @param job Callable job. + * @param p Optional node predicate. + * @return Future object. + * @throws IgniteCheckedException If failed. + */ + private IgniteFuture<Integer> callAsync(int idx, Callable<Integer> job, @Nullable IgnitePredicate<ClusterNode> p) + throws IgniteCheckedException { + assert idx >= 0 && idx < NODES_CNT; + assert job != null; + + execCntr.set(0); + + IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); + + comp = comp.enableAsync(); + + comp.call(job); + + return comp.future(); + } + + /** + * Resets the execution counter and asynchronously broadcasts the callable from grid {@code idx}, + * restricted to nodes accepted by {@code p} when it is given. + * + * @param idx Node index. + * @param job Callable job. + * @param p Optional node predicate. + * @return Future object.
+ */ + private IgniteFuture<Collection<Integer>> broadcast(int idx, Callable<Integer> job, + @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { + assert idx >= 0 && idx < NODES_CNT; + assert job != null; + + execCntr.set(0); + + IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); + + comp = comp.enableAsync(); + + comp.broadcast(job); + + return comp.future(); + } + + /** + * @param idx Node index. + * @param jobs Callable job. + * @param p Optional node predicate. + * @return Future object. + * @throws IgniteCheckedException If failed. + */ + private IgniteFuture<Collection<Integer>> callAsync(int idx, Collection<TestCallable> jobs, + @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { + assert idx >= 0 && idx < NODES_CNT; + assert !F.isEmpty(jobs); + + execCntr.set(0); + + IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); + + comp = comp.enableAsync(); + + comp.call(jobs); + + return comp.future(); + } + + /** + * @param idx Node index. + * @return Predicate. + */ + private IgnitePredicate<ClusterNode> singleNodePredicate(final int idx) { + assert idx >= 0 && idx < NODES_CNT; + + return new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { return grid(idx).localNode().id().equals(e.id()); } + }; + } + + /** + * @throws Exception If failed. 
+ */ + public void testRunAsyncSingle() throws Exception { + Runnable job = new TestRunnable(); + + IgniteFuture<?> fut = broadcast(0, job, null); + + assert fut.get() == null; + + assertEquals(NODES_CNT, execCntr.getAndSet(0)); + + fut = broadcast(0, job, singleNodePredicate(0)); + + assert fut.get() == null; + + assertEquals(1, execCntr.get()); + + fut = runAsync(0, job, null); + + assert fut.get() == null : "Execution result must be null."; + + assert execCntr.get() == 1 : + "Execution counter must be equal to 1, actual: " + execCntr.get(); + } + + /** + * Runs a collection of runnables asynchronously and checks that the execution counter + * equals the number of jobs. + * + * @throws Exception If failed. + */ + public void testRunAsyncMultiple() throws Exception { + Collection<TestRunnable> jobs = F.asList(new TestRunnable(), new TestRunnable()); + + IgniteFuture<?> fut = runAsync(0, jobs, null); + + assert fut.get() == null : "Execution result must be null."; + + assert execCntr.get() == jobs.size() : + "Execution counter must be equal to " + jobs.size() + ", actual: " + execCntr.get(); + } + + /** + * Broadcasts a callable to all nodes and then to a single node, and finally calls it once, + * checking the results and the execution counter after each step. + * + * @throws Exception If failed. + */ + public void testCallAsyncSingle() throws Exception { + Callable<Integer> job = new TestCallable(); + + IgniteFuture<Collection<Integer>> fut1 = broadcast(0, job, null); + + assert fut1.get() != null; + + assertEquals(NODES_CNT, execCntr.getAndSet(0)); + + fut1 = broadcast(0, job, singleNodePredicate(0)); + + // We left one node so we can get definite result. + assertEquals(Integer.valueOf(1), F.first(fut1.get())); + + assertEquals(1, execCntr.get()); + + IgniteFuture<Integer> fut2 = callAsync(0, job, null); + + assert fut2.get() == 1 : + "Execution result must be equal to 1, actual: " + fut2.get(); + + assert execCntr.get() == 1 : + "Execution counter must be equal to 1, actual: " + execCntr.get(); + } + + /** + * Sends {@link TestCallableError}, whose {@code readExternal} always throws, to the nodes other than + * grid 0's local node with failover disabled and expects the future to fail. + * + * @throws Exception If failed.
+ */ + public void testCallAsyncErrorNoFailover() throws Exception { + IgniteCompute comp = compute(grid(0).forPredicate(F.notEqualTo(grid(0).localNode()))).enableAsync(); + + comp.withNoFailover().call(new TestCallableError()); + + IgniteFuture<Integer> fut = comp.future(); + + try { + fut.get(); + + assert false : "Exception should have been thrown."; + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + } + + /** + * Calls a callable under an explicit task name; the test makes no assertions beyond the call completing. + * + * @throws Exception If failed. + */ + public void testWithName() throws Exception { + grid(0).compute().withName("TestTaskName").call(new TestCallable()); + } + + /** + * Checks that a call made with {@code JOB_TIMEOUT} times out for a job sleeping {@code JOB_SLEEP}, + * and that a following call made without a timeout does not. + * + * @throws Exception If failed. + */ + public void testWithTimeout() throws Exception { + Collection<TestCallableTimeout> jobs = F.asList(new TestCallableTimeout()); + + boolean timedOut = false; + + try { + // Ensure that we will get timeout exception. + grid(0).compute().withTimeout(JOB_TIMEOUT).call(jobs); + } + catch (ComputeTaskTimeoutException ignore) { + timedOut = true; + } + + assert timedOut : "Task has not timed out."; + + timedOut = false; + + try { + // Previous task invocation cleared the timeout. + grid(0).compute().call(jobs); + } + catch (ComputeTaskTimeoutException ignore) { + timedOut = true; + } + + assert !timedOut : "Subsequently called task has timed out."; + } + + /** + * Calls a collection of callables asynchronously and checks that the results are the values + * from 1 to the number of jobs. + * + * @throws Exception If failed. + */ + public void testCallAsyncMultiple() throws Exception { + Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable()); + + IgniteFuture<Collection<Integer>> fut = callAsync(0, jobs, null); + + Collection<Integer> results = fut.get(); + + assert !results.isEmpty() : "Collection of results is empty."; + + assert results.size() == jobs.size() : + "Collection of results must be of size: " + jobs.size() + "."; + + for (int i = 1; i <= jobs.size(); i++) + assert results.contains(i) : "Collection of results does not contain value: " + i; + } + + /** + * Calls a collection of callables with a sum reducer and checks the reduced value + * and the execution counter. + * + * @throws Exception If failed.
+ */ + public void testReduceAsync() throws Exception { + Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable()); + + IgniteCompute comp = grid(0).compute().enableAsync(); + + comp.call(jobs, F.sumIntReducer()); + + IgniteFuture<Integer> fut = comp.future(); + + // Sum of arithmetic progression. + int exp = (1 + jobs.size()) * jobs.size() / 2; + + assert fut.get() == exp : + "Execution result must be equal to " + exp + ", actual: " + fut.get(); + + assert execCntr.get() == jobs.size() : + "Execution counter must be equal to " + jobs.size() + ", actual: " + execCntr.get(); + + execCntr.set(0); + } + + /** + * Submits one always-throwing job per cluster node together with a reducer and expects the call + * to fail with {@link IgniteCheckedException} without the reducer's {@code collect} being invoked. + * + * @throws Exception If failed. + */ + public void testReducerError() throws Exception { + final Ignite g = grid(0); + + final Collection<Callable<Integer>> jobs = new ArrayList<>(); + + for (int i = 0; i < g.cluster().nodes().size(); i++) { + jobs.add(new IgniteCallable<Integer>() { + @Override public Integer call() throws Exception { + throw new RuntimeException("Test exception."); + } + }); + } + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + g.compute().call(jobs, new IgniteReducer<Integer, Object>() { + @Override public boolean collect(@Nullable Integer e) { + fail("Expects failed jobs never call 'collect' method."); + + return true; + } + + @Override public Object reduce() { + return null; + } + }); + + return null; + } + }, IgniteCheckedException.class, null); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html new file mode 100644 index 0000000..1f85ff2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java new file mode 100644 index 0000000..f016369 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java @@ -0,0 +1,1079 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.*; + +/** + * Event consume test. + */ +public class GridEventConsumeSelfTest extends GridCommonAbstractTest { + /** */ + private static final String PRJ_PRED_CLS_NAME = "org.gridgain.grid.tests.p2p.GridEventConsumeProjectionPredicate"; + + /** */ + private static final String FILTER_CLS_NAME = "org.gridgain.grid.tests.p2p.GridEventConsumeFilter"; + + /** Grids count. */ + private static final int GRID_CNT = 3; + + /** Number of created consumes per thread in multithreaded test. */ + private static final int CONSUME_CNT = 500; + + /** Consume latch. */ + private static volatile CountDownLatch consumeLatch; + + /** Consume counter. */ + private static volatile AtomicInteger consumeCnt; + + /** Include node flag. */ + private boolean include; + + /** No automatic unsubscribe flag. 
*/ + private boolean noAutoUnsubscribe; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (include) + cfg.setUserAttributes(F.asMap("include", true)); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** + * {@inheritDoc} + * <p> + * Starts all grids but the last one while the {@code include} flag is set, and the last grid with the flag cleared. + */ + @Override protected void beforeTestsStarted() throws Exception { + assertTrue(GRID_CNT > 1); + + include = true; + + startGridsMultiThreaded(GRID_CNT - 1); + + include = false; + + startGrid(GRID_CNT - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * {@inheritDoc} + * <p> + * Checks that the topology has {@code GRID_CNT} nodes and that the continuous processor of every grid + * holds no event routines and no entries in its bookkeeping maps. + */ + @Override protected void afterTest() throws Exception { + assertEquals(GRID_CNT, grid(0).nodes().size()); + + for (int i = 0; i < GRID_CNT; i++) { + GridKernal grid = (GridKernal)grid(i); + + GridContinuousProcessor proc = grid.context().continuous(); + + if (noAutoUnsubscribe) { + localRoutines(proc).clear(); + + U.<Map>field(proc, "rmtInfos").clear(); + } + + assertEquals(0, localRoutines(proc).size()); + assertEquals(0, U.<Map>field(proc, "rmtInfos").size()); + assertEquals(0, U.<Map>field(proc, "startFuts").size()); + assertEquals(0, U.<Map>field(proc, "waitForStartAck").size()); + assertEquals(0, U.<Map>field(proc, "stopFuts").size()); + assertEquals(0, U.<Map>field(proc, "waitForStopAck").size()); + assertEquals(0, U.<Map>field(proc, "pending").size()); + } + } + + /** + * Reads the processor's {@code locInfos} field and narrows its values to routines + * whose handler reports {@code isForEvents()}. + * + * @param proc Continuous processor. + * @return Local event routines. + */ + private Collection<LocalRoutineInfo> localRoutines(GridContinuousProcessor proc) { + return F.view(U.<Map<UUID, LocalRoutineInfo>>field(proc, "locInfos").values(), + new IgnitePredicate<LocalRoutineInfo>() { + @Override public boolean apply(LocalRoutineInfo info) { + return info.handler().isForEvents(); + } + }); + } + + /** + * Calls {@code stopRemoteListen} with {@code null} and with an unknown ID, then registers and stops + * remote listeners with and without an explicit event type list. + * + * @throws Exception If failed.
+ */ + public void testApi() throws Exception { + try { + grid(0).events().stopRemoteListen(null); + } + catch (NullPointerException ignored) { + // No-op. + } + + grid(0).events().stopRemoteListen(UUID.randomUUID()); + + UUID consumeId = null; + + try { + consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteDiscoveryEvent>() { + @Override public boolean apply(UUID uuid, IgniteDiscoveryEvent evt) { + return false; + } + }, + new P1<IgniteDiscoveryEvent>() { + @Override public boolean apply(IgniteDiscoveryEvent e) { + return false; + } + }, + EVTS_DISCOVERY + ); + + assertNotNull(consumeId); + } + finally { + // Skip the stop call when registration failed, so that a null ID does not mask the original failure. + if (consumeId != null) + grid(0).events().stopRemoteListen(consumeId); + } + + // Forget the already stopped listener so the next finally block does not reuse its ID. + consumeId = null; + + try { + consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteDiscoveryEvent>() { + @Override public boolean apply(UUID uuid, IgniteDiscoveryEvent evt) { + return false; + } + }, + new P1<IgniteDiscoveryEvent>() { + @Override public boolean apply(IgniteDiscoveryEvent e) { + return false; + } + } + ); + + assertNotNull(consumeId); + } + finally { + if (consumeId != null) + grid(0).events().stopRemoteListen(consumeId); + } + + // Forget the already stopped listener so the next finally block does not reuse its ID. + consumeId = null; + + try { + consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID uuid, IgniteEvent evt) { + return false; + } + }, + new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + return false; + } + } + ); + + assertNotNull(consumeId); + } + finally { + if (consumeId != null) + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens without a remote filter or event type list, broadcasts a no-op job and expects + * one {@code EVT_JOB_STARTED} event from each of the {@code GRID_CNT} nodes. + * + * @throws Exception If failed.
+ */ + public void testAllEvents() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + if (evt.type() == EVT_JOB_STARTED) { + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + } + + return true; + } + }, + null + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT, nodeIds.size()); + assertEquals(GRID_CNT, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens for the {@code EVT_JOB_STARTED} type only, broadcasts a no-op job and expects + * one event from each of the {@code GRID_CNT} nodes. + * + * @throws Exception If failed. + */ + public void testEventsByType() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT, nodeIds.size()); + assertEquals(GRID_CNT, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens with a remote filter that passes only {@code EVT_JOB_STARTED} events, broadcasts a no-op job + * and expects one event from each of the {@code GRID_CNT} nodes. + * + * @throws Exception If failed.
+ */ + public void testEventsByFilter() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + return evt.type() == EVT_JOB_STARTED; + } + } + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT, nodeIds.size()); + assertEquals(GRID_CNT, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens for {@code EVT_JOB_STARTED} with a remote filter that rejects events whose task name is + * {@code "exclude"}, then runs both a broadcast and a job named {@code "exclude"} and expects + * one event from each of the {@code GRID_CNT} nodes. + * + * @throws Exception If failed.
+ */ + public void testEventsByTypeAndFilter() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteJobEvent>() { + @Override public boolean apply(UUID nodeId, IgniteJobEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + new P1<IgniteJobEvent>() { + @Override public boolean apply(IgniteJobEvent evt) { + return !"exclude".equals(evt.taskName()); + } + }, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + grid(0).compute().withName("exclude").run(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT, nodeIds.size()); + assertEquals(GRID_CNT, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens on the {@code forRemotes()} projection of grid 0, broadcasts a no-op job and expects + * events from {@code GRID_CNT - 1} nodes. + * + * @throws Exception If failed.
+ */ + public void testRemoteProjection() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); + + UUID consumeId = events(grid(0).forRemotes()).remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT - 1, nodeIds.size()); + assertEquals(GRID_CNT - 1, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens on the projection of nodes having the {@code "include"} attribute, broadcasts a no-op job + * and expects events from {@code GRID_CNT - 1} nodes. + * + * @throws Exception If failed. + */ + public void testProjectionWithLocalNode() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1); + + UUID consumeId = events(grid(0).forAttribute("include", null)).remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT - 1, nodeIds.size()); + assertEquals(GRID_CNT - 1, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Listens on the {@code forLocal()} projection of grid 0, broadcasts a no-op job and expects + * a single event whose node ID is that of grid 0's local node. + * + * @throws Exception If failed.
+ */ + public void testLocalNodeOnly() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + UUID consumeId = events(grid(0).forLocal()).remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(1, nodeIds.size()); + assertEquals(1, cnt.get()); + + assertEquals(grid(0).localNode().id(), F.first(nodeIds)); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Checks that {@code remoteListen} on a projection built from an always-false predicate fails with + * {@link IgniteCheckedException} whose message reports an empty projection. + * + * @throws Exception If failed. + */ + public void testEmptyProjection() throws Exception { + try { + events(grid(0).forPredicate(F.<ClusterNode>alwaysFalse())).remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + return true; + } + }, + null + ); + + assert false : "Exception was not thrown."; + } + catch (IgniteCheckedException e) { + assertTrue(e.getMessage().startsWith( + "Failed to register remote continuous listener (projection is empty).")); + } + } + + /** + * Registers a listener whose local callback returns {@code false}, broadcasts a no-op job and expects + * exactly one event to have been counted. + * + * @throws Exception If failed.
+ */ + public void testStopByCallback() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return false; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(1, nodeIds.size()); + assertEquals(1, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Runs one job and expects one event, then calls {@code stopRemoteListen}, runs another job and, + * after a short sleep, expects the counters to be unchanged. + * + * @throws Exception If failed. + */ + public void testStopRemoteListen() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().run(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(1, nodeIds.size()); + assertEquals(1, cnt.get()); + + grid(0).events().stopRemoteListen(consumeId); + + grid(0).compute().run(F.noop()); + + U.sleep(500); + + assertEquals(1, nodeIds.size()); + assertEquals(1, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * Registers a local listener that returns {@code false}, runs two local jobs and expects + * only the first one to be counted. + * + * @throws Exception If failed.
+ */ + public void testStopLocalListenByCallback() throws Exception { + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen( + new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("Local event [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + cnt.incrementAndGet(); + latch.countDown(); + + return false; + } + }, + EVT_JOB_STARTED); + + compute(grid(0).forLocal()).run(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(1, cnt.get()); + + compute(grid(0).forLocal()).run(F.noop()); + + U.sleep(500); + + assertEquals(1, cnt.get()); + } + + /** + * @throws Exception If failed. + */ + public void testNodeJoin() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + return evt.type() == EVT_JOB_STARTED; + } + }, + EVT_JOB_STARTED, EVT_JOB_FINISHED + ); + + try { + assertNotNull(consumeId); + + include = true; + + startGrid("anotherGrid"); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT + 1, nodeIds.size()); + assertEquals(GRID_CNT + 1, cnt.get()); + } + finally { + stopGrid("anotherGrid"); + + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testNodeJoinWithProjection() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + UUID consumeId = events(grid(0).forAttribute("include", null)).remoteListen( + new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + null, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + include = true; + + startGrid("anotherGrid1"); + + include = false; + + startGrid("anotherGrid2"); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT, nodeIds.size()); + assertEquals(GRID_CNT, cnt.get()); + } + finally { + stopGrid("anotherGrid1"); + stopGrid("anotherGrid2"); + + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * @throws Exception If failed. 
+ */ + // TODO: GG-6730 + public void _testNodeJoinWithP2P() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1); + + ClassLoader ldr = getExternalClassLoader(); + + IgnitePredicate<ClusterNode> prjPred = (IgnitePredicate<ClusterNode>)ldr.loadClass(PRJ_PRED_CLS_NAME).newInstance(); + IgnitePredicate<IgniteEvent> filter = (IgnitePredicate<IgniteEvent>)ldr.loadClass(FILTER_CLS_NAME).newInstance(); + + UUID consumeId = events(grid(0).forPredicate(prjPred)).remoteListen(new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, filter, EVT_JOB_STARTED); + + try { + assertNotNull(consumeId); + + startGrid("anotherGrid"); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT + 1, nodeIds.size()); + assertEquals(GRID_CNT + 1, cnt.get()); + } + finally { + stopGrid("anotherGrid1"); + stopGrid("anotherGrid2"); + + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testResources() throws Exception { + final Collection<UUID> nodeIds = new HashSet<>(); + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + UUID consumeId = grid(0).events().remoteListen( + new P2<UUID, IgniteEvent>() { + @IgniteInstanceResource + private Ignite grid; + + @Override public boolean apply(UUID nodeId, IgniteEvent evt) { + info("Event from " + nodeId + " [" + evt.shortDisplay() + ']'); + + assertEquals(EVT_JOB_STARTED, evt.type()); + assertNotNull(grid); + + nodeIds.add(nodeId); + cnt.incrementAndGet(); + latch.countDown(); + + return true; + } + }, + new P1<IgniteEvent>() { + @IgniteInstanceResource + private Ignite grid; + + @Override public boolean apply(IgniteEvent evt) { + assertNotNull(grid); + + return true; + } + }, + EVT_JOB_STARTED + ); + + try { + assertNotNull(consumeId); + + grid(0).compute().broadcast(F.noop()); + + assert latch.await(2, SECONDS); + + assertEquals(GRID_CNT, nodeIds.size()); + assertEquals(GRID_CNT, cnt.get()); + } + finally { + grid(0).events().stopRemoteListen(consumeId); + } + } + + /** + * @throws Exception If failed. + */ + public void testMasterNodeLeave() throws Exception { + Ignite g = startGrid("anotherGrid"); + + final UUID nodeId = g.cluster().localNode().id(); + final CountDownLatch latch = new CountDownLatch(GRID_CNT); + + for (int i = 0; i < GRID_CNT; i++) { + grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + if (nodeId.equals(((IgniteDiscoveryEvent) evt).eventNode().id())) + latch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + + g.events().remoteListen( + null, + new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + return true; + } + }, + EVTS_ALL + ); + + stopGrid("anotherGrid"); + + latch.await(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMasterNodeLeaveNoAutoUnsubscribe() throws Exception { + Ignite g = startGrid("anotherGrid"); + + final UUID nodeId = g.cluster().localNode().id(); + final CountDownLatch discoLatch = new CountDownLatch(GRID_CNT); + + for (int i = 0; i < GRID_CNT; i++) { + grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + if (nodeId.equals(((IgniteDiscoveryEvent) evt).eventNode().id())) + discoLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + + consumeLatch = new CountDownLatch(GRID_CNT * 2 + 1); + consumeCnt = new AtomicInteger(); + + noAutoUnsubscribe = true; + + g.events().remoteListen( + 1, 0, false, + null, + new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + consumeLatch.countDown(); + consumeCnt.incrementAndGet(); + + return true; + } + }, + EVT_JOB_STARTED + ); + + grid(0).compute().broadcast(F.noop()); + + stopGrid("anotherGrid"); + + discoLatch.await(); + + grid(0).compute().broadcast(F.noop()); + + assert consumeLatch.await(2, SECONDS); + + assertEquals(GRID_CNT * 2 + 1, consumeCnt.get()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMultithreadedWithNodeRestart() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final BlockingQueue<IgniteBiTuple<Integer, UUID>> queue = new LinkedBlockingQueue<>(); + final Collection<UUID> started = new GridConcurrentHashSet<>(); + final Collection<UUID> stopped = new GridConcurrentHashSet<>(); + + final Random rnd = new Random(); + + IgniteFuture<?> starterFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < CONSUME_CNT; i++) { + int idx = rnd.nextInt(GRID_CNT); + + try { + IgniteEvents evts = grid(idx).events().enableAsync(); + + evts.remoteListen(new P2<UUID, IgniteEvent>() { + @Override public boolean apply(UUID uuid, IgniteEvent evt) { + return true; + } + }, null, EVT_JOB_STARTED); + + UUID consumeId = evts.<UUID>future().get(3000); + + started.add(consumeId); + + queue.add(F.t(idx, consumeId)); + } + catch (ClusterTopologyException ignored) { + // No-op. + } + + U.sleep(10); + } + + stop.set(true); + + return null; + } + }, 8, "consume-starter"); + + IgniteFuture<?> stopperFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!stop.get()) { + IgniteBiTuple<Integer, UUID> t = queue.poll(1, SECONDS); + + if (t == null) + continue; + + int idx = t.get1(); + UUID consumeId = t.get2(); + + try { + IgniteEvents evts = grid(idx).events().enableAsync(); + + evts.stopRemoteListen(consumeId); + + evts.future().get(3000); + + stopped.add(consumeId); + } + catch (ClusterTopologyException ignored) { + // No-op. 
+ } + } + + return null; + } + }, 4, "consume-stopper"); + + IgniteFuture<?> nodeRestarterFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!stop.get()) { + startGrid("anotherGrid"); + stopGrid("anotherGrid"); + } + + return null; + } + }, 1, "node-restarter"); + + IgniteFuture<?> jobRunnerFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!stop.get()) { + int idx = rnd.nextInt(GRID_CNT); + + try { + IgniteCompute comp = grid(idx).compute().enableAsync(); + + comp.run(F.noop()); + + comp.future().get(3000); + } + catch (IgniteCheckedException ignored) { + // Ignore all job execution related errors. + } + } + + return null; + } + }, 1, "job-runner"); + + starterFut.get(); + stopperFut.get(); + nodeRestarterFut.get(); + jobRunnerFut.get(); + + IgniteBiTuple<Integer, UUID> t; + + while ((t = queue.poll()) != null) { + int idx = t.get1(); + UUID consumeId = t.get2(); + + IgniteEvents evts = grid(idx).events().enableAsync(); + + evts.stopRemoteListen(consumeId); + + evts.future().get(3000); + + stopped.add(consumeId); + } + + Collection<UUID> notStopped = F.lose(started, true, stopped); + + assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java new file mode 100644 index 0000000..1940aaa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridMessageListenSelfTest.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.messaging.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; + +/** + * Message listen test. 
+ */ +public class GridMessageListenSelfTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final String INC_ATTR = "include"; + + /** */ + private static final String MSG = "Message"; + + /** */ + private static final String TOPIC = "Topic"; + + /** */ + private static final int MSG_CNT = 3; + + /** */ + private static final String TOPIC_CLS_NAME = "org.gridgain.grid.tests.p2p.GridTestMessageTopic"; + + /** */ + private static final String LSNR_CLS_NAME = "org.gridgain.grid.tests.p2p.GridTestMessageListener"; + + /** */ + private static boolean include; + + /** */ + private static final List<UUID> allNodes = new ArrayList<>(); + + /** */ + private static final List<UUID> rmtNodes = new ArrayList<>(); + + /** */ + private static final List<UUID> incNodes = new ArrayList<>(); + + /** */ + private static final Collection<UUID> nodes = new GridConcurrentHashSet<>(); + + /** */ + private static final AtomicInteger cnt = new AtomicInteger(); + + /** */ + private static CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (include) + cfg.setUserAttributes(F.asMap(INC_ATTR, true)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + nodes.clear(); + cnt.set(0); + + include = true; + + startGridsMultiThreaded(GRID_CNT - 1); + + include = false; + + Thread.sleep(500); + + startGrid(GRID_CNT - 1); + + allNodes.clear(); + rmtNodes.clear(); + incNodes.clear(); + + for (int i = 0; i < GRID_CNT; i++) { + UUID id = grid(i).localNode().id(); + + allNodes.add(id); + + if (i != 0) + rmtNodes.add(id); + + if (i != GRID_CNT - 1) + incNodes.add(id); + } + + Collections.sort(allNodes); + Collections.sort(rmtNodes); + Collections.sort(incNodes); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() 
throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testNullTopic() throws Exception { + latch = new CountDownLatch(MSG_CNT * GRID_CNT); + + listen(grid(0), null, true); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + assertEquals(MSG_CNT * GRID_CNT, cnt.get()); + + checkNodes(allNodes); + } + + /** + * @throws Exception If failed. + */ + public void testNonNullTopic() throws Exception { + latch = new CountDownLatch(MSG_CNT * GRID_CNT); + + listen(grid(0), null, true); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + assertEquals(MSG_CNT * GRID_CNT, cnt.get()); + + checkNodes(allNodes); + } + + /** + * @throws Exception If failed. + */ + public void testStopListen() throws Exception { + latch = new CountDownLatch(GRID_CNT); + + listen(grid(0), null, false); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + int expCnt = cnt.get(); + + send(); + + Thread.sleep(1000); + + assertEquals(expCnt, cnt.get()); + + checkNodes(allNodes); + } + + /** + * @throws Exception If failed. + */ + public void testProjection() throws Exception { + latch = new CountDownLatch(MSG_CNT * (GRID_CNT - 1)); + + listen(grid(0).forRemotes(), null, true); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + assertEquals(MSG_CNT * (GRID_CNT - 1), cnt.get()); + + checkNodes(rmtNodes); + } + + /** + * @throws Exception If failed. 
+ */ + public void testNodeJoin() throws Exception { + latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1)); + + listen(grid(0), null, true); + + try { + Ignite g = startGrid("anotherGrid"); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get()); + + List<UUID> allNodes0 = new ArrayList<>(allNodes); + + allNodes0.add(g.cluster().localNode().id()); + + Collections.sort(allNodes0); + + checkNodes(allNodes0); + } + finally { + stopGrid("anotherGrid"); + } + } + + /** + * @throws Exception If failed. + */ + public void testNodeJoinWithProjection() throws Exception { + latch = new CountDownLatch(MSG_CNT * GRID_CNT); + + listen(grid(0).forAttribute(INC_ATTR, null), null, true); + + try { + include = true; + + Ignite g = startGrid("anotherGrid1"); + + include = false; + + startGrid("anotherGrid2"); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + assertEquals(MSG_CNT * GRID_CNT, cnt.get()); + + List<UUID> incNodes0 = new ArrayList<>(incNodes); + + incNodes0.add(g.cluster().localNode().id()); + + Collections.sort(incNodes0); + + checkNodes(incNodes0); + } + finally { + stopGrid("anotherGrid1"); + stopGrid("anotherGrid2"); + } + } + + /** + * @throws Exception If failed. + */ + public void testNullTopicWithDeployment() throws Exception { + Class<?> cls = getExternalClassLoader().loadClass(LSNR_CLS_NAME); + + grid(0).message().remoteListen(null, (IgniteBiPredicate<UUID, Object>)cls.newInstance()); + + send(); + + boolean s = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkDeployedListeners(GRID_CNT); + } + }, 2000); + + assertTrue(s); + } + + /** + * @throws Exception If failed. 
+ */ + public void testNonNullTopicWithDeployment() throws Exception { + ClassLoader ldr = getExternalClassLoader(); + + Class<?> topicCls = ldr.loadClass(TOPIC_CLS_NAME); + Class<?> lsnrCls = ldr.loadClass(LSNR_CLS_NAME); + + Object topic = topicCls.newInstance(); + + grid(0).message().remoteListen(topic, (IgniteBiPredicate<UUID, Object>)lsnrCls.newInstance()); + + send(topic); + + boolean s = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkDeployedListeners(GRID_CNT); + } + }, 2000); + + assertTrue(s); + } + + /** + * @throws Exception If failed. + */ + public void testListenActor() throws Exception { + latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1)); + + grid(0).message().remoteListen(null, new Actor(grid(0))); + + try { + Ignite g = startGrid("anotherGrid"); + + send(); + + assert latch.await(2, SECONDS); + + Thread.sleep(500); + + assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get()); + + List<UUID> allNodes0 = new ArrayList<>(allNodes); + + allNodes0.add(g.cluster().localNode().id()); + + Collections.sort(allNodes0); + + checkNodes(allNodes0); + } + finally { + stopGrid("anotherGrid"); + } + } + + /** + * @param prj Projection. + * @param topic Topic. + * @param ret Value returned from listener. + * @throws Exception In case of error. + */ + private void listen(final ClusterGroup prj, @Nullable Object topic, final boolean ret) throws Exception { + assert prj != null; + + message(prj).remoteListen(topic, new Listener(prj, ret)); + } + + /** + * @throws Exception In case of error. + */ + private void send() throws Exception { + send(TOPIC); + } + + /** + * @param topic Non-null topic. + * @throws Exception In case of error. + */ + private void send(Object topic) throws Exception { + assert topic != null; + + for (int i = 0; i < MSG_CNT; i++) + grid(0).message().send(null, MSG); + + for (int i = 0; i < MSG_CNT; i++) + grid(0).message().send(topic, MSG); + } + + /** + * @param expCnt Expected messages count. 
+ * @return If check passed. + */ + private boolean checkDeployedListeners(int expCnt) { + for (Ignite g : G.allGrids()) { + AtomicInteger cnt = g.cluster().<String, AtomicInteger>nodeLocalMap().get("msgCnt"); + + if (cnt == null || cnt.get() != expCnt) + return false; + } + + return true; + } + + /** + * @param expNodes Expected nodes. + */ + private void checkNodes(List<UUID> expNodes) { + List<UUID> nodes0 = new ArrayList<>(nodes); + + Collections.sort(nodes0); + + assertEquals(expNodes, nodes0); + } + + /** */ + private static class Listener implements P2<UUID, Object> { + /** */ + private final ClusterGroup prj; + + /** */ + private final boolean ret; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param prj Projection. + * @param ret Return value. + */ + private Listener(ClusterGroup prj, boolean ret) { + this.prj = prj; + this.ret = ret; + } + + /** {@inheritDoc} */ + @Override public boolean apply(UUID nodeId, Object msg) { + assertNotNull(ignite); + assertNotNull(ignite.configuration().getNodeId()); + + X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + ignite.cluster().localNode().id() + ']'); + + assertEquals(prj.ignite().cluster().localNode().id(), nodeId); + assertEquals(MSG, msg); + + nodes.add(ignite.configuration().getNodeId()); + cnt.incrementAndGet(); + latch.countDown(); + + return ret; + } + } + + /** */ + private static class Actor extends MessagingListenActor<Object> { + /** */ + private final ClusterGroup prj; + + /** + * @param prj Projection. 
+ */ + private Actor(ClusterGroup prj) { + this.prj = prj; + } + + /** {@inheritDoc} */ + @Override protected void receive(UUID nodeId, Object msg) throws Throwable { + assertNotNull(ignite()); + + UUID locNodeId = ignite().cluster().localNode().id(); + + X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + locNodeId + ']'); + + assertEquals(prj.ignite().cluster().localNode().id(), nodeId); + assertEquals(MSG, msg); + + nodes.add(locNodeId); + cnt.incrementAndGet(); + latch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java new file mode 100644 index 0000000..e3d3397 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests for {@code GridDataLoaderImpl}. + */ +public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of keys to load via data loader. */ + private static final int KEYS_COUNT = 1000; + + /** Started grid counter. */ + private static int cnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + // Forth node goes without cache. + if (cnt < 4) + cfg.setCacheConfiguration(cacheConfiguration()); + + cnt++; + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testNullPointerExceptionUponDataLoaderClosing() throws Exception { + try { + startGrids(5); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.awaitQuiet(barrier); + + G.stopAll(true); + + return null; + } + }, 1); + + Ignite g4 = grid(4); + + IgniteDataLoader<Object, Object> dataLdr = g4.dataLoader(null); + + dataLdr.perNodeBufferSize(32); + + for (int i = 0; i < 100000; i += 2) { + dataLdr.addData(i, i); + dataLdr.removeData(i + 1); + } + + U.awaitQuiet(barrier); + + info("Closing data loader."); + + try { + dataLdr.close(true); + } + catch (IllegalStateException ignore) { + // This is ok to ignore this exception as test is racy by it's nature - + // grid is stopping in different thread. + } + } + finally { + G.stopAll(true); + } + } + + /** + * Data loader should correctly load entries from HashMap in case of grids with more than one node + * and with GridOptimizedMarshaller that requires serializable. + * + * @throws Exception If failed. 
+ */ + public void testAddDataFromMap() throws Exception { + try { + cnt = 0; + + startGrids(2); + + Ignite g0 = grid(0); + + IgniteMarshaller marsh = g0.configuration().getMarshaller(); + + if (marsh instanceof IgniteOptimizedMarshaller) + assertTrue(((IgniteOptimizedMarshaller)marsh).isRequireSerializable()); + else + fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName()); + + IgniteDataLoader<Integer, String> dataLdr = g0.dataLoader(null); + + Map<Integer, String> map = U.newHashMap(KEYS_COUNT); + + for (int i = 0; i < KEYS_COUNT; i ++) + map.put(i, String.valueOf(i)); + + dataLdr.addData(map); + + dataLdr.close(); + + Random rnd = new Random(); + + GridCache<Integer, String> c = g0.cache(null); + + for (int i = 0; i < KEYS_COUNT; i ++) { + Integer k = rnd.nextInt(KEYS_COUNT); + + String v = c.get(k); + + assertEquals(k.toString(), v); + } + } + finally { + G.stopAll(true); + } + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + return cacheCfg; + } + + /** + * + */ + private static class TestObject implements Serializable { + /** */ + private int val; + + /** + */ + private TestObject() { + // No-op. + } + + /** + * @param val Value. 
+ */ + private TestObject(int val) { + this.val = val; + } + + public Integer val() { + return val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof TestObject && ((TestObject)obj).val == val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java new file mode 100644 index 0000000..77d6b06 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Data loader performance test. Compares group lock data loader to traditional lock. + * <p> + * Disable assertions and give at least 2 GB heap to run this test. + */ +public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int GRID_CNT = 3; + + /** */ + private static final int ENTRY_CNT = 80000; + + /** */ + private boolean useCache; + + /** */ + private boolean useGrpLock; + + /** */ + private String[] vals = new String[2048]; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setRestEnabled(false); + + cfg.setPeerClassLoadingEnabled(true); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + + 
cc.setDistributionMode(PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setStartSize(ENTRY_CNT / GRID_CNT); + cc.setSwapEnabled(false); + + cc.setBackups(1); + + cc.setStoreValueBytes(true); + + cfg.setCacheSanityCheckEnabled(false); + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (int i = 0; i < vals.length; i++) { + int valLen = ThreadLocalRandom8.current().nextInt(128, 512); + + StringBuilder sb = new StringBuilder(); + + for (int j = 0; j < valLen; j++) + sb.append('a' + ThreadLocalRandom8.current().nextInt(20)); + + vals[i] = sb.toString(); + + info("Value: " + vals[i]); + } + } + + /** + * @throws Exception If failed. + */ + public void testPerformance() throws Exception { + useGrpLock = false; + + doTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPerformanceGroupLock() throws Exception { + useGrpLock = true; + + doTest(); + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + System.gc(); + System.gc(); + System.gc(); + + try { + useCache = true; + + startGridsMultiThreaded(GRID_CNT); + + useCache = false; + + Ignite ignite = startGrid(); + + final IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(null); + + ldr.perNodeBufferSize(8192); + ldr.updater(useGrpLock ? 
GridDataLoadCacheUpdaters.<Integer, String>groupLocked() : + GridDataLoadCacheUpdaters.<Integer, String>batchedSorted()); + ldr.autoFlushFrequency(0); + + final LongAdder cnt = new LongAdder(); + + long start = U.currentTimeMillis(); + + Thread t = new Thread(new Runnable() { + @SuppressWarnings("BusyWait") + @Override public void run() { + while (true) { + try { + Thread.sleep(10000); + } + catch (InterruptedException ignored) { + break; + } + + info(">>> Adds/sec: " + cnt.sumThenReset() / 10); + } + } + }); + + t.setDaemon(true); + + t.start(); + + int threadNum = 2;//Runtime.getRuntime().availableProcessors(); + + multithreaded(new Callable<Object>() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public Object call() throws Exception { + ThreadLocalRandom8 rnd = ThreadLocalRandom8.current(); + + while (true) { + int i = rnd.nextInt(ENTRY_CNT); + + ldr.addData(i, vals[rnd.nextInt(vals.length)]); + + cnt.increment(); + } + } + }, threadNum, "loader"); + + info("Closing loader..."); + + ldr.close(false); + + long duration = U.currentTimeMillis() - start; + + info("Finished performance test. Duration: " + duration + "ms."); + } + finally { + stopAllGrids(); + } + } +}
