http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java index 485e811..231fc9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -43,6 +44,7 @@ import org.apache.ignite.compute.ComputeTaskSplitAdapter; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest; @@ -199,6 +201,30 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testExecuteTaskClassAsync() throws Exception { + runTest(jobFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + // Begin with negative to check 'null' value in the test. + final int[] i = {-1}; + + ComputeTaskFuture<List<Object>> fut = ignite.compute().executeAsync( + TestTask.class, + new T2<>((Factory<ComputeJobAdapter>)factory, + (Factory<Object>)new Factory<Object>() { + @Override public Object create() { + return value(i[0]++); + } + })); + + checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get()); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testExecuteTask() throws Exception { runTest(jobFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -222,6 +248,29 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testExecuteTaskAsync() throws Exception { + runTest(jobFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + // Begin with negative to check 'null' value in the test. + final int[] i = {-1}; + + ComputeTaskFuture<List<Object>> fut = ignite.compute().executeAsync(new TestTask(), + new T2<>((Factory<ComputeJobAdapter>)factory, + (Factory<Object>)new Factory<Object>() { + @Override public Object create() { + return value(i[0]++); + } + })); + + checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get()); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testBroadcastClosure() throws Exception { runTest(closureFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -245,6 +294,29 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testBroadcastClosureAsync() throws Exception { + runTest(closureFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + final Collection<Object> resultsAllNull = ignite.compute() + .broadcast((IgniteClosure<Object, Object>)factory.create(), null); + + assertEquals("Result's size mismatch: job must be run on all server nodes", + gridCount() - clientsCount(), resultsAllNull.size()); + + for (Object o : resultsAllNull) + assertNull("All results must be null", o); + + IgniteFuture<Collection<Object>> fut = ignite.compute() + .broadcastAsync((IgniteClosure<Object, Object>)factory.create(), value(0)); + + checkResultsClassCount(gridCount() - clientsCount(), fut.get(), value(0).getClass()); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testBroadcastCallable() throws Exception { runTest(callableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -274,6 +346,35 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testBroadcastCallableAsync() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + EchoCallable job = (EchoCallable)factory.create(); + job.setArg(null); + + final IgniteFuture<Collection<Object>> futAllNull = ignite.compute() + .broadcastAsync(job); + + assertEquals("Result's size mismatch: job must be run on all server nodes", + gridCount() - clientsCount(), futAllNull.get().size()); + + for (Object o : futAllNull.get()) + assertNull("All results must be null", o); + + job.setArg(value(0)); + IgniteFuture<Collection<Object>> futNotNull = ignite.compute() + .broadcastAsync(job); + + checkResultsClassCount(gridCount() - clientsCount(), futNotNull.get(), value(0).getClass()); + for (Object o : futNotNull.get()) + assertEquals("Invalid broadcast results", value(0), o); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testBroadcastRunnable() throws Exception { runTest(runnableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -288,6 +389,22 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testBroadcastRunnableAsync() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + IgniteFuture<Void> fut = ignite.compute().broadcastAsync(job); + + fut.get(); + // All checks are inside the run() method of the job. + } + }); + } + + /** + * @throws Exception If failed. + */ public void testRun() throws Exception { runTest(runnableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -310,24 +427,48 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testRunAsync() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + IgniteFuture<Void> fut0 = ignite.compute().runAsync(job); + + fut0.get(); + // All checks are inside the run() method of the job. + + Collection<IgniteRunnable> jobs = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) + jobs.add((IgniteRunnable)factory.create()); + + IgniteFuture<Void> fut1 = ignite.compute().runAsync(jobs); + + fut1.get(); + // All checks are inside the run() method of the job. + } + }); + } + + /** + * @throws Exception If failed. + */ public void testApplyAsync() throws Exception { runTest(closureFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { - final IgniteCompute comp = ignite.compute().withAsync(); + final IgniteCompute comp = ignite.compute(); - Collection<ComputeTaskFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT); + Collection<IgniteFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT); for (int i = 0; i < MAX_JOB_COUNT; ++i) { // value(i - 1): use negative argument of the value method to generate nullong value. - comp.apply((IgniteClosure<Object, Object>)factory.create(), value(i - 1)); - - futures.add(comp.future()); + futures.add(comp.applyAsync((IgniteClosure<Object, Object>)factory.create(), value(i - 1))); } // Wait for results. Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); - for (ComputeTaskFuture<Object> future : futures) + for (IgniteFuture<Object> future : futures) results.add(future.get()); checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); @@ -339,7 +480,7 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ - public void testApplySync() throws Exception { + public void testApply() throws Exception { runTest(closureFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); @@ -383,6 +524,32 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testApplyForCollectionAsync() throws Exception { + runTest(closureFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + Collection params = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + // value(i - 1): use negative argument of the value method to generate nullong value. + params.add(value(i - 1)); + } + + IgniteClosure c = (IgniteClosure)factory.create(); + + // Use type casting to avoid ambiguous for apply(Callable, Object) vs apply(Callable, Collection<Object>). + IgniteFuture<Collection<Object>> fut = ignite.compute().applyAsync( + (IgniteClosure<TestObject, Object>)c, + (Collection<TestObject>)params); + + checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get()); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testApplyForCollectionWithReducer() throws Exception { runTest(closureFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -409,6 +576,7 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat return true; } }); + assertTrue(res); } }); @@ -417,24 +585,58 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testApplyForCollectionWithReducerAsync() throws Exception { + runTest(closureFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + Collection<Object> params = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + // value(i - 1): use negative argument of the value method to generate nullong value. + params.add(value(i - 1)); + } + + IgniteFuture<Boolean> fut = ignite.compute() + .applyAsync((IgniteClosure<Object, Object>)factory.create(), params, new IgniteReducer<Object, Boolean>() { + + private Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + @Override public boolean collect(@Nullable Object o) { + results.add(o); + return true; + } + + @Override public Boolean reduce() { + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + return true; + } + }); + + assertTrue(fut.get()); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testCallAsync() throws Exception { runTest(callableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { - final IgniteCompute comp = ignite.compute().withAsync(); + final IgniteCompute comp = ignite.compute(); - Collection<ComputeTaskFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT); + Collection<IgniteFuture<Object>> futures = new ArrayList<>(MAX_JOB_COUNT); for (int i = 0; i < MAX_JOB_COUNT; ++i) { EchoCallable job = (EchoCallable)factory.create(); job.setArg(value(i - 1)); - comp.call(job); - futures.add(comp.future()); + futures.add(comp.callAsync(job)); } // Wait for results. Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); - for (ComputeTaskFuture<Object> future : futures) + for (IgniteFuture<Object> future : futures) results.add(future.get()); checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); @@ -446,7 +648,7 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ - public void testCallSync() throws Exception { + public void testCall() throws Exception { runTest(callableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); @@ -488,6 +690,28 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testCallCollectionAsync() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + Collection<EchoCallable> jobs = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + job.setArg(value(i - 1)); + jobs.add(job); + } + + IgniteFuture<Collection<Object>> fut = ignite.compute().callAsync(jobs); + + checkResultsClassCount(MAX_JOB_COUNT - 1, fut.get(), value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), fut.get()); + } + }); + } + + /** + * @throws Exception If failed. + */ public void testCallCollectionWithReducer() throws Exception { runTest(callableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { @@ -522,7 +746,41 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ - public void testDummyAffinityCall() throws Exception { + public void testCallCollectionWithReducerAsync() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + Collection<EchoCallable> jobs = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + job.setArg(value(i - 1)); + jobs.add(job); + } + + IgniteFuture<Boolean> fut = ignite.compute().callAsync(jobs, new IgniteReducer<Object, Boolean>() { + private Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + @Override public boolean collect(@Nullable Object o) { + results.add(o); + return true; + } + + @Override public Boolean reduce() { + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + return true; + } + }); + + assertTrue(fut.get()); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCall() throws Exception { runTest(callableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { ignite.getOrCreateCache(CACHE_NAME); @@ -548,7 +806,147 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ - public void testDummyAffinityRun() throws Exception { + public void testAffinityCallAsync() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache(CACHE_NAME); + + final IgniteCompute comp = ignite.compute(); + + Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + + job.setArg(value(i - 1)); + + IgniteFuture<Object> fut = comp.affinityCallAsync("test", key(0), job); + + results.add(fut.get()); + } + + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheAffinityCall() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + + job.setArg(value(i - 1)); + + results.add(comp.affinityCall(Arrays.asList("test0", "test1"), key(0), job)); + } + + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheAffinityCallAsync() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + + job.setArg(value(i - 1)); + + IgniteFuture<Object> fut = comp.affinityCallAsync(Arrays.asList("test0", "test1"), key(0), job); + + results.add(fut.get()); + } + + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheByPartIdAffinityCall() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + + job.setArg(value(i - 1)); + + results.add(comp.affinityCall(Arrays.asList("test0", "test1"), 0, job)); + } + + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheByPartIdAffinityCallAsync() throws Exception { + runTest(callableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + Collection<Object> results = new ArrayList<>(MAX_JOB_COUNT); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + EchoCallable job = (EchoCallable)factory.create(); + + job.setArg(value(i - 1)); + + IgniteFuture fut = comp.affinityCallAsync(Arrays.asList("test0", "test1"), 0, job); + + results.add(fut.get()); + } + + checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); + assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityRun() throws Exception { runTest(runnableFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { ignite.getOrCreateCache(CACHE_NAME); @@ -567,6 +965,111 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat /** * @throws Exception If failed. */ + public void testAffinityRunAsync() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache(CACHE_NAME); + + final IgniteCompute comp = ignite.compute(); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + IgniteFuture<Void> fut = comp.affinityRunAsync("test", key(0), job); + + fut.get(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheAffinityRun() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + comp.affinityRun(Arrays.asList("test0", "test1"), key(0), job); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheAffinityRunAsync() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + IgniteFuture<Void> fut = comp.affinityRunAsync(Arrays.asList("test0", "test1"), key(0), job); + + fut.get(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheByPartIdAffinityRun() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + comp.affinityRun(Arrays.asList("test0", "test1"), 0, job); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testMultiCacheByPartIdAffinityRunAsync() throws Exception { + runTest(runnableFactories, new ComputeTest() { + @Override public void test(Factory factory, Ignite ignite) throws Exception { + ignite.getOrCreateCache("test0"); + ignite.getOrCreateCache("test1"); + + final IgniteCompute comp = ignite.compute(); + + for (int i = 0; i < MAX_JOB_COUNT; ++i) { + IgniteRunnable job = (IgniteRunnable)factory.create(); + + IgniteFuture<Void> fut = comp.affinityRunAsync(Arrays.asList("test0", "test1"), 0, job); + + fut.get(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ public void testDeployExecuteByName() throws Exception { runTest(jobFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java index b50dfb7..1ee4744 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java @@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -251,6 +250,175 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testApiAsyncOld() throws Exception { + IgniteEvents evtAsync = grid(0).events().withAsync(); + + try { + evtAsync.stopRemoteListen(null); + evtAsync.future().get(); + } + catch (NullPointerException ignored) { + // No-op. + } + + evtAsync.stopRemoteListen(UUID.randomUUID()); + evtAsync.future().get(); + + UUID consumeId = null; + + try { + evtAsync.remoteListen( + new P2<UUID, DiscoveryEvent>() { + @Override public boolean apply(UUID uuid, DiscoveryEvent evt) { + return false; + } + }, + new P1<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent e) { + return false; + } + }, + EVTS_DISCOVERY + ); + + consumeId = (UUID)evtAsync.future().get(); + + assertNotNull(consumeId); + } + finally { + evtAsync.stopRemoteListen(consumeId); + evtAsync.future().get(); + } + + try { + evtAsync.remoteListen( + new P2<UUID, DiscoveryEvent>() { + @Override public boolean apply(UUID uuid, DiscoveryEvent evt) { + return false; + } + }, + new P1<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent e) { + return false; + } + } + ); + + consumeId = (UUID)evtAsync.future().get(); + + assertNotNull(consumeId); + } + finally { + evtAsync.stopRemoteListen(consumeId); + evtAsync.future().get(); + } + + try { + evtAsync.remoteListen( + new P2<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event evt) { + return false; + } + }, + new P1<Event>() { + @Override public boolean apply(Event e) { + return false; + } + } + ); + + consumeId = (UUID)evtAsync.future().get(); + + assertNotNull(consumeId); + } + finally { + evtAsync.stopRemoteListen(consumeId); + evtAsync.future().get(); + } + } + + /** + * @throws Exception If failed. + */ + public void testApiAsync() throws Exception { + IgniteEvents evt = grid(0).events(); + + try { + evt.stopRemoteListenAsync(null).get(); + } + catch (NullPointerException ignored) { + // No-op. + } + + evt.stopRemoteListenAsync(UUID.randomUUID()).get(); + + UUID consumeId = null; + + try { + consumeId = evt.remoteListenAsync( + new P2<UUID, DiscoveryEvent>() { + @Override public boolean apply(UUID uuid, DiscoveryEvent evt) { + return false; + } + }, + new P1<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent e) { + return false; + } + }, + EVTS_DISCOVERY + ).get(); + + assertNotNull(consumeId); + } + finally { + evt.stopRemoteListenAsync(consumeId).get(); + } + + try { + consumeId = evt.remoteListenAsync( + new P2<UUID, DiscoveryEvent>() { + @Override public boolean apply(UUID uuid, DiscoveryEvent evt) { + return false; + } + }, + new P1<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent e) { + return false; + } + } + ).get(); + + assertNotNull(consumeId); + } + finally { + evt.stopRemoteListenAsync(consumeId).get(); + } + + try { + consumeId = evt.remoteListenAsync( + new P2<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event evt) { + return false; + } + }, + new P1<Event>() { + @Override public boolean apply(Event e) { + return false; + } + } + ).get(); + + assertNotNull(consumeId); + } + finally { + evt.stopRemoteListenAsync(consumeId).get(); + } + } + + /** + * @throws Exception If failed. + */ public void testAllEvents() throws Exception { final Collection<UUID> nodeIds = new HashSet<>(); final AtomicInteger cnt = new AtomicInteger(); @@ -990,15 +1158,13 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { int idx = rnd.nextInt(GRID_CNT); try { - IgniteEvents evts = grid(idx).events().withAsync(); + IgniteEvents evts = grid(idx).events(); - evts.remoteListen(new P2<UUID, Event>() { + UUID consumeId = evts.remoteListenAsync(new P2<UUID, Event>() { @Override public boolean apply(UUID uuid, Event evt) { return true; } - }, null, EVT_JOB_STARTED); - - UUID consumeId = evts.<UUID>future().get(3000); + }, null, EVT_JOB_STARTED).get(3000); started.add(consumeId); @@ -1029,11 +1195,9 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { UUID consumeId = t.get2(); try { - IgniteEvents evts = grid(idx).events().withAsync(); - - evts.stopRemoteListen(consumeId); + IgniteEvents evts = grid(idx).events(); - evts.future().get(3000); + evts.stopRemoteListenAsync(consumeId).get(3000); stopped.add(consumeId); } @@ -1063,11 +1227,7 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { int idx = rnd.nextInt(GRID_CNT); try { - IgniteCompute comp = grid(idx).compute().withAsync(); - - comp.run(F.noop()); - - comp.future().get(3000); + grid(idx).compute().runAsync(F.noop()).get(3000); } catch (IgniteException ignored) { // Ignore all job execution related errors. @@ -1089,11 +1249,7 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { int idx = t.get1(); UUID consumeId = t.get2(); - IgniteEvents evts = grid(idx).events().withAsync(); - - evts.stopRemoteListen(consumeId); - - evts.future().get(3000); + grid(idx).events().stopRemoteListenAsync(consumeId).get(3000); stopped.add(consumeId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java index 04c67dc..1217005 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java @@ -266,6 +266,13 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> formatAsync() throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { throwUnsupported(); @@ -274,6 +281,14 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws IgniteException { @@ -283,6 +298,15 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { throwUnsupported(); @@ -291,6 +315,14 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws IgniteException { @@ -300,6 +332,15 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ + @Override public <T, R> IgniteFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, + @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { throwUnsupported(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java index 0d468b4..36c99dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java @@ -168,6 +168,25 @@ public class IgfsTaskSelfTest extends IgfsCommonAbstractTest { } /** + * Test task. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + public void testTaskAsync() throws Exception { + String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)]; + + generateFile(TOTAL_WORDS); + Long genLen = igfs.info(FILE).length(); + + IgniteBiTuple<Long, Integer> taskRes = igfs.executeAsync(new Task(), + new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg).get(); + + assert F.eq(genLen, taskRes.getKey()); + assert F.eq(TOTAL_WORDS, taskRes.getValue()); + } + + /** * Generate file with random data and provided argument. * * @param wordCnt Word count. http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java index 3e90a52..3e547d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java @@ -186,18 +186,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testOrderedMessage() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - orderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testOrderedMessageAsync() throws Exception { - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - orderedMessage(true); + orderedMessage(); } }); } @@ -211,26 +200,11 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerOrderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testClientServerOrderedMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - clientServerOrderedMessage(true); + clientServerOrderedMessage(); } }); } - /** * @throws Exception If failed. */ @@ -240,21 +214,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientOrderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testClientClientOrderedMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - clientClientOrderedMessage(true); + clientClientOrderedMessage(); } }); } @@ -268,21 +228,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientOrderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testServerClientOrderedMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - serverClientOrderedMessage(true); + serverClientOrderedMessage(); } }); } @@ -451,68 +397,63 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * @param async Async message send flag. * @throws Exception If fail. */ - private void orderedMessage(boolean async) throws Exception { + private void orderedMessage() throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** - * @param async Async message send flag. * @throws Exception If fail. */ - private void clientServerOrderedMessage(boolean async) throws Exception { + private void clientServerOrderedMessage() throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** - * @param async Async message send flag. * @throws Exception If fail. */ - private void clientClientOrderedMessage(boolean async) throws Exception { + private void clientClientOrderedMessage() throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** - * @param async Async message send flag. * @throws Exception If fail. */ - private void serverClientOrderedMessage(boolean async) throws Exception { + private void serverClientOrderedMessage() throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** * @param ignite Ignite. * @param grp Cluster group. - * @param async Async message send flag. * @throws Exception If fail. */ - private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { + private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -520,12 +461,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener()); try { - for (int i=0; i < messages; i++){ - if (async) - ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000); - else - ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); - } + for (int i=0; i < messages; i++) + ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java index c6505ba..f9d1632 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java @@ -147,8 +147,8 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testSameConfiguration() throws Exception { - String name = "dupService"; + public void testSameConfigurationOld() throws Exception { + String name = "dupServiceOld"; IgniteServices svcs1 = randomGrid().services().withAsync(); IgniteServices svcs2 = randomGrid().services().withAsync(); @@ -176,8 +176,33 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testDifferentConfiguration() throws Exception { - String name = "dupService"; + public void testSameConfiguration() throws Exception { + String name = "dupServiceOld"; + + IgniteServices svcs1 = randomGrid().services(); + IgniteServices svcs2 = randomGrid().services(); + + IgniteFuture<?> fut1 = svcs1.deployClusterSingletonAsync(name, new DummyService()); + + IgniteFuture<?> fut2 = svcs2.deployClusterSingletonAsync(name, new DummyService()); + + info("Deployed service: " + name); + + fut1.get(); + + info("Finished waiting for service future1: " + name); + + // This must succeed without exception because configuration is the same. + fut2.get(); + + info("Finished waiting for service future2: " + name); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentConfigurationOld() throws Exception { + String name = "dupServiceOld"; IgniteServices svcs1 = randomGrid().services().withAsync(); IgniteServices svcs2 = randomGrid().services().withAsync(); @@ -209,6 +234,35 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ + public void testDifferentConfiguration() throws Exception { + String name = "dupService"; + + IgniteServices svcs1 = randomGrid().services(); + IgniteServices svcs2 = randomGrid().services(); + + IgniteFuture<?> fut1 = svcs1.deployClusterSingletonAsync(name, new DummyService()); + + IgniteFuture<?> fut2 = svcs2.deployNodeSingletonAsync(name, new DummyService()); + + info("Deployed service: " + name); + + fut1.get(); + + info("Finished waiting for service future: " + name); + + try { + fut2.get(); + + fail("Failed to receive mismatching configuration exception."); + } + catch (IgniteException e) { + info("Received mismatching configuration exception: " + e.getMessage()); + } + } + + /** + * @throws Exception If failed. + */ public void testGetServiceByName() throws Exception { String name = "serviceByName"; @@ -255,10 +309,10 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testDeployOnEachNode() throws Exception { + public void testDeployOnEachNodeOld() throws Exception { Ignite g = randomGrid(); - String name = "serviceOnEachNode"; + String name = "serviceOnEachNodeOld"; CountDownLatch latch = new CountDownLatch(nodeCount()); @@ -287,10 +341,38 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testDeploySingleton() throws Exception { + public void testDeployOnEachNode() throws Exception { Ignite g = randomGrid(); - String name = "serviceSingleton"; + String name = "serviceOnEachNode"; + + CountDownLatch latch = new CountDownLatch(nodeCount()); + + DummyService.exeLatch(name, latch); + + IgniteFuture<?> fut = g.services().deployNodeSingletonAsync(name, new DummyService()); + + info("Deployed service: " + name); + + fut.get(); + + info("Finished waiting for service future: " + name); + + latch.await(); + + assertEquals(name, nodeCount(), DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + checkCount(name, g.services().serviceDescriptors(), nodeCount()); + } + + /** + * @throws Exception If failed. + */ + public void testDeploySingletonOld() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceSingletonOld"; CountDownLatch latch = new CountDownLatch(1); @@ -319,7 +401,35 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testAffinityDeploy() throws Exception { + public void testDeploySingleton() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceSingleton"; + + CountDownLatch latch = new CountDownLatch(1); + + DummyService.exeLatch(name, latch); + + IgniteFuture<?> fut = g.services().deployClusterSingletonAsync(name, new DummyService()); + + info("Deployed service: " + name); + + fut.get(); + + info("Finished waiting for service future: " + name); + + latch.await(); + + assertEquals(name, 1, DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + checkCount(name, g.services().serviceDescriptors(), 1); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityDeployOld() throws Exception { Ignite g = randomGrid(); final Integer affKey = 1; @@ -327,7 +437,7 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs // Store a cache key. g.cache(CACHE_NAME).put(affKey, affKey.toString()); - String name = "serviceAffinity"; + String name = "serviceAffinityOld"; IgniteServices svcs = g.services().withAsync(); @@ -348,10 +458,35 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testDeployMultiple1() throws Exception { + public void testAffinityDeploy() throws Exception { Ignite g = randomGrid(); - String name = "serviceMultiple1"; + final Integer affKey = 1; + + // Store a cache key. + g.cache(CACHE_NAME).put(affKey, affKey.toString()); + + String name = "serviceAffinity"; + + IgniteFuture<?> fut = g.services().deployKeyAffinitySingletonAsync(name, new AffinityService(affKey), + CACHE_NAME, affKey); + + info("Deployed service: " + name); + + fut.get(); + + info("Finished waiting for service future: " + name); + + checkCount(name, g.services().serviceDescriptors(), 1); + } + + /** + * @throws Exception If failed. + */ + public void testDeployMultiple1Old() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceMultiple1Old"; CountDownLatch latch = new CountDownLatch(nodeCount() * 2); @@ -380,10 +515,38 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ - public void testDeployMultiple2() throws Exception { + public void testDeployMultiple1() throws Exception { Ignite g = randomGrid(); - String name = "serviceMultiple2"; + String name = "serviceMultiple1"; + + CountDownLatch latch = new CountDownLatch(nodeCount() * 2); + + DummyService.exeLatch(name, latch); + + IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), nodeCount() * 2, 3); + + info("Deployed service: " + name); + + fut.get(); + + info("Finished waiting for service future: " + name); + + latch.await(); + + assertEquals(name, nodeCount() * 2, DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + checkCount(name, g.services().serviceDescriptors(), nodeCount() * 2); + } + + /** + * @throws Exception If failed. + */ + public void testDeployMultiple2Old() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceMultiple2Old"; int cnt = nodeCount() * 2 + 1; @@ -414,6 +577,36 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ + public void testDeployMultiple2() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceMultiple2"; + + int cnt = nodeCount() * 2 + 1; + + CountDownLatch latch = new CountDownLatch(cnt); + + DummyService.exeLatch(name, latch); + + IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), cnt, 3); + + info("Deployed service: " + name); + + fut.get(); + + info("Finished waiting for service future: " + name); + + latch.await(); + + assertEquals(name, cnt, DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + checkCount(name, g.services().serviceDescriptors(), cnt); + } + + /** + * @throws Exception If failed. + */ public void testCancelSingleton() throws Exception { Ignite g = randomGrid(); @@ -449,6 +642,41 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs /** * @throws Exception If failed. */ + public void testCancelSingletonAsync() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceCancelAsync"; + + CountDownLatch latch = new CountDownLatch(1); + + DummyService.exeLatch(name, latch); + + g.services().deployClusterSingleton(name, new DummyService()); + + info("Deployed service: " + name); + + latch.await(); + + assertEquals(name, 1, DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + latch = new CountDownLatch(1); + + DummyService.cancelLatch(name, latch); + + g.services().cancelAsync(name).get(); + + info("Cancelled service: " + name); + + latch.await(); + + assertEquals(name, 1, DummyService.started(name)); + assertEquals(name, 1, DummyService.cancelled(name)); + } + + /** + * @throws Exception If failed. + */ public void testCancelEachNode() throws Exception { Ignite g = randomGrid(); @@ -482,6 +710,41 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs } /** + * @throws Exception If failed. + */ + public void testCancelAsyncEachNode() throws Exception { + Ignite g = randomGrid(); + + String name = "serviceCancelEachNodeAsync"; + + CountDownLatch latch = new CountDownLatch(nodeCount()); + + DummyService.exeLatch(name, latch); + + g.services().deployNodeSingleton(name, new DummyService()); + + info("Deployed service: " + name); + + latch.await(); + + assertEquals(name, nodeCount(), DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + latch = new CountDownLatch(nodeCount()); + + DummyService.cancelLatch(name, latch); + + g.services().cancelAsync(name).get(); + + info("Cancelled service: " + name); + + latch.await(); + + assertEquals(name, nodeCount(), DummyService.started(name)); + assertEquals(name, nodeCount(), DummyService.cancelled(name)); + } + + /** * @param svcName Service name. * @param descs Descriptors. * @param cnt Expected count. http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java index 39336ef..9b787a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java @@ -44,11 +44,9 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA DummyService.exeLatch(name, latch); - IgniteServices svcs = g.services().withAsync(); + IgniteServices svcs = g.services(); - svcs.deployClusterSingleton(name, new DummyService()); - - IgniteFuture<?> fut = svcs.future(); + IgniteFuture<?> fut = svcs.deployClusterSingletonAsync(name, new DummyService()); info("Deployed service: " + name); @@ -91,13 +89,11 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA String name = "serviceAffinityUpdateTopology"; - IgniteServices svcs = g.services().withAsync(); + IgniteServices svcs = g.services(); - svcs.deployKeyAffinitySingleton(name, new AffinityService(affKey), + IgniteFuture<?> fut = svcs.deployKeyAffinitySingletonAsync(name, new AffinityService(affKey), CACHE_NAME, affKey); - IgniteFuture<?> fut = svcs.future(); - info("Deployed service: " + name); fut.get(); @@ -130,11 +126,9 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA DummyService.exeLatch(name, latch); - IgniteServices svcs = g.services().withAsync(); - - svcs.deployNodeSingleton(name, new DummyService()); + IgniteServices svcs = g.services(); - IgniteFuture<?> fut = svcs.future(); + IgniteFuture<?> fut = svcs.deployNodeSingletonAsync(name, new DummyService()); info("Deployed service: " + name); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java index 03b00f4..8eefa20 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceContext; import org.apache.ignite.testframework.GridTestUtils; @@ -62,14 +63,12 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { IgniteServices svcs = ignite.services(); - IgniteServices services = svcs.withAsync(); - - services.deployClusterSingleton("myClusterSingletonService", new TestServiceImpl()); + IgniteFuture f = svcs.deployClusterSingletonAsync("myClusterSingletonService", new TestServiceImpl()); depLatch.countDown(); try { - services.future().get(); + f.get(); } catch (IgniteException ignored) { finishLatch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java b/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java index 272c7ad..559cfc9 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtest/GridSingleExecutionTest.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.compute.ComputeJob; @@ -89,9 +88,8 @@ public final class GridSingleExecutionTest { System.exit(1); } else if (args.length >= 2) { - for (IgniteConfiguration cfg: getConfigurations(args[1], args[0])) { + for (IgniteConfiguration cfg: getConfigurations(args[1], args[0])) G.start(cfg); - } } boolean useSes = false; @@ -104,12 +102,8 @@ public final class GridSingleExecutionTest { try { Ignite ignite = G.ignite(); - IgniteCompute comp = ignite.compute().withAsync(); - // Execute Hello World task. - comp.execute(!useSes ? TestTask.class : TestSessionTask.class, null); - - ComputeTaskFuture<Object> fut = comp.future(); + ComputeTaskFuture<Object> fut = ignite.compute().executeAsync(!useSes ? TestTask.class : TestSessionTask.class, null); if (useSes) { fut.getTaskSession().setAttribute("attr1", 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java index 6a43fee..bf34545 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java @@ -26,7 +26,6 @@ import java.util.concurrent.Executors; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.IgniteConfiguration; @@ -88,13 +87,11 @@ public class GridTestMain { long start = System.currentTimeMillis(); - IgniteCompute comp = g.compute().withAsync(); - // Collocate computations and data. for (long i = 0; i < GridTestConstants.ENTRY_COUNT; i++) { final long key = i; - comp.affinityRun("partitioned", GridTestKey.affinityKey(key), new IgniteRunnable() { + final IgniteFuture<?> f = g.compute().affinityRunAsync("partitioned", GridTestKey.affinityKey(key), new IgniteRunnable() { // This code will execute on remote nodes by collocating keys with cached data. @Override public void run() { Long val = cache.localPeek(new GridTestKey(key), CachePeekMode.ONHEAP); @@ -104,8 +101,6 @@ public class GridTestMain { } }); - final IgniteFuture<?> f = comp.future(); - q.put(f); f.listen(new CI1<IgniteFuture<?>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java index ab6b272..c764f67 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/multisplit/GridMultiSplitsLoadTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.loadtests.direct.multisplit; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.configuration.IgniteConfiguration; @@ -106,8 +105,6 @@ public class GridMultiSplitsLoadTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(new Runnable() { /** {@inheritDoc} */ @Override public void run() { - IgniteCompute comp = ignite.compute().withAsync(); - while (end - System.currentTimeMillis() > 0) { int levels = 3; @@ -116,9 +113,7 @@ public class GridMultiSplitsLoadTest extends GridCommonAbstractTest { long start = System.currentTimeMillis(); try { - comp.execute(GridLoadTestTask.class, levels); - - ComputeTaskFuture<Integer> fut = comp.future(); + ComputeTaskFuture<Integer> fut = ignite.compute().executeAsync(GridLoadTestTask.class, levels); int res = fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java index 5d909c9..6fb7cdf 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/direct/newnodes/GridSingleSplitsNewNodesAbstractLoadTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.loadtests.direct.newnodes; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.typedef.G; @@ -141,8 +140,6 @@ public abstract class GridSingleSplitsNewNodesAbstractLoadTest extends GridCommo GridTestUtils.runMultiThreaded(new Runnable() { /** {@inheritDoc} */ @Override public void run() { - IgniteCompute comp = ignite.compute().withAsync(); - while (end - System.currentTimeMillis() > 0 && !Thread.currentThread().isInterrupted()) { long start = System.currentTimeMillis(); @@ -150,9 +147,8 @@ public abstract class GridSingleSplitsNewNodesAbstractLoadTest extends GridCommo try { int levels = 3; - comp.execute(new GridSingleSplitNewNodesTestTask(), levels); - - ComputeTaskFuture<Integer> fut = comp.future(); + ComputeTaskFuture<Integer> fut = ignite.compute().executeAsync( + new GridSingleSplitNewNodesTestTask(), levels); int res = fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java index a065580..9662882 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/dsi/GridDsiClient.java @@ -114,7 +114,7 @@ public class GridDsiClient implements Callable { /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "InfiniteLoopStatement"}) @Nullable @Override public Object call() throws Exception { - IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode())).withAsync(); + IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode())); while (!finish.get()) { try { @@ -122,9 +122,8 @@ public class GridDsiClient implements Callable { long submitTime1 = t0; - comp.execute(GridDsiRequestTask.class, new GridDsiMessage(terminalId, nodeId)); - - ComputeTaskFuture<T3<Long, Integer, Integer>> f1 = comp.future(); + ComputeTaskFuture<T3<Long, Integer, Integer>> f1 = comp.executeAsync( + GridDsiRequestTask.class, new GridDsiMessage(terminalId, nodeId)); submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); @@ -132,9 +131,8 @@ public class GridDsiClient implements Callable { submitTime1 = System.currentTimeMillis(); - comp.execute(GridDsiResponseTask.class, new GridDsiMessage(terminalId, nodeId)); - - ComputeTaskFuture<T3<Long, Integer, Integer>> f2 = comp.future(); + ComputeTaskFuture<T3<Long, Integer, Integer>> f2 = comp.executeAsync( + GridDsiResponseTask.class, new GridDsiMessage(terminalId, nodeId)); submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java index 8e55ff9..53c6f50 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionLoadTestClientSemaphore.java @@ -29,7 +29,6 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; -import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.internal.util.lang.GridAbsClosure; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.G; @@ -75,16 +74,10 @@ public class GridJobExecutionLoadTestClientSemaphore implements Callable<Object> ClusterGroup rmts = g.cluster().forRemotes(); - IgniteCompute comp = g.compute(rmts).withAsync(); - while (!finish) { tasksSem.acquire(); - comp.execute(GridJobExecutionLoadTestTask.class, null); - - ComputeTaskFuture<Object> f = comp.future(); - - f.listen(lsnr); + g.compute(rmts).executeAsync(GridJobExecutionLoadTestTask.class, null).listen(lsnr); txCnt.increment(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java index 2e2ab20..2f94b48 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobExecutionSingleNodeSemaphoreLoadTest.java @@ -24,13 +24,11 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.compute.ComputeTask; -import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.CI1; @@ -188,13 +186,7 @@ public class GridJobExecutionSingleNodeSemaphoreLoadTest { @Nullable @Override public Object call() throws Exception { sem.acquire(); - IgniteCompute comp = g.compute().withAsync(); - - comp.execute(GridJobExecutionLoadTestTask.class, null); - - ComputeTaskFuture<Object> f = comp.future(); - - f.listen(lsnr); + g.compute().executeAsync(GridJobExecutionLoadTestTask.class, null).listen(lsnr); iterCntr.increment(); http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java index 8dcd828..16a6af8 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/job/GridJobLoadTestSubmitter.java @@ -22,7 +22,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Random; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.lang.IgniteFutureCancelledException; @@ -68,8 +67,6 @@ public class GridJobLoadTestSubmitter implements Runnable { /** {@inheritDoc} */ @SuppressWarnings("BusyWait") @Override public void run() { - IgniteCompute comp = ignite.compute().withAsync(); - while (true) { checkCompletion(); @@ -83,9 +80,7 @@ public class GridJobLoadTestSubmitter implements Runnable { } try { - comp.withTimeout(TIMEOUT).execute(GridJobLoadTestTask.class, params); - - futures.add(comp.<Integer>future()); + futures.add(ignite.compute().withTimeout(TIMEOUT).executeAsync(GridJobLoadTestTask.class, params)); } catch (IgniteException e) { // Should not be thrown since uses asynchronous execution. http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java index 8c8f039..cb609ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mergesort/GridMergeSortLoadTask.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobContext; @@ -77,12 +76,8 @@ public class GridMergeSortLoadTask extends ComputeTaskSplitAdapter<int[], int[]> // Future is null before holdcc() is called and // not null after callcc() is called. if (fut == null) { - IgniteCompute comp = ignite.compute().withAsync(); - // Launch the recursive child task asynchronously. - comp.execute(new GridMergeSortLoadTask(), arr); - - fut = comp.future(); + fut = ignite.compute().executeAsync(new GridMergeSortLoadTask(), arr); // Add a listener to the future, that will resume the // parent task once the child one is completed. http://git-wip-us.apache.org/repos/asf/ignite/blob/282b334f/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index 3f66c5d..b7ddc3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -1031,7 +1031,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser /** * @throws Exception If failed. */ - public void testAsync() throws Exception { + public void testAsyncOld() throws Exception { final AtomicInteger msgCnt = new AtomicInteger(); TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); @@ -1137,6 +1137,76 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } /** + * @throws Exception If failed. + */ + public void testAsync() throws Exception { + final AtomicInteger msgCnt = new AtomicInteger(); + + TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); + + discoSpi.blockCustomEvent(); + + final String topic = "topic"; + + IgniteFuture<UUID> starFut = ignite2.message().remoteListenAsync(topic, new P2<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println(Thread.currentThread().getName() + + " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + msgCnt.incrementAndGet(); + + return true; + } + }); + + Assert.assertNotNull(starFut); + + U.sleep(500); + + Assert.assertFalse(starFut.isDone()); + + discoSpi.stopBlock(); + + UUID id = starFut.get(); + + Assert.assertNotNull(id); + + Assert.assertTrue(starFut.isDone()); + + discoSpi.blockCustomEvent(); + + message(ignite1.cluster().forRemotes()).send(topic, "msg1"); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return msgCnt.get() > 0; + } + }, 5000); + + assertEquals(1, msgCnt.get()); + + IgniteFuture<?> stopFut = ignite2.message().stopRemoteListenAsync(id); + + Assert.assertNotNull(stopFut); + + U.sleep(500); + + Assert.assertFalse(stopFut.isDone()); + + discoSpi.stopBlock(); + + stopFut.get(); + + Assert.assertTrue(stopFut.isDone()); + + message(ignite1.cluster().forRemotes()).send(topic, "msg2"); + + U.sleep(1000); + + assertEquals(1, msgCnt.get()); + } + + /** * */ static class TestTcpDiscoverySpi extends TcpDiscoverySpi { @@ -1231,6 +1301,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser /** * @param expOldestIgnite Expected oldest ignite. + * @throws InterruptedException If failed. */ private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException { ClusterGroup grp = ignite1.cluster().forOldest();
