Repository: flink
Updated Branches:
  refs/heads/release-1.5 bbe5930dc -> 931700911


[FLINK-9784] Fix inconsistent use of 'static' in AsyncIOExample.java

This closes #6298.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93170091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93170091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93170091

Branch: refs/heads/release-1.5
Commit: 93170091155f9f766f1bfa8e54085bd6c70be525
Parents: bbe5930
Author: an4828 <nekras...@att.com>
Authored: Tue Jul 10 10:59:35 2018 -0400
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Jul 12 13:43:41 2018 +0200

----------------------------------------------------------------------
 .../examples/async/AsyncIOExample.java          | 66 ++++++--------------
 1 file changed, 19 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93170091/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 95379e3..b9f92fe 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExecutorUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,9 +41,9 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -116,10 +117,7 @@ public class AsyncIOExample {
        private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
                private static final long serialVersionUID = 2098635244857937717L;
 
-               private static ExecutorService executorService;
-               private static Random random;
-
-               private int counter;
+               private transient ExecutorService executorService;
 
                /**
                 * The result of multiplying sleepFactor with a random float is used to pause
@@ -145,57 +143,31 @@ public class AsyncIOExample {
                public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
 
-                       synchronized (SampleAsyncFunction.class) {
-                               if (counter == 0) {
-                                       executorService = Executors.newFixedThreadPool(30);
-
-                                       random = new Random();
-                               }
-
-                               ++counter;
-                       }
+                       executorService = Executors.newFixedThreadPool(30);
                }
 
                @Override
                public void close() throws Exception {
                        super.close();
-
-                       synchronized (SampleAsyncFunction.class) {
-                               --counter;
-
-                               if (counter == 0) {
-                                       executorService.shutdown();
-
-                                       try {
-                                               if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
-                                                       executorService.shutdownNow();
-                                               }
-                                       } catch (InterruptedException e) {
-                                               executorService.shutdownNow();
-                                       }
-                               }
-                       }
+                       ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
                }
 
                @Override
-               public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
-                       this.executorService.submit(new Runnable() {
-                               @Override
-                               public void run() {
-                                       // wait for while to simulate async operation here
-                                       long sleep = (long) (random.nextFloat() * sleepFactor);
-                                       try {
-                                               Thread.sleep(sleep);
-
-                                               if (random.nextFloat() < failRatio) {
-                                                       resultFuture.completeExceptionally(new Exception("wahahahaha..."));
-                                               } else {
-                                                       resultFuture.complete(
-                                                               Collections.singletonList("key-" + (input % 10)));
-                                               }
-                                       } catch (InterruptedException e) {
-                                               resultFuture.complete(new ArrayList<String>(0));
+               public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
+                       executorService.submit(() -> {
+                               // wait for while to simulate async operation here
+                               long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
+                               try {
+                                       Thread.sleep(sleep);
+
+                                       if (ThreadLocalRandom.current().nextFloat() < failRatio) {
+                                               resultFuture.completeExceptionally(new Exception("wahahahaha..."));
+                                       } else {
+                                               resultFuture.complete(
+                                                       Collections.singletonList("key-" + (input % 10)));
                                        }
+                               } catch (InterruptedException e) {
+                                       resultFuture.complete(new ArrayList<>(0));
                                }
                        });
                }

Reply via email to