dawidwys commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r657790301
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
##########
@@ -190,6 +190,9 @@ public void resumeConsumption() {
isBlocked = false;
}
+ @Override
+ public void acknowledgeAllRecordsProcessed() throws IOException {}
Review comment:
Should we rather call:
```
@Override
public void acknowledgeAllRecordsProcessed() throws IOException {
inputGate.acknowledgeAllRecordsProcessed(this);
}
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -271,15 +271,22 @@ private ResultSubpartitionView
checkAndWaitForSubpartitionView() {
@Override
public void resumeConsumption() {
- checkState(!isReleased, "Channel released.");
Review comment:
Why do we need to change the precondition here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -130,6 +130,12 @@ public ResultPartitionID getPartitionId() {
*/
public abstract void resumeConsumption() throws IOException;
+ /**
+ * When received {@link
org.apache.flink.runtime.io.network.api.EndOfUserRecordsEvent} from one
+ * channel, it need to acknowledge after this event get processed.
+ */
Review comment:
nit:
```suggestion
* The {@link
org.apache.flink.runtime.io.network.api.EndOfUserRecordsEvent} needs to be
acknowledged once it's
* been processed.
*/
```
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
##########
@@ -63,6 +64,9 @@ public void requestPartitions() {}
@Override
public void resumeConsumption(InputChannelInfo channelInfo) {}
+ @Override
+ public void acknowledgeAllRecordsProcessed(InputChannelInfo channelInfo)
throws IOException {}
Review comment:
How about throwing an `UnsupportedOperationException`? I feel it would
be safer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]