Repository: flink
Updated Branches:
  refs/heads/release-1.5 bbe5930dc -> 931700911
[FLINK-9784] Fix inconsistent use of 'static' in AsyncIOExample.java

This closes #6298.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93170091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93170091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93170091

Branch: refs/heads/release-1.5
Commit: 93170091155f9f766f1bfa8e54085bd6c70be525
Parents: bbe5930
Author: an4828 <nekras...@att.com>
Authored: Tue Jul 10 10:59:35 2018 -0400
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Jul 12 13:43:41 2018 +0200

----------------------------------------------------------------------
 .../examples/async/AsyncIOExample.java | 66 ++++++--------------------
 1 file changed, 19 insertions(+), 47 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/93170091/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 95379e3..b9f92fe 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExecutorUtils;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,9 +41,9 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;

 /**
@@ -116,10 +117,7 @@ public class AsyncIOExample {
 	private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
 		private static final long serialVersionUID = 2098635244857937717L;

-		private static ExecutorService executorService;
-		private static Random random;
-
-		private int counter;
+		private transient ExecutorService executorService;

 		/**
 		 * The result of multiplying sleepFactor with a random float is used to pause
@@ -145,57 +143,31 @@ public class AsyncIOExample {
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);

-			synchronized (SampleAsyncFunction.class) {
-				if (counter == 0) {
-					executorService = Executors.newFixedThreadPool(30);
-
-					random = new Random();
-				}
-
-				++counter;
-			}
+			executorService = Executors.newFixedThreadPool(30);
 		}

 		@Override
 		public void close() throws Exception {
 			super.close();
-
-			synchronized (SampleAsyncFunction.class) {
-				--counter;
-
-				if (counter == 0) {
-					executorService.shutdown();
-
-					try {
-						if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
-							executorService.shutdownNow();
-						}
-					} catch (InterruptedException e) {
-						executorService.shutdownNow();
-					}
-				}
-			}
+			ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
 		}

 		@Override
-		public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
-			this.executorService.submit(new Runnable() {
-				@Override
-				public void run() {
-					// wait for while to simulate async operation here
-					long sleep = (long) (random.nextFloat() * sleepFactor);
-					try {
-						Thread.sleep(sleep);
-
-						if (random.nextFloat() < failRatio) {
-							resultFuture.completeExceptionally(new Exception("wahahahaha..."));
-						} else {
-							resultFuture.complete(
-								Collections.singletonList("key-" + (input % 10)));
-						}
-					} catch (InterruptedException e) {
-						resultFuture.complete(new ArrayList<String>(0));
+		public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
+			executorService.submit(() -> {
+				// wait for while to simulate async operation here
+				long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
+				try {
+					Thread.sleep(sleep);
+
+					if (ThreadLocalRandom.current().nextFloat() < failRatio) {
+						resultFuture.completeExceptionally(new Exception("wahahahaha..."));
+					} else {
+						resultFuture.complete(
+							Collections.singletonList("key-" + (input % 10)));
 					}
+				} catch (InterruptedException e) {
+					resultFuture.complete(new ArrayList<>(0));
 				}
 			});
 		}