Repository: ignite Updated Branches: refs/heads/master 9d2ec1cc2 -> b375c1ef5
IGNITE-4477: Added IgniteFuture.listenAsync() and IgniteFuture.chainAsync() methods. This closes #1844. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b375c1ef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b375c1ef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b375c1ef Branch: refs/heads/master Commit: b375c1ef504c72b9ffd653933da4bacea5aad5b5 Parents: 9d2ec1c Author: dkarachentsev <[email protected]> Authored: Sat Jun 3 16:43:35 2017 +0300 Committer: devozerov <[email protected]> Committed: Sat Jun 3 16:43:35 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheFutureImpl.java | 8 +- .../util/future/AsyncFutureListener.java | 57 +++ .../internal/util/future/IgniteFutureImpl.java | 33 +- .../org/apache/ignite/lang/IgniteFuture.java | 24 +- .../util/future/IgniteCacheFutureImplTest.java | 46 ++ .../util/future/IgniteFutureImplTest.java | 466 +++++++++++++++++-- .../testsuites/IgniteLangSelfTestSuite.java | 3 + .../processors/schedule/ScheduleFutureImpl.java | 51 +- .../schedule/GridScheduleSelfTest.java | 84 +++- 9 files changed, 724 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java index 74cccc1..c861be8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java @@ -17,6 +17,7 
@@ package org.apache.ignite.internal.processors.cache; +import java.util.concurrent.Executor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; @@ -43,7 +44,12 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> { /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb) { - return new IgniteCacheFutureImpl<>(chainInternal(doneCb)); + return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null)); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<V>, T> doneCb, Executor exec) { + return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java new file mode 100644 index 0000000..460ce8b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/AsyncFutureListener.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.future; + +import java.util.concurrent.Executor; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; + +/** + * Wraps listener and executes it in specified executor. + */ +public class AsyncFutureListener<V> implements IgniteInClosure<IgniteFuture<V>> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteInClosure<? super IgniteFuture<V>> lsnr; + + /** */ + private final Executor exec; + + /** + * @param lsnr Listener to be called asynchronously. + * @param exec Executor to process listener. + */ + public AsyncFutureListener(IgniteInClosure<? 
super IgniteFuture<V>> lsnr, Executor exec) { + assert lsnr != null; + assert exec != null; + + this.lsnr = lsnr; + this.exec = exec; + } + + /** {@inheritDoc} */ + @Override public void apply(final IgniteFuture<V> fut) { + exec.execute(new Runnable() { + @Override public void run() { + lsnr.apply(fut); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java index 08fae96..a018628 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -28,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; /** * Implementation of public API future. @@ -70,16 +72,34 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { } /** {@inheritDoc} */ + @Override public void listenAsync(IgniteInClosure<? super IgniteFuture<V>> lsnr, Executor exec) { + A.notNull(lsnr, "lsnr"); + A.notNull(exec, "exec"); + + fut.listen(new InternalFutureListener(new AsyncFutureListener<>(lsnr, exec))); + } + + /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? 
super IgniteFuture<V>, T> doneCb) { - return new IgniteFutureImpl<>(chainInternal(doneCb)); + return new IgniteFutureImpl<>(chainInternal(doneCb, null)); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> chainAsync(final IgniteClosure<? super IgniteFuture<V>, T> doneCb, + Executor exec) { + A.notNull(doneCb, "doneCb"); + A.notNull(exec, "exec"); + + return new IgniteFutureImpl<>(chainInternal(doneCb, exec)); } /** * @param doneCb Done callback. * @return Internal future */ - protected <T> IgniteInternalFuture<T> chainInternal(final IgniteClosure<? super IgniteFuture<V>, T> doneCb) { - return fut.chain(new C1<IgniteInternalFuture<V>, T>() { + protected <T> IgniteInternalFuture<T> chainInternal(final IgniteClosure<? super IgniteFuture<V>, T> doneCb, + @Nullable Executor exec) { + C1<IgniteInternalFuture<V>, T> clos = new C1<IgniteInternalFuture<V>, T>() { @Override public T apply(IgniteInternalFuture<V> fut) { assert IgniteFutureImpl.this.fut == fut; @@ -90,7 +110,12 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { throw new GridClosureException(e); } } - }); + }; + + if (exec != null) + return fut.chain(clos, exec); + + return fut.chain(clos); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java index 6519ec8..ee297cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.lang; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; @@ -96,12 +97,23 @@ public 
interface IgniteFuture<V> { /** * Registers listener closure to be asynchronously notified whenever future completes. + * Closure will be processed in thread that completes this future or (if future already + * completed) immediately in current thread. * - * @param lsnr Listener closure to register. If not provided - this method is no-op. + * @param lsnr Listener closure to register. Cannot be {@code null}. */ public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr); /** + * Registers listener closure to be asynchronously notified whenever future completes. + * Closure will be processed in specified executor. + * + * @param lsnr Listener closure to register. Cannot be {@code null}. + * @param exec Executor to run listener. Cannot be {@code null}. + */ + public void listenAsync(IgniteInClosure<? super IgniteFuture<V>> lsnr, Executor exec); + + /** * Make a chained future to convert result of this future (when complete) into a new format. * It is guaranteed that done callback will be called only ONCE. * @@ -109,4 +121,14 @@ public interface IgniteFuture<V> { * @return Chained future that finishes after this future completes and done callback is called. */ public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb); + + /** + * Make a chained future to convert result of this future (when complete) into a new format. + * It is guaranteed that done callback will be called only ONCE. + * + * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result. + * @param exec Executor to run done callback. Cannot be {@code null}. + * @return Chained future that finishes after this future completes and done callback is called. + */ + public <T> IgniteFuture<T> chainAsync(IgniteClosure<? 
super IgniteFuture<V>, T> doneCb, Executor exec); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java new file mode 100644 index 0000000..46f1706 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.future; + +import javax.cache.CacheException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl; + +/** + * Tests IgniteCacheFutureImpl. 
+ */ +public class IgniteCacheFutureImplTest extends IgniteFutureImplTest { + /** {@inheritDoc} */ + @Override protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) { + return new IgniteCacheFutureImpl<>(fut); + } + + /** {@inheritDoc} */ + @Override protected Class<? extends Exception> expectedException() { + return CacheException.class; + } + + /** {@inheritDoc} */ + @Override protected void assertExpectedException(Exception e, Exception exp) { + if (exp instanceof IgniteException) + assertEquals(exp, e.getCause().getCause()); + else + assertEquals(exp, e.getCause()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java index 1e5b9a8..3a06cf4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java @@ -18,10 +18,17 @@ package org.apache.ignite.internal.util.future; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import 
org.apache.ignite.internal.util.typedef.internal.U; @@ -29,18 +36,47 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; /** * */ public class IgniteFutureImplTest extends GridCommonAbstractTest { + /** Context thread name. */ + private static final String CTX_THREAD_NAME = "test-async"; + + /** Custom thread name. */ + private static final String CUSTOM_THREAD_NAME = "test-custom-async"; + + /** Test executor. */ + private ExecutorService ctxExec; + + /** Custom executor. */ + private ExecutorService customExec; + + /** {@inheritDoc} */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + @Override protected void beforeTest() throws Exception { + ctxExec = createExecutor(CTX_THREAD_NAME); + customExec = createExecutor(CUSTOM_THREAD_NAME); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.shutdownNow(getClass(), ctxExec, log); + U.shutdownNow(getClass(), customExec, log); + + ctxExec = null; + customExec = null; + } + /** * @throws Exception If failed. 
*/ public void testFutureGet() throws Exception { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); assertFalse(fut.isDone()); @@ -59,7 +95,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { public void testFutureException() throws Exception { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - final IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + final IgniteFutureImpl<String> fut = createFuture(fut0); assertFalse(fut.isDone()); @@ -69,27 +105,27 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fut0.onDone(err0); - IgniteException err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() { + Exception err = (Exception)GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { fut.get(); return null; } - }, IgniteException.class, "test error"); + }, expectedException(), "test error"); - assertEquals(err0, err.getCause()); + assertExpectedException(err, err0); assertTrue(fut.isDone()); - err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() { + err = (Exception)GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { fut.get(); return null; } - }, IgniteException.class, null); + }, expectedException(), null); - assertEquals(err0, err.getCause()); + assertExpectedException(err, err0); } /** @@ -98,21 +134,21 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { public void testFutureIgniteException() throws Exception { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - final IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + final IgniteFutureImpl<String> fut = createFuture(fut0); IgniteException err0 = new IgniteException("test error"); fut0.onDone(err0); - IgniteException err = 
(IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() { + Exception err = (Exception)GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { fut.get(); return null; } - }, IgniteException.class, "test error"); + }, expectedException(), "test error"); - assertEquals(err0, err); + assertExpectedException(err, err0); } /** @@ -121,7 +157,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { public void testListeners() throws Exception { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); final AtomicInteger lsnr1Cnt = new AtomicInteger(); @@ -166,7 +202,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); final IgniteException err0 = new IgniteException("test error"); @@ -179,8 +215,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); passed.set(true); } @@ -197,7 +233,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); final IgniteCheckedException err0 = new IgniteCheckedException("test error"); @@ -210,8 +246,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err.getCause()); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); passed.set(true); } @@ -229,10 +265,130 @@ public class 
IgniteFutureImplTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testAsyncListeners() throws Exception { + GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); + + IgniteFutureImpl<String> fut = createFuture(fut0); + + final CountDownLatch latch1 = new CountDownLatch(1); + + IgniteInClosure<? super IgniteFuture<String>> lsnr1 = createAsyncListener(latch1, CUSTOM_THREAD_NAME, null); + + assertFalse(fut.isDone()); + + fut.listenAsync(lsnr1, customExec); + + U.sleep(100); + + assertEquals(1, latch1.getCount()); + + fut0.onDone("test"); + + assert latch1.await(1, TimeUnit.SECONDS) : latch1.getCount(); + + checkAsyncListener(fut); + checkAsyncListener(createFuture(new GridFinishedFuture<>("test"))); + } + + /** + * @param fut Future. + */ + private void checkAsyncListener(IgniteFutureImpl<String> fut) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + IgniteInClosure<? super IgniteFuture<String>> lsnr = createAsyncListener(latch, CUSTOM_THREAD_NAME, null); + + fut.listenAsync(lsnr, customExec); + + assert latch.await(1, TimeUnit.SECONDS) : latch.getCount(); + } + + /** + * @throws Exception If failed. + */ + public void testAsyncListenersOnError() throws Exception { + checkAsyncListenerOnError(new IgniteException("Test exception")); + checkAsyncListenerOnError(new IgniteCheckedException("Test checked exception")); + } + + /** + * @param err0 Test exception. + */ + private void checkAsyncListenerOnError(Exception err0) throws InterruptedException { + GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); + + IgniteFutureImpl<String> fut = createFuture(fut0); + + final CountDownLatch latch1 = new CountDownLatch(1); + + IgniteInClosure<? 
super IgniteFuture<String>> lsnr1 = createAsyncListener(latch1, CUSTOM_THREAD_NAME, err0); + + fut.listenAsync(lsnr1, customExec); + + assertEquals(1, latch1.getCount()); + + fut0.onDone(err0); + + assert latch1.await(1, TimeUnit.SECONDS); + + checkAsyncListenerOnError(err0, fut); + checkAsyncListenerOnError(err0, createFuture(new GridFinishedFuture<String>(err0))); + } + + /** + * @param err0 Err 0. + * @param fut Future. + */ + private void checkAsyncListenerOnError(Exception err0, IgniteFutureImpl<String> fut) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + IgniteInClosure<? super IgniteFuture<String>> lsnr = createAsyncListener(latch, CUSTOM_THREAD_NAME, err0); + + fut.listenAsync(lsnr, customExec); + + assert latch.await(1, TimeUnit.SECONDS); + } + + /** + * @param latch Latch. + */ + @NotNull private CI1<IgniteFuture<String>> createAsyncListener( + final CountDownLatch latch, + final String threadName, + final Exception err + ) { + return new CI1<IgniteFuture<String>>() { + @Override public void apply(IgniteFuture<String> fut) { + try { + String tname = Thread.currentThread().getName(); + + assert tname.contains(threadName) : tname; + + assertEquals("test", fut.get()); + + if (err != null) + fail(); + } + catch (IgniteException | CacheException e) { + if (err != null) + assertExpectedException(e, err); + else + throw e; + } + finally { + latch.countDown(); + } + } + }; + } + + /** + * @throws Exception If failed. 
+ */ public void testChain() throws Exception { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); IgniteFuture<Integer> chained = fut.chain(new C1<IgniteFuture<String>, Integer>() { @Override public Integer apply(IgniteFuture<String> fut) { @@ -274,7 +430,7 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); final IgniteException err0 = new IgniteException("test error"); @@ -289,8 +445,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { return -1; } - catch (IgniteException err) { - assertEquals(err0, err); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); chainedPassed.set(true); @@ -308,8 +464,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); lsnrPassed.set(true); } @@ -329,8 +485,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); } try { @@ -338,15 +494,15 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); } } { GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); - IgniteFutureImpl<String> fut = new IgniteFutureImpl<>(fut0); + IgniteFutureImpl<String> fut = createFuture(fut0); final IgniteCheckedException err0 = new IgniteCheckedException("test error"); @@ -361,8 
+517,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { return -1; } - catch (IgniteException err) { - assertEquals(err0, err.getCause()); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); chainedPassed.set(true); @@ -380,8 +536,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err.getCause()); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); lsnrPassed.set(true); } @@ -401,8 +557,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err.getCause()); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); } try { @@ -410,9 +566,243 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest { fail(); } - catch (IgniteException err) { - assertEquals(err0, err.getCause()); + catch (IgniteException | CacheException err) { + assertExpectedException(err, err0); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testChainAsync() throws Exception { + GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); + + IgniteFuture<String> fut = createFuture(fut0); + + C1<IgniteFuture<String>, Integer> chainClos = new C1<IgniteFuture<String>, Integer>() { + @Override public Integer apply(IgniteFuture<String> fut) { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + + return Integer.valueOf(fut.get()); + } + }; + + IgniteFuture<Integer> chained1 = fut.chainAsync(chainClos, customExec); + + assertFalse(chained1.isDone()); + + final CountDownLatch latch = new CountDownLatch(1); + + class TestClosure implements CI1<IgniteFuture<Integer>> { + private final CountDownLatch latch; + + private TestClosure(CountDownLatch latch) { + this.latch = latch; + } + + @Override public void apply(IgniteFuture<Integer> fut) { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + assertEquals(10, (int)fut.get()); + + latch.countDown(); } } + + chained1.listen(new TestClosure(latch)); + + fut0.onDone("10"); + + // Chained future will be completed asynchronously. + chained1.get(100, TimeUnit.MILLISECONDS); + + assertTrue(chained1.isDone()); + + assertEquals(10, (int)chained1.get()); + + assert latch.await(100, TimeUnit.MILLISECONDS); + + assertTrue(fut.isDone()); + + assertEquals("10", fut.get()); + + // Test finished future + GridFinishedFuture<String> ffut0 = new GridFinishedFuture<>("10"); + + CountDownLatch latch1 = new CountDownLatch(1); + + IgniteFuture<Integer> chained2 = createFuture(ffut0).chainAsync(chainClos, customExec); + + chained2.listen(new TestClosure(latch1)); + + chained2.get(100, TimeUnit.MILLISECONDS); + + assertTrue(chained2.isDone()); + + assertEquals(10, (int)chained2.get()); + + assert latch1.await(100, TimeUnit.MILLISECONDS); + } + + /** + * @throws Exception If failed. 
+ */ + public void testChainAsyncOnError() throws Exception { + checkChainedOnError(new IgniteException("Test exception")); + checkChainedOnError(new IgniteCheckedException("Test checked exception")); + checkChainedOnErrorFinishedFuture(new IgniteException("Test exception")); + checkChainedOnErrorFinishedFuture(new IgniteCheckedException("Test checked exception")); + } + + /** + * @param err Exception. + * @throws Exception If failed. + */ + private void checkChainedOnError(final Exception err) throws Exception { + GridFutureAdapter<String> fut0 = new GridFutureAdapter<>(); + + IgniteFutureImpl<String> fut = createFuture(fut0); + + // Chain callback will be invoked in specific executor. + IgniteFuture<Integer> chained1 = fut.chainAsync(new C1<IgniteFuture<String>, Integer>() { + @Override public Integer apply(IgniteFuture<String> fut) { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + + try { + fut.get(); + + fail(); + } + catch (IgniteException | CacheException e) { + assertExpectedException(e, err); + + throw e; + } + + return -1; + } + }, customExec); + + assertFalse(chained1.isDone()); + assertFalse(fut.isDone()); + + final CountDownLatch latch = new CountDownLatch(1); + + chained1.listen(new CI1<IgniteFuture<Integer>>() { + @Override public void apply(IgniteFuture<Integer> fut) { + try { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + + fut.get(); + + fail(); + } + catch (IgniteException | CacheException e) { + assertExpectedException(e, err); + } + finally { + latch.countDown(); + } + } + }); + + fut0.onDone(err); + + assertExceptionThrown(err, chained1); + assertExceptionThrown(err, fut); + + assertTrue(chained1.isDone()); + assertTrue(fut.isDone()); + + assert latch.await(100, TimeUnit.MILLISECONDS); + } + + /** + * @param err Err. 
+ */ + private void checkChainedOnErrorFinishedFuture(final Exception err) throws Exception { + IgniteFutureImpl<String> fut = createFuture(new GridFinishedFuture<String>(err)); + + // Chain callback will be invoked in specific executor. + IgniteFuture<Integer> chained1 = fut.chainAsync(new C1<IgniteFuture<String>, Integer>() { + @Override public Integer apply(IgniteFuture<String> fut) { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + + try { + fut.get(); + + fail(); + } + catch (IgniteException e) { + assertExpectedException(e, err); + + throw e; + } + + return -1; + } + }, customExec); + + + assertExceptionThrown(err, chained1); + assertExceptionThrown(err, fut); + + assertTrue(chained1.isDone()); + assertTrue(fut.isDone()); + } + + /** + * @param err Expected exception. + * @param fut Future. + */ + private void assertExceptionThrown(Exception err, IgniteFuture<?> fut) { + try { + fut.get(); + + fail(); + } + catch (IgniteException | CacheException e) { + assertExpectedException(e, err); + } + } + + /** + * @param e Actual exception. + * @param exp Expected exception. + */ + protected void assertExpectedException(Exception e, Exception exp) { + if (exp instanceof IgniteException) + assertEquals(exp, e); + else + assertEquals(exp, e.getCause()); + } + + /** + * @param name Name. + */ + @NotNull private ExecutorService createExecutor(final String name) { + return Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override public Thread newThread(@NotNull Runnable r) { + Thread t = new Thread(r); + + t.setName(name); + + return t; + } + }); + } + + /** + * @param fut Future. + */ + protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) { + return new IgniteFutureImpl<>(fut); + } + + /** + * + */ + protected Class<? 
extends Exception> expectedException() { + return IgniteException.class; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java index cfec1ec..70aec72 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteLangSelfTestSuite.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.util.future.GridCompoundFutureSelfTest; import org.apache.ignite.internal.util.future.GridEmbeddedFutureSelfTest; import org.apache.ignite.internal.util.future.GridFutureAdapterSelfTest; +import org.apache.ignite.internal.util.future.IgniteCacheFutureImplTest; import org.apache.ignite.internal.util.future.IgniteFutureImplTest; import org.apache.ignite.internal.util.future.nio.GridNioEmbeddedFutureSelfTest; import org.apache.ignite.internal.util.future.nio.GridNioFutureSelfTest; @@ -80,7 +81,9 @@ public class IgniteLangSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridEmbeddedFutureSelfTest.class)); suite.addTest(new TestSuite(GridNioFutureSelfTest.class)); suite.addTest(new TestSuite(GridNioEmbeddedFutureSelfTest.class)); + suite.addTest(new TestSuite(IgniteFutureImplTest.class)); + suite.addTest(new TestSuite(IgniteCacheFutureImplTest.class)); // Consistent hash tests. 
suite.addTest(new TestSuite(GridConsistentHashSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java index 63a6164..e3c743c 100644 --- a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java +++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; @@ -35,6 +36,7 @@ import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.future.AsyncFutureListener; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.lang.GridClosureException; @@ -579,15 +581,42 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> { } /** {@inheritDoc} */ + @Override public void listenAsync(IgniteInClosure<? 
super IgniteFuture<R>> lsnr, Executor exec) { + A.notNull(lsnr, "lsnr"); + A.notNull(exec, "exec"); + + listen(new AsyncFutureListener<>(lsnr, exec)); + } + + /** {@inheritDoc} */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<R>, T> doneCb) { - final GridFutureAdapter<T> fut = new GridFutureAdapter() { + A.notNull(doneCb, "doneCb"); + + return chain(doneCb, null); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<R>, T> doneCb, Executor exec) { + A.notNull(doneCb, "doneCb"); + A.notNull(exec, "exec"); + + return chain(doneCb, exec); + } + + /** + * @param doneCb Done callback. + * @param exec Executor. + * @return Chained future. + */ + private <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<R>, T> doneCb, @Nullable Executor exec) { + final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() { @Override public String toString() { return "ChainFuture[orig=" + ScheduleFutureImpl.this + ", doneCb=" + doneCb + ']'; } }; - listen(new CI1<IgniteFuture<R>>() { + IgniteInClosure<? super IgniteFuture<R>> lsnr = new CI1<IgniteFuture<R>>() { @Override public void apply(IgniteFuture<R> fut0) { try { fut.onDone(doneCb.apply(fut0)); @@ -607,7 +636,12 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> { throw e; } } - }); + }; + + if (exec != null) + lsnr = new AsyncFutureListener<>(lsnr, exec); + + listen(lsnr); return new IgniteFutureImpl<>(fut); } @@ -861,9 +895,20 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> { } /** {@inheritDoc} */ + @Override public void listenAsync(IgniteInClosure<? super IgniteFuture<R>> lsnr, Executor exec) { + ref.listenAsync(lsnr, exec); + } + + /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> chain(IgniteClosure<? 
super IgniteFuture<R>, T> doneCb) { return ref.chain(doneCb); } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<R>, T> doneCb, + Executor exec) { + return ref.chainAsync(doneCb, exec); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b375c1ef/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java b/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java index f0860f2..122d98d 100644 --- a/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java +++ b/modules/schedule/src/test/java/org/apache/ignite/internal/processors/schedule/GridScheduleSelfTest.java @@ -19,21 +19,29 @@ package org.apache.ignite.internal.processors.schedule; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.scheduler.SchedulerFuture; import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; /** * Test for task scheduler. @@ -43,9 +51,15 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest { /** */ private static final int NODES_CNT = 2; + /** Custom thread name. */ + private static final String CUSTOM_THREAD_NAME = "custom-async-test"; + /** */ private static AtomicInteger execCntr = new AtomicInteger(0); + /** Custom executor. */ + private ExecutorService exec; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { startGrids(NODES_CNT); @@ -61,6 +75,22 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { execCntr.set(0); + exec = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override public Thread newThread(@NotNull Runnable r) { + Thread t = new Thread(r); + + t.setName(CUSTOM_THREAD_NAME); + + return t; + } + }); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.shutdownNow(getClass(), exec, log); + + exec = null; } /** @@ -124,6 +154,54 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest { } }); + final SchedulerFuture<?> fut0 = fut; + + //noinspection ThrowableNotThrown + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut0.listenAsync(new IgniteInClosure<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> fut) { + // No-op + } + }, null); + + return null; + } + }, NullPointerException.class, null); + + fut.listenAsync(new IgniteInClosure<IgniteFuture<?>>() { + @Override public void apply(IgniteFuture<?> fut) { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + + notifyCnt.incrementAndGet(); + } + }, exec); + + 
//noinspection ThrowableNotThrown + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut0.chainAsync(new IgniteClosure<IgniteFuture<?>, String>() { + @Override public String apply(IgniteFuture<?> fut) { + // No-op + + return null; + } + }, null); + + return null; + } + }, NullPointerException.class, null); + + IgniteFuture<String> chained1 = fut.chainAsync(new IgniteClosure<IgniteFuture<?>, String>() { + @Override public String apply(IgniteFuture<?> fut) { + assertEquals(CUSTOM_THREAD_NAME, Thread.currentThread().getName()); + + fut.get(); + + return "done-custom"; + } + }, exec); + long timeTillRun = freq + delay; info("Going to wait for the first run: " + timeTillRun); @@ -135,6 +213,7 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest { assert !fut.isDone(); assert !fut.isCancelled(); assert fut.last() == null; + assertFalse(chained1.isDone()); info("Going to wait for 2nd run: " + timeTillRun); @@ -142,10 +221,13 @@ public class GridScheduleSelfTest extends GridCommonAbstractTest { Thread.sleep(timeTillRun * 1000); assert fut.isDone(); - assert notifyCnt.get() == 2; + assert notifyCnt.get() == 2 * 2; assert !fut.isCancelled(); assert fut.last() == null; + assertEquals("done-custom", chained1.get()); + + assertTrue(chained1.isDone()); } finally { assert fut != null;
