AHeise commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499424128
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
}
@Override
- public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
+ public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+ synchronized (requestLock) {
+ if (closeFuture.isDone()) {
+ return FutureUtils.completedVoidFuture();
+ }
+ for (InputChannel inputChannel : inputChannels.values()) {
+ if (inputChannel instanceof RemoteRecoveredInputChannel) {
+ ((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
+ }
+ }
+ }
+
Review comment:
I'll add it. In short, the number of required buffers is now higher than some
tests (and possibly some production setups) assume. Without the lazy
initialization, it is much harder to simulate backpressure in a few scenarios.
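A minimal sketch of the lazy-assignment idea in the diff above, assuming hypothetical `BufferPool`, `RecoveredChannel` and `Gate` classes (they are not Flink's API, and buffers are modelled as a simple counter): creating the gate requests nothing from the pool, exclusive buffers are taken only when recovered state is read, and that happens under the same lock `close()` uses, so an already-closed gate never grabs buffers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of lazy exclusive-buffer assignment.
// None of these classes are Flink's API; buffers are modelled as a counter.
public class LazyExclusiveSegments {

    /** Fixed-size pool; requesting more than is available fails. */
    static final class BufferPool {
        private int available;

        BufferPool(int total) { this.available = total; }

        synchronized void request(int n) {
            if (n > available) {
                throw new IllegalStateException("requested " + n + ", available " + available);
            }
            available -= n;
        }

        synchronized void recycle(int n) { available += n; }

        synchronized int available() { return available; }
    }

    /** Recovered channel that takes its exclusive buffers only on demand. */
    static final class RecoveredChannel {
        private final BufferPool pool;
        private final int buffersPerChannel;
        private boolean assigned;

        RecoveredChannel(BufferPool pool, int buffersPerChannel) {
            this.pool = pool;
            this.buffersPerChannel = buffersPerChannel;
        }

        void assignExclusiveSegments() {
            if (!assigned) { // assign at most once
                pool.request(buffersPerChannel);
                assigned = true;
            }
        }

        void release() {
            if (assigned) {
                pool.recycle(buffersPerChannel);
                assigned = false;
            }
        }
    }

    /** Gate that mirrors the "check closed, then assign" logic from the diff. */
    static final class Gate {
        private final Object requestLock = new Object();
        private final List<RecoveredChannel> channels = new ArrayList<>();
        private boolean closed;

        Gate(BufferPool pool, int numChannels, int buffersPerChannel) {
            // Setup creates the channels but requests nothing from the pool yet.
            for (int i = 0; i < numChannels; i++) {
                channels.add(new RecoveredChannel(pool, buffersPerChannel));
            }
        }

        /** Returns false (and assigns nothing) if the gate is already closed. */
        boolean readRecoveredState() {
            synchronized (requestLock) {
                if (closed) {
                    return false;
                }
                for (RecoveredChannel channel : channels) {
                    channel.assignExclusiveSegments();
                }
                return true;
            }
        }

        void close() {
            synchronized (requestLock) {
                closed = true;
                for (RecoveredChannel channel : channels) {
                    channel.release();
                }
            }
        }
    }

    public static void main(String[] args) {
        BufferPool pool = new BufferPool(4);
        Gate gate = new Gate(pool, 2, 2);
        System.out.println("after setup: " + pool.available());    // 4
        gate.readRecoveredState();
        System.out.println("after recovery: " + pool.available()); // 0
        gate.close();
        System.out.println("after close: " + pool.available());    // 4
    }
}
```

Because nothing is requested at setup, a test can size the pool below what full recovery needs and still build the gate, which is the kind of backpressure scenario the comment refers to.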
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() throws Exception {
}
};
- submitTasksAndWaitForResults(executor, new Callable[] {closeTask, readRecoveredStateTask, processStateTask});
- } finally {
- executor.shutdown();
+ executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask));
+
// wait until the internal channel state recover task finishes
- executor.awaitTermination(60, TimeUnit.SECONDS);
assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
assertTrue(inputGate.getCloseFuture().isDone());
-
- environment.close();
Review comment:
`close` is called by the `Closer`.
`shutdown` + `awaitTermination` is simply the wrong approach; `invokeAll` does
what was intended. This could go into a separate commit, but then it should
probably be applied to all 10 places that use `submitTasksAndWaitForResults`.
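A small illustration of the `invokeAll` semantics, using only `java.util.concurrent`: `invokeAll` blocks until every submitted `Callable` has completed and returns their `Future`s, so calling `get()` on each afterwards returns immediately and rethrows any task failure wrapped in an `ExecutionException`. By contrast, `shutdown()` only stops the executor from accepting new tasks, and `awaitTermination(timeout, unit)` merely returns `false` if the timeout elapses; neither reports a task's exception. The `runAll` helper and the placeholder tasks are hypothetical stand-ins, not the test's real `closeTask`/`readRecoveredStateTask`/`processStateTask`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class InvokeAllDemo {

    /** Hypothetical helper: runs all tasks and returns how many completed without an exception. */
    static int runAll(ExecutorService executor, List<Callable<Void>> tasks) {
        List<Future<Void>> futures;
        try {
            // Blocks until every task has finished; afterwards isDone() is true for each future.
            futures = executor.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return 0;
        }
        int succeeded = 0;
        for (Future<Void> future : futures) {
            try {
                future.get(); // already done, so this returns immediately or rethrows
                succeeded++;
            } catch (ExecutionException e) {
                System.out.println("task failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        AtomicInteger counter = new AtomicInteger();
        // Placeholder tasks; one of them fails on purpose.
        Callable<Void> ok = () -> {
            counter.incrementAndGet();
            return null;
        };
        Callable<Void> failing = () -> {
            throw new IllegalStateException("boom");
        };
        try {
            int succeeded = runAll(executor, Arrays.asList(ok, ok, failing));
            System.out.println(succeeded + " succeeded, counter = " + counter.get());
        } finally {
            executor.shutdownNow(); // all tasks are done; just release the threads
        }
    }
}
```

Running `main` prints `task failed: java.lang.IllegalStateException: boom` followed by `2 succeeded, counter = 2`.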
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java
##########
@@ -42,8 +42,17 @@
TaskEventPublisher taskEventPublisher,
int initialBackOff,
int maxBackoff,
+ int networkBuffersPerChannel,
InputChannelMetrics metrics) {
- super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+ super(
Review comment:
Hm, you are right: having read the ticket, I agree it doesn't solve the
problem completely. However, without a solution for FLINK-13203, there won't be
a real solution here either.
On the other hand, it's inherently wrong to treat local and remote channels
differently during recovery (they even share the same implementation). So this
commit still fixes the issue on a best-effort basis and certainly helps to
improve build stability, which is an improvement in its own right.
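A hypothetical sketch of the design argument: if the local and remote recovered channels extend one base class that receives `networkBuffersPerChannel` through its constructor (mirroring the new parameter and `super(` call in the diff), their recovery-time buffer handling cannot drift apart, and only the post-recovery conversion differs. The class and method names below (including `toPhysicalChannel`) are illustrative, not Flink's actual hierarchy.

```java
// Hypothetical hierarchy: names are illustrative, not Flink's classes.
public class RecoveredChannels {

    /** Shared recovery logic; both subclasses inherit identical buffer handling. */
    abstract static class RecoveredChannel {
        private final int networkBuffersPerChannel;
        private int exclusiveBuffers;

        RecoveredChannel(int networkBuffersPerChannel) {
            if (networkBuffersPerChannel < 0) {
                throw new IllegalArgumentException("networkBuffersPerChannel must be >= 0");
            }
            this.networkBuffersPerChannel = networkBuffersPerChannel;
        }

        /** final: local and remote channels cannot diverge during recovery. */
        final void assignExclusiveSegments() {
            exclusiveBuffers = networkBuffersPerChannel;
        }

        final int exclusiveBuffers() {
            return exclusiveBuffers;
        }

        /** Only the conversion after recovery differs between the two kinds. */
        abstract String toPhysicalChannel();
    }

    static final class LocalRecovered extends RecoveredChannel {
        LocalRecovered(int networkBuffersPerChannel) {
            super(networkBuffersPerChannel);
        }

        @Override
        String toPhysicalChannel() {
            return "LocalInputChannel";
        }
    }

    static final class RemoteRecovered extends RecoveredChannel {
        RemoteRecovered(int networkBuffersPerChannel) {
            super(networkBuffersPerChannel);
        }

        @Override
        String toPhysicalChannel() {
            return "RemoteInputChannel";
        }
    }

    public static void main(String[] args) {
        RecoveredChannel[] channels = {new LocalRecovered(2), new RemoteRecovered(2)};
        for (RecoveredChannel channel : channels) {
            channel.assignExclusiveSegments();
            System.out.println(channel.toPhysicalChannel() + ": " + channel.exclusiveBuffers());
        }
    }
}
```

Making the shared recovery method `final` is one way to enforce that subclasses cannot special-case it; whether the real code should go that far is a judgment call.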
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -59,26 +59,27 @@
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
Review comment:
I didn't even know that double-tags are a thing. :p
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]