This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23678c9b2b198762134ec62c06add0a834b0c0e9
Author: Nico Kruber <[email protected]>
AuthorDate: Sun Aug 5 00:41:02 2018 +0200

    [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers
---
 .../netty/CreditBasedPartitionRequestClientHandler.java       | 11 ++---------
 .../io/network/netty/PartitionRequestClientHandler.java       |  4 +---
 .../flink/runtime/io/network/netty/PartitionRequestQueue.java |  7 +------
 3 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75..cc0b222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
        public void addInputChannel(RemoteInputChannel listener) throws IOException {
                checkError();
 
-               if (!inputChannels.containsKey(listener.getInputChannelId())) {
-                       inputChannels.put(listener.getInputChannelId(), listener);
-               }
+               inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
        }
 
        @Override
@@ -112,12 +110,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 
        @Override
        public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
-               ctx.executor().execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               ctx.pipeline().fireUserEventTriggered(inputChannel);
-                       }
-               });
+               ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
        }
 
        // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f..c5ba7a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
        public void addInputChannel(RemoteInputChannel listener) throws IOException {
                checkError();
 
-               if (!inputChannels.containsKey(listener.getInputChannelId())) {
-                       inputChannels.put(listener.getInputChannelId(), listener);
-               }
+               inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b82..c3d3d1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
                // TODO This could potentially have a bad performance impact as in the
                // worst case (network consumes faster than the producer) each buffer
                // will trigger a separate event loop task being scheduled.
-               ctx.executor().execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               ctx.pipeline().fireUserEventTriggered(reader);
-                       }
-               });
+               ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
        }
 
        /**

Reply via email to