[
https://issues.apache.org/jira/browse/FLINK-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620483#comment-16620483
]
ASF GitHub Bot commented on FLINK-10332:
----------------------------------------
NicoK closed pull request #6693: [FLINK-10332][network] move data notification
out of the synchronized block
URL: https://github.com/apache/flink/pull/6693
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is displayed below for the sake of
provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75fcc7..cc0b2220fd2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@
public void addInputChannel(RemoteInputChannel listener) throws
IOException {
checkError();
- if (!inputChannels.containsKey(listener.getInputChannelId())) {
- inputChannels.put(listener.getInputChannelId(),
listener);
- }
+ inputChannels.putIfAbsent(listener.getInputChannelId(),
listener);
}
@Override
@@ -112,12 +110,7 @@ public void cancelRequestFor(InputChannelID
inputChannelId) {
@Override
public void notifyCreditAvailable(final RemoteInputChannel
inputChannel) {
- ctx.executor().execute(new Runnable() {
- @Override
- public void run() {
-
ctx.pipeline().fireUserEventTriggered(inputChannel);
- }
- });
+ ctx.executor().execute(() ->
ctx.pipeline().fireUserEventTriggered(inputChannel));
}
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f51b3..c5ba7a4b7f1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@
public void addInputChannel(RemoteInputChannel listener) throws
IOException {
checkError();
- if (!inputChannels.containsKey(listener.getInputChannelId())) {
- inputChannels.put(listener.getInputChannelId(),
listener);
- }
+ inputChannels.putIfAbsent(listener.getInputChannelId(),
listener);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b8208f9..c3d3d1bcc10 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader
reader) {
// TODO This could potentially have a bad performance impact as
in the
// worst case (network consumes faster than the producer) each
buffer
// will trigger a separate event loop task being scheduled.
- ctx.executor().execute(new Runnable() {
- @Override
- public void run() {
- ctx.pipeline().fireUserEventTriggered(reader);
- }
- });
+ ctx.executor().execute(() ->
ctx.pipeline().fireUserEventTriggered(reader));
}
/**
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb324b..fe27d97adaa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ public void finish() throws IOException {
private boolean add(BufferConsumer bufferConsumer, boolean finish) {
checkNotNull(bufferConsumer);
+ final boolean notifyDataAvailable;
synchronized (buffers) {
if (isFinished || isReleased) {
bufferConsumer.close();
@@ -98,14 +99,13 @@ private boolean add(BufferConsumer bufferConsumer, boolean
finish) {
buffers.add(bufferConsumer);
updateStatistics(bufferConsumer);
increaseBuffersInBacklog(bufferConsumer);
+ notifyDataAvailable = shouldNotifyDataAvailable() ||
finish;
- if (finish) {
- isFinished = true;
- notifyDataAvailable();
- }
- else {
- maybeNotifyDataAvailable();
- }
+ isFinished |= finish;
+ }
+
+ if (notifyDataAvailable) {
+ notifyDataAvailable();
}
return true;
@@ -220,6 +220,7 @@ public boolean isReleased() {
@Override
public PipelinedSubpartitionView
createReadView(BufferAvailabilityListener availabilityListener) throws
IOException {
+ final boolean notifyDataAvailable;
synchronized (buffers) {
checkState(!isReleased);
checkState(readView == null,
@@ -230,9 +231,10 @@ public PipelinedSubpartitionView
createReadView(BufferAvailabilityListener avail
parent.getOwningTaskName(), index,
parent.getPartitionId());
readView = new PipelinedSubpartitionView(this,
availabilityListener);
- if (!buffers.isEmpty()) {
- notifyDataAvailable();
- }
+ notifyDataAvailable = !buffers.isEmpty();
+ }
+ if (notifyDataAvailable) {
+ notifyDataAvailable();
}
return readView;
@@ -283,26 +285,24 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
@Override
public void flush() {
+ final boolean notifyDataAvailable;
synchronized (buffers) {
if (buffers.isEmpty()) {
return;
}
- if (!flushRequested) {
- flushRequested = true; // set this before the
notification!
- // if there is more then 1 buffer, we already
notified the reader
- // (at the latest when adding the second buffer)
- if (buffers.size() == 1) {
- notifyDataAvailable();
- }
- }
+ // if there is more then 1 buffer, we already notified
the reader
+ // (at the latest when adding the second buffer)
+ notifyDataAvailable = !flushRequested && buffers.size()
== 1;
+ flushRequested = true;
+ }
+ if (notifyDataAvailable) {
+ notifyDataAvailable();
}
}
- private void maybeNotifyDataAvailable() {
+ private boolean shouldNotifyDataAvailable() {
// Notify only when we added first finished buffer.
- if (getNumberOfFinishedBuffers() == 1) {
- notifyDataAvailable();
- }
+ return readView != null && !flushRequested &&
getNumberOfFinishedBuffers() == 1;
}
private void notifyDataAvailable() {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Move data available notification in PipelinedSubpartition out of the
> synchronized block
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-10332
> URL: https://issues.apache.org/jira/browse/FLINK-10332
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
>
> Currently, calls to {{PipelinedSubpartition#notifyDataAvailable();}} are
> unnecessarily executed inside a {{synchronized (buffers)}} block, which may
> lead to lock contention.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)