zhijiangW commented on a change in pull request #11687:
[FLINK-16536][network][checkpointing] Implement InputChannel state recovery for
unaligned checkpoint
URL: https://github.com/apache/flink/pull/11687#discussion_r409283520
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -149,6 +149,47 @@ void assignExclusiveSegments() throws IOException {
         }
     }
+    @Override
+    public void initializeState(ChannelStateReader reader) throws IOException, InterruptedException {
+        numRequiredBuffers = initialCredit + inputGate.getBufferPool().getMaxNumberOfMemorySegments();
+        unannouncedCredit.set(initialCredit);
+
+        while (true) {
+            Buffer buffer;
+            synchronized (bufferQueue) {
+                buffer = bufferQueue.takeBuffer();
+                if (buffer == null) {
+                    if (isReleased()) {
+                        return;
+                    }
+
+                    if (!isWaitingForFloatingBuffers) {
+                        buffer = inputGate.getBufferPool().requestBuffer();
+                        if (buffer == null) {
+                            inputGate.getBufferProvider().addBufferListener(this);
+                            isWaitingForFloatingBuffers = true;
+                        }
+                    }
+                }
+            }
+
+            if (buffer == null) {
+                wait(10);
Review comment:
Yes, we have the same issue in the other option, with
`SpilledInputChannel`/`SpilledInputGate`.
In conclusion, there are several possible solutions:
- `wait()` when no buffer is available: always block the unspilling thread,
which fits our current requirements. We should not leave the current channel to
switch to another channel temporarily, since that might cause random IO. This
option requires a wakeup mechanism for when a buffer becomes available again.
- `wait(timeout)` when no buffer is available: more or less the same as
`wait()` above, but the wakeup mechanism is not mandatory; it can be regarded
as an improvement that wakes the thread up early.
- non-blocking way: stop processing the current channel and unspill another
channel that has available buffers. It would bring random IO as mentioned
above, so it is not what I suggest for now.
Given the current design with a dedicated thread for unspilling, which allows
only one thread to unspill the channels one by one to avoid random IO, I would
choose one of the blocking options, 1 or 2.
Regarding option 2, the wakeup is not necessary, so we do not need to
touch the existing `RemoteInputChannel#recycle` and
`RemoteInputChannel#notifyBufferAvailable` paths. If we want to add the wakeup
mechanism, option 1 also makes sense.
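
To make the difference between options 1 and 2 concrete, here is a minimal
sketch. It is not Flink code: `BufferWaitSketch`, its `String` "buffers", and
the `recycleSilently` method are hypothetical stand-ins used only to show the
waiting pattern, assuming a single lock guards the buffer queue.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a channel's buffer queue; not Flink code. */
final class BufferWaitSketch {
    private final Object lock = new Object();
    private final ArrayDeque<String> buffers = new ArrayDeque<>();

    /** Option 1: block until recycle() notifies. Depends on the wakeup path. */
    String takeBlocking() throws InterruptedException {
        synchronized (lock) { // wait() must be called while holding the monitor
            while (buffers.isEmpty()) {
                lock.wait();
            }
            return buffers.poll();
        }
    }

    /** Option 2: re-check every timeoutMillis, so a missing notify is tolerated. */
    String takeWithTimeout(long timeoutMillis) throws InterruptedException {
        synchronized (lock) {
            while (buffers.isEmpty()) {
                lock.wait(timeoutMillis);
            }
            return buffers.poll();
        }
    }

    /** Adds a buffer and wakes waiters; for option 2 this is only an early wakeup. */
    void recycle(String buffer) {
        synchronized (lock) {
            buffers.add(buffer);
            lock.notifyAll();
        }
    }

    /** Adds a buffer without notifying, mimicking untouched recycle paths. */
    void recycleSilently(String buffer) {
        synchronized (lock) {
            buffers.add(buffer);
        }
    }
}
```

With `recycleSilently`, `takeWithTimeout` still returns once the timeout
elapses and the queue is re-checked, while `takeBlocking` could wait forever.
That is why option 1 needs the wakeup mechanism and option 2 does not.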