This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit dea8df0167a7140782e1991f7f878f86bf7e41b5 Author: Nico Kruber <[email protected]> AuthorDate: Sun Aug 5 00:41:02 2018 +0200 [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers --- .../netty/CreditBasedPartitionRequestClientHandler.java | 11 ++--------- .../io/network/netty/PartitionRequestClientHandler.java | 4 +--- .../flink/runtime/io/network/netty/PartitionRequestQueue.java | 7 +------ 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 90daf75..cc0b222 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -89,9 +89,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap public void addInputChannel(RemoteInputChannel listener) throws IOException { checkError(); - if (!inputChannels.containsKey(listener.getInputChannelId())) { - inputChannels.put(listener.getInputChannelId(), listener); - } + inputChannels.putIfAbsent(listener.getInputChannelId(), listener); } @Override @@ -112,12 +110,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap @Override public void notifyCreditAvailable(final RemoteInputChannel inputChannel) { - ctx.executor().execute(new Runnable() { - @Override - public void run() { - ctx.pipeline().fireUserEventTriggered(inputChannel); - } - }); + ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel)); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 796e86f..c5ba7a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -85,9 +85,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme public void addInputChannel(RemoteInputChannel listener) throws IOException { checkError(); - if (!inputChannels.containsKey(listener.getInputChannelId())) { - inputChannels.put(listener.getInputChannelId(), listener); - } + inputChannels.putIfAbsent(listener.getInputChannelId(), listener); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index 8c05b82..c3d3d1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -89,12 +89,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { // TODO This could potentially have a bad performance impact as in the // worst case (network consumes faster than the producer) each buffer // will trigger a separate event loop task being scheduled. - ctx.executor().execute(new Runnable() { - @Override - public void run() { - ctx.pipeline().fireUserEventTriggered(reader); - } - }); + ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader)); } /**
