1996fanrui commented on code in PR #26900:
URL: https://github.com/apache/flink/pull/26900#discussion_r2404163889
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java:
##########
@@ -229,9 +229,9 @@ void testInputStatusAfterEndOfRecovery() throws Exception {
inputGate.sendElement(new StreamRecord<>(42L), 0);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
- inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 0);
+ inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 0);
Review Comment:
Yes, this is expected. `EndOfOutputChannelStateEvent` is sent from the
upstream task to the downstream task. It means that all recovered input and
output buffers have been consumed, so it indicates `EndOfRecovery`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##########
@@ -408,7 +408,7 @@ public boolean isPartialRecord() {
public static DataType getDataType(AbstractEvent event, boolean hasPriority) {
if (hasPriority) {
return PRIORITIZED_EVENT_BUFFER;
- } else if (event instanceof EndOfChannelStateEvent) {
+ } else if (event instanceof EndOfOutputChannelStateEvent) {
Review Comment:
Yes, the `getDataType` method can receive `EndOfInputChannelStateEvent`, and
returning `EVENT_BUFFER` for it is correct, for the following reasons:
- As the comment on `RECOVERY_COMPLETION` [1] says: `Indicates that
this subpartition state is fully recovered (emitted). Further data can be
consumed after unblocking.`
- The subpartition state is the recovered output buffers, so
`RECOVERY_COMPLETION` is exactly what `EndOfOutputChannelStateEvent` means: all
recovered input and output buffers are consumed, and the recovery phase is
finished.
- `EndOfInputChannelStateEvent` only means that all recovered input buffers
are fully consumed.
- This code branch handles `EndOfOutputChannelStateEvent`, so it returns
`RECOVERY_COMPLETION`.
- The only difference between `EVENT_BUFFER`[2] and `RECOVERY_COMPLETION`[1]
is `Buffer.DataType#isBlockingUpstream`[3].
- `Buffer.DataType#isBlockingUpstream`[4] is only called in
`PipelinedSubpartition` and `RemoteInputChannel`.
- `EndOfInputChannelStateEvent` is only handled in `RecoveredInputChannel`,
so `EVENT_BUFFER`[2] and `RECOVERY_COMPLETION` behave the same for
`EndOfInputChannelStateEvent`.
Note: In other words, this is why the refactoring is needed. With the
existing code, it is hard to tell whether an `EndOfChannelStateEvent` comes
from the upstream side or the downstream side.
[1]
https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L311
[2]
https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L289
[3]
https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L366
[4]
https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L396
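The mapping described in this comment can be sketched in isolation. This is a
simplified stand-in, not the real Flink classes: the `Event` marker interface,
the reduced `DataType` enum and the class name `DataTypeSketch` are
hypothetical, and only the `isBlockingUpstream` flag is modelled. The flag
value for `PRIORITIZED_EVENT_BUFFER` is an assumption.

```java
// Simplified model of the event -> data type mapping discussed in this review.
// All types here are illustrative stand-ins, not the real Flink classes.
final class DataTypeSketch {

    // Hypothetical marker types that mirror the event names used in this PR.
    interface Event {}

    static final class EndOfInputChannelStateEvent implements Event {}

    static final class EndOfOutputChannelStateEvent implements Event {}

    // Reduced to the data types mentioned here; only the blocking flag is kept.
    enum DataType {
        EVENT_BUFFER(false),
        PRIORITIZED_EVENT_BUFFER(false), // assumed value, not stated in the review
        RECOVERY_COMPLETION(true); // differs from EVENT_BUFFER only in this flag

        private final boolean blockingUpstream;

        DataType(boolean blockingUpstream) {
            this.blockingUpstream = blockingUpstream;
        }

        boolean isBlockingUpstream() {
            return blockingUpstream;
        }
    }

    // Same branch order as the diff: priority first, then the output-side event.
    static DataType getDataType(Event event, boolean hasPriority) {
        if (hasPriority) {
            return DataType.PRIORITIZED_EVENT_BUFFER;
        } else if (event instanceof EndOfOutputChannelStateEvent) {
            return DataType.RECOVERY_COMPLETION;
        } else {
            // EndOfInputChannelStateEvent falls through to the generic branch.
            return DataType.EVENT_BUFFER;
        }
    }

    public static void main(String[] args) {
        System.out.println(getDataType(new EndOfOutputChannelStateEvent(), false));
        System.out.println(getDataType(new EndOfInputChannelStateEvent(), false));
    }
}
```

In this model only the output-side event yields a type that blocks upstream,
which matches the argument that `EVENT_BUFFER` is sufficient for
`EndOfInputChannelStateEvent`.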
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java:
##########
@@ -161,6 +166,9 @@ void testToBufferConsumer() throws IOException {
assertThat(bufferConsumer.build().getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(bufferConsumer.build().getDataType())
+ .isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
Review Comment:
Same as the previous comment: `Buffer.DataType.EVENT_BUFFER` is already
covered by the last code branch, and it seems we only test the other data
types explicitly.
> It would better document the intention
As I understand it, `EndOfInputChannelStateEvent`,
`EndOfOutputChannelStateEvent` and `Buffer.DataType#RECOVERY_COMPLETION`
already have comprehensive javadoc. The current PR may be hard to follow
without the context and background of unaligned checkpoints.
I have added detailed context and background to the PR description, and I
hope it is useful. Please let me know if you think it still needs more
input, thanks.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java:
##########
@@ -191,6 +199,8 @@ void testToBuffer() throws IOException {
assertThat(buffer.getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
Review Comment:
As I mentioned in my reply to your first comment
https://github.com/apache/flink/pull/26900#discussion_r2398289421 , the
expected data type of `EndOfInputChannelStateEvent` is
`Buffer.DataType.EVENT_BUFFER`, so it is already covered by the next code branch.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java:
##########
@@ -270,7 +270,7 @@ protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent, DataOutput<T
if (checkpointedInputGate.isFinished()) {
return DataInputStatus.END_OF_INPUT;
}
- } else if (event.getClass() == EndOfChannelStateEvent.class) {
+ } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
Review Comment:
The reason is that `EndOfOutputChannelStateEvent` is sent from the upstream
task to the downstream task. It means that all recovered input and output
buffers have been consumed, so it indicates `EndOfRecovery`.
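A rough sketch of how a downstream input might react to this event, under
stated assumptions: the class `RecoveryInputSketch`, its per-channel
bookkeeping and the reduced `DataInputStatus` enum are hypothetical and only
illustrate the idea that recovery ends once the event has arrived from
upstream. The rule that every channel must deliver the event first is an
assumption of this sketch, not something stated in the review.

```java
import java.util.BitSet;

// Illustrative model only; it does not reproduce AbstractStreamTaskNetworkInput.
final class RecoveryInputSketch {

    // Reduced to the two statuses needed for the illustration.
    enum DataInputStatus {
        MORE_AVAILABLE,
        END_OF_RECOVERY
    }

    // Hypothetical marker types standing in for Flink events.
    interface Event {}

    static final class EndOfOutputChannelStateEvent implements Event {}

    static final class OtherEvent implements Event {}

    private final int numberOfChannels;
    private final BitSet recoveredChannels = new BitSet();

    RecoveryInputSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Assumption: recovery is over once every channel has delivered the event.
    DataInputStatus processEvent(Event event, int channelIndex) {
        if (event.getClass() == EndOfOutputChannelStateEvent.class) {
            recoveredChannels.set(channelIndex);
            if (recoveredChannels.cardinality() == numberOfChannels) {
                return DataInputStatus.END_OF_RECOVERY;
            }
        }
        return DataInputStatus.MORE_AVAILABLE;
    }

    public static void main(String[] args) {
        RecoveryInputSketch input = new RecoveryInputSketch(1);
        System.out.println(input.processEvent(new EndOfOutputChannelStateEvent(), 0));
    }
}
```

With a single channel, as in `testInputStatusAfterEndOfRecovery`, the first
`EndOfOutputChannelStateEvent` already ends recovery in this model.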