pnowojski commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499379001
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
}
@Override
- public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
+ public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+ synchronized (requestLock) {
+ if (closeFuture.isDone()) {
+ return FutureUtils.completedVoidFuture();
+ }
+ for (InputChannel inputChannel : inputChannels.values()) {
+ if (inputChannel instanceof RemoteRecoveredInputChannel) {
+ ((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
+ }
+ }
+ }
+
Review comment:
Why is this change relevant to the fix? Could you add some explanation
to the commit message?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() throws Exception {
}
};
- submitTasksAndWaitForResults(executor, new Callable[] {closeTask, readRecoveredStateTask, processStateTask});
- } finally {
- executor.shutdown();
+ executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask));
+
// wait until the internal channel state recover task finishes
- executor.awaitTermination(60, TimeUnit.SECONDS);
assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
assertTrue(inputGate.getCloseFuture().isDone());
-
- environment.close();
Review comment:
Did you mean to remove the `awaitTermination` and `close` calls?
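For context on this question: `ExecutorService.invokeAll` blocks until every submitted task has completed, but it neither shuts the executor down nor rethrows task failures. A failure only surfaces when `Future.get()` is called on the returned futures. A minimal standalone sketch of that behavior follows; the names `InvokeAllSketch`, `runAll`, and `failureIsReported` are hypothetical, and this is not the test code from the PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of SingleInputGateTest.
class InvokeAllSketch {

    /** Runs all tasks, propagates the first task failure, and always shuts the executor down. */
    static int runAll(List<Callable<Integer>> tasks) throws Exception {
        // Assumes a non-empty task list (a fixed pool needs at least one thread).
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
        try {
            int sum = 0;
            // invokeAll returns only after every task has completed, normally or exceptionally.
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                // get() rethrows a task failure wrapped in ExecutionException;
                // without this call the failure would be silently dropped.
                sum += future.get();
            }
            return sum;
        } finally {
            // invokeAll does not stop the worker threads, so shut down explicitly.
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }
    }

    /** Returns true if a failing task is reported to the caller as an ExecutionException. */
    static boolean failureIsReported() throws Exception {
        List<Callable<Integer>> tasks = Arrays.asList(
            () -> 1,
            () -> { throw new IllegalStateException("boom"); });
        try {
            runAll(tasks);
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2, () -> 3);
        System.out.println(runAll(tasks));        // prints 6
        System.out.println(failureIsReported());  // prints true
    }
}
```

If the futures returned by `invokeAll` are discarded, as in the `+` line of the hunk above, any exception thrown by one of the tasks is not observed by the test.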
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -59,26 +59,27 @@
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
Review comment:
Could you change the first commit message to:
> [FLINK-19027][test][network] Ensure SingleInputGateTest does not swallow exceptions during cleanup.

?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java
##########
@@ -42,8 +42,17 @@
TaskEventPublisher taskEventPublisher,
int initialBackOff,
int maxBackoff,
+ int networkBuffersPerChannel,
InputChannelMetrics metrics) {
- super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+ super(
Review comment:
I'm not sure I understand this bug and the fix. Why does allocating
exclusive buffers for `LocalRecoveredInputChannel` fix the problem? Isn't it
just reducing the window in which the live lock can happen? What if downstream
tasks are scheduled with a significant delay (so that exclusive buffer
assignment happens after upstream tasks have already acquired lots of buffers)?
In other words, isn't this only a partial fix for this bug:
https://issues.apache.org/jira/browse/FLINK-13203 ?
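
To make the ordering concern concrete, here is a toy model. It is not Flink code: `BufferOrderingSketch`, its parameters, and the use of a `Semaphore` as a stand-in for a fixed-size buffer pool are all assumptions made for illustration. It only shows that whether the downstream channel gets its exclusive segments depends on who reaches the pool first:

```java
import java.util.concurrent.Semaphore;

// Toy model only; a Semaphore stands in for a fixed-size pool of memory segments.
class BufferOrderingSketch {

    /**
     * Returns true if the downstream channel obtains its exclusive segments.
     * Upstream takes as many segments as it wants, capped by what is available.
     */
    static boolean downstreamGetsExclusive(
            int poolSize, int exclusivePerChannel, int upstreamDemand, boolean downstreamFirst) {
        Semaphore pool = new Semaphore(poolSize);
        if (downstreamFirst) {
            // Downstream reserves before upstream touches the pool.
            boolean reserved = pool.tryAcquire(exclusivePerChannel);
            pool.tryAcquire(Math.min(upstreamDemand, pool.availablePermits()));
            return reserved;
        }
        // Downstream is delayed: upstream drains the pool first.
        pool.tryAcquire(Math.min(upstreamDemand, pool.availablePermits()));
        return pool.tryAcquire(exclusivePerChannel);
    }

    public static void main(String[] args) {
        System.out.println(downstreamGetsExclusive(10, 2, 10, true));   // prints true
        System.out.println(downstreamGetsExclusive(10, 2, 10, false));  // prints false
    }
}
```

In this model, reserving early helps only when the reservation wins the race; with a delayed downstream task and a greedy upstream, the reservation still fails.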