1996fanrui commented on code in PR #26900:
URL: https://github.com/apache/flink/pull/26900#discussion_r2404163889


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java:
##########
@@ -229,9 +229,9 @@ void testInputStatusAfterEndOfRecovery() throws Exception {
 
         inputGate.sendElement(new StreamRecord<>(42L), 0);
         assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
-        inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 0);
+        inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 0);

Review Comment:
   Yes, this is expected. `EndOfOutputChannelStateEvent` is sent from the upstream task to the downstream task. It means that all recovered input and output buffers have been consumed, so it indicates `EndOfRecovery`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##########
@@ -408,7 +408,7 @@ public boolean isPartialRecord() {
         public static DataType getDataType(AbstractEvent event, boolean hasPriority) {
             if (hasPriority) {
                 return PRIORITIZED_EVENT_BUFFER;
-            } else if (event instanceof EndOfChannelStateEvent) {
+            } else if (event instanceof EndOfOutputChannelStateEvent) {

Review Comment:
   Yes, the `getDataType` method can receive `EndOfInputChannelStateEvent`, and returning `EVENT_BUFFER` for it is correct, for the following reasons:
   
   - As the comment on `RECOVERY_COMPLETION` [1] says: `Indicates that this subpartition state is fully recovered (emitted). Further data can be consumed after unblocking.`
     - Subpartition state means the recovered output buffers, so `RECOVERY_COMPLETION` matches exactly what `EndOfOutputChannelStateEvent` means: all recovered input and output buffers have been consumed, and the recovery phase is finished.
     - `EndOfInputChannelStateEvent` only means that all recovered input buffers have been fully consumed.
     - So this code branch returns `RECOVERY_COMPLETION` only when the event is an `EndOfOutputChannelStateEvent`.
   - The only difference between `EVENT_BUFFER` [2] and `RECOVERY_COMPLETION` [1] is the value of `Buffer.DataType#isBlockingUpstream` [3].
     - `Buffer.DataType#isBlockingUpstream` [4] is only called in `PipelinedSubpartition` and `RemoteInputChannel`.
     - `EndOfInputChannelStateEvent` is only handled in `RecoveredInputChannel`, so `isBlockingUpstream` is never checked for it, and `EVENT_BUFFER` [2] and `RECOVERY_COMPLETION` [1] behave the same for `EndOfInputChannelStateEvent`.
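   The mapping described in this list can be sketched as a small, self-contained model. `DataTypeMappingSketch`, its nested types and `OtherEvent` are hypothetical stand-ins that only mirror the names used in this thread; only the `isBlockingUpstream` flag is modeled, and this is not Flink's actual `Buffer.DataType` code.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of mapping events to buffer data types.
 * Names mirror this discussion; this is NOT Flink's Buffer.DataType implementation.
 */
public class DataTypeMappingSketch {

    // Only the isBlockingUpstream flag is modeled; per the review comment it is
    // the only difference between EVENT_BUFFER and RECOVERY_COMPLETION.
    enum DataType {
        EVENT_BUFFER(false),
        PRIORITIZED_EVENT_BUFFER(false),
        RECOVERY_COMPLETION(true);

        private final boolean blockingUpstream;

        DataType(boolean blockingUpstream) {
            this.blockingUpstream = blockingUpstream;
        }

        boolean isBlockingUpstream() {
            return blockingUpstream;
        }
    }

    interface Event {}

    // Sent from the upstream task to the downstream task.
    static final class EndOfOutputChannelStateEvent implements Event {}

    // Assumed to be handled only locally on the input side.
    static final class EndOfInputChannelStateEvent implements Event {}

    // Hypothetical placeholder for any other plain event.
    static final class OtherEvent implements Event {}

    static DataType getDataType(Event event, boolean hasPriority) {
        if (hasPriority) {
            return DataType.PRIORITIZED_EVENT_BUFFER;
        } else if (event instanceof EndOfOutputChannelStateEvent) {
            return DataType.RECOVERY_COMPLETION;
        }
        // EndOfInputChannelStateEvent deliberately falls through to EVENT_BUFFER.
        return DataType.EVENT_BUFFER;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new EndOfOutputChannelStateEvent(),
                new EndOfInputChannelStateEvent(),
                new OtherEvent());
        for (Event e : events) {
            DataType type = getDataType(e, false);
            System.out.println(e.getClass().getSimpleName() + " -> " + type
                    + " (blockingUpstream=" + type.isBlockingUpstream() + ")");
        }
    }
}
```

   Running `main` prints each event with its data type; only `EndOfOutputChannelStateEvent` ends up with `blockingUpstream=true`.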
   
   Note: this is also why the refactoring is needed. With the existing code, it is hard to tell whether an `EndOfChannelStateEvent` comes from the upstream side or the downstream side.
   
   
   [1] https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L311
   [2] https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L289
   [3] https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L366
   [4] https://github.com/apache/flink/blob/45c47ba1b5f34929e383f1b84ac83861351f60e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java#L396



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java:
##########
@@ -161,6 +166,9 @@ void testToBufferConsumer() throws IOException {
                     assertThat(bufferConsumer.build().getDataType())
                             .isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
                 }
+            } else if (evt instanceof EndOfOutputChannelStateEvent) {
+                assertThat(bufferConsumer.build().getDataType())
+                        .isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);

Review Comment:
   Same as the last comment: `Buffer.DataType.EVENT_BUFFER` is already covered by the last code branch, and it seems we only test the other data types explicitly.
   
   > It would better document the intention
   
   As far as I understand, `EndOfInputChannelStateEvent`, `EndOfOutputChannelStateEvent` and `Buffer.DataType#RECOVERY_COMPLETION` already have comprehensive Javadoc. If the current PR is hard to follow, it may be due to missing context and background on unaligned checkpoints.
   
   I have added detailed context and background to the PR description; I hope it is useful. Please let me know if you think it still needs more information. Thanks.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java:
##########
@@ -191,6 +199,8 @@ void testToBuffer() throws IOException {
                     assertThat(buffer.getDataType())
                             .isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
                 }
+            } else if (evt instanceof EndOfOutputChannelStateEvent) {
+                assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);

Review Comment:
   As I mentioned in my reply to your first comment (https://github.com/apache/flink/pull/26900#discussion_r2398289421), the expected data type of `EndOfInputChannelStateEvent` is `Buffer.DataType.EVENT_BUFFER`, so it is already covered by the next code branch.
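   The branch structure referred to here can be illustrated with a simplified, hypothetical model. `ExpectedDataTypeSketch`, `actualDataType`, `expectedDataType` and `SomeOtherEvent` are made-up names, `actualDataType` simply assumes the mapping described earlier in this thread, and none of this is the actual `EventSerializerTest` code.

```java
import java.util.List;

/** Hypothetical model of the test's branch structure; not the real EventSerializerTest. */
public class ExpectedDataTypeSketch {

    enum DataType { EVENT_BUFFER, RECOVERY_COMPLETION }

    interface Event {}

    static final class EndOfOutputChannelStateEvent implements Event {}

    static final class EndOfInputChannelStateEvent implements Event {}

    // Hypothetical placeholder for any other plain event.
    static final class SomeOtherEvent implements Event {}

    // Stand-in for the code under test; assumes the mapping discussed in this thread.
    static DataType actualDataType(Event evt) {
        return evt instanceof EndOfOutputChannelStateEvent
                ? DataType.RECOVERY_COMPLETION
                : DataType.EVENT_BUFFER;
    }

    // Explicit branches exist only for special data types; the final branch is the
    // catch-all for plain EVENT_BUFFER events.
    static DataType expectedDataType(Event evt) {
        if (evt instanceof EndOfOutputChannelStateEvent) {
            return DataType.RECOVERY_COMPLETION;
        } else {
            // EndOfInputChannelStateEvent needs no dedicated branch: it lands here.
            return DataType.EVENT_BUFFER;
        }
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new EndOfOutputChannelStateEvent(),
                new EndOfInputChannelStateEvent(),
                new SomeOtherEvent());
        for (Event evt : events) {
            if (actualDataType(evt) != expectedDataType(evt)) {
                throw new AssertionError(
                        "Unexpected data type for " + evt.getClass().getSimpleName());
            }
        }
        System.out.println("All events have their expected data type");
    }
}
```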



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java:
##########
@@ -270,7 +270,7 @@ protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent, DataOutput<T
             if (checkpointedInputGate.isFinished()) {
                 return DataInputStatus.END_OF_INPUT;
             }
-        } else if (event.getClass() == EndOfChannelStateEvent.class) {
+        } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {

Review Comment:
   The reason is that `EndOfOutputChannelStateEvent` is sent from the upstream task to the downstream task. It means that all recovered input and output buffers have been consumed, so it indicates `EndOfRecovery`.
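   A minimal sketch of this handling, under stated assumptions: `EndOfRecoverySketch`, `OtherEvent` and the per-channel bookkeeping are hypothetical, and the rule that `END_OF_RECOVERY` is reported only after every channel has delivered the event is an assumption of this sketch, not a statement about `AbstractStreamTaskNetworkInput`.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of the event handling discussed above; not Flink's
 * AbstractStreamTaskNetworkInput. ASSUMPTION: END_OF_RECOVERY is reported only once
 * every input channel has delivered EndOfOutputChannelStateEvent.
 */
public class EndOfRecoverySketch {

    enum DataInputStatus { MORE_AVAILABLE, END_OF_RECOVERY }

    interface Event {}

    // Sent from the upstream task; signals that all recovered input and output
    // buffers have been consumed.
    static final class EndOfOutputChannelStateEvent implements Event {}

    // Hypothetical placeholder for any other event.
    static final class OtherEvent implements Event {}

    private final int numChannels;
    private final Set<Integer> recoveredChannels = new HashSet<>();

    EndOfRecoverySketch(int numChannels) {
        this.numChannels = numChannels;
    }

    DataInputStatus processEvent(Event event, int channel) {
        // Exact class match, mirroring the getClass() == ... check in the diff.
        if (event.getClass() == EndOfOutputChannelStateEvent.class) {
            recoveredChannels.add(channel);
            if (recoveredChannels.size() == numChannels) {
                return DataInputStatus.END_OF_RECOVERY;
            }
        }
        return DataInputStatus.MORE_AVAILABLE;
    }

    public static void main(String[] args) {
        EndOfRecoverySketch input = new EndOfRecoverySketch(2);
        System.out.println(input.processEvent(new OtherEvent(), 0));                   // MORE_AVAILABLE
        System.out.println(input.processEvent(new EndOfOutputChannelStateEvent(), 0)); // MORE_AVAILABLE
        System.out.println(input.processEvent(new EndOfOutputChannelStateEvent(), 1)); // END_OF_RECOVERY
    }
}
```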



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
