1996fanrui commented on PR #21690:
URL: https://github.com/apache/flink/pull/21690#issuecomment-1399208093
Hi @pnowojski, after an offline discussion with @lindong28, I'm not sure it makes sense to `add a return value for DataOutput#emitRecord`, where `the return value determines whether the caller can continue to emit records`.
`DataOutput#emitRecord` is an interface method, so it should obey these conditions:
- It shouldn't care about the caller's logic.
- It can be called repeatedly by the caller (most methods can be, except for some special ones such as `open()` and `close()`).
- It shouldn't care what the caller does between successive calls.
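To make these conditions concrete, here is a minimal Java sketch. `SimpleDataOutput`, `CollectingDataOutput`, and `ContractDemo` are hypothetical, simplified stand-ins, not Flink's actual classes: the output only emits, returns nothing, and is unaffected by whatever the caller does between calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for DataOutput: it only emits records.
// The void return type means the output tells the caller nothing about
// what to do next, so it needs no knowledge of the caller's loops.
interface SimpleDataOutput<T> {
    void emitRecord(T record);
}

// A trivial implementation that collects every emitted record.
// Each call is independent of the previous ones, so it can be called
// any number of times with arbitrary caller logic in between.
final class CollectingDataOutput<T> implements SimpleDataOutput<T> {
    private final List<T> emitted = new ArrayList<>();

    @Override
    public void emitRecord(T record) {
        emitted.add(record);
    }

    List<T> emitted() {
        return emitted;
    }
}

final class ContractDemo {
    // Calls emitRecord repeatedly and runs unrelated caller-side logic
    // between calls (here: skipping odd numbers); the output never sees it.
    static List<Integer> run() {
        CollectingDataOutput<Integer> output = new CollectingDataOutput<>();
        for (int i = 0; i < 6; i++) {
            if (i % 2 == 1) {
                continue; // caller-side decision, invisible to the output
            }
            output.emitRecord(i);
        }
        return output.emitted();
    }
}
```

`ContractDemo.run()` returns `[0, 2, 4]`; the filtering decision stays entirely on the caller's side.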
Simplified code for our discussion looks like this:
```java
MailboxProcessor#runMailboxLoop() {
    while (true) {
        // other mailbox logic
        NetworkInput/SourceOperator#emitNext();
        // other mailbox logic
    }
}

// Before the change: emitNext emits a single record per call.
NetworkInput/SourceOperator#emitNext() {
    DataOutput#emitRecord();
}

// After the change: emitNext can emit multiple records per call
// (loop exit conditions omitted for brevity).
NetworkInput/SourceOperator#emitNext() {
    while (true) {
        DataOutput#emitRecord();
    }
}
```
We want to add the loop inside `emitNext` to reduce call-stack overhead and improve performance, so that each record no longer requires a separate round trip through the mailbox loop. This loop logic should belong to `NetworkInput`/`SourceOperator`, not to `DataOutput`.
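A minimal, runnable model of the two `emitNext` variants in the pseudocode above, assuming a simple in-memory queue of records and a hypothetical `BatchLoopDemo` class (not Flink code). It counts how many mailbox-loop iterations are needed to drain the same records. Here the batched loop is bounded by "records available" instead of the pseudocode's `while (true)`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical model of the "before" and "after" emitNext variants.
final class BatchLoopDemo {

    // Before: each emitNext call emits at most one record.
    // Returns true if more records are still available.
    static boolean emitNextSingle(Deque<Integer> records, Consumer<Integer> output) {
        Integer record = records.poll();
        if (record != null) {
            output.accept(record);
        }
        return !records.isEmpty();
    }

    // After: emitNext loops internally while records are available.
    // The loop lives in the input; the output is called exactly as before.
    static boolean emitNextBatch(Deque<Integer> records, Consumer<Integer> output) {
        while (!records.isEmpty()) {
            output.accept(records.poll());
        }
        return false;
    }

    // Runs a minimal "mailbox loop" until the input is drained and returns
    // how many times it had to call emitNext.
    static int mailboxIterations(int recordCount, boolean batched) {
        Deque<Integer> records = new ArrayDeque<>();
        for (int i = 0; i < recordCount; i++) {
            records.add(i);
        }
        int[] emitted = {0};
        Consumer<Integer> output = r -> emitted[0]++;
        int iterations = 0;
        boolean moreAvailable = true;
        while (moreAvailable) {
            // other mailbox logic would run here on every iteration
            iterations++;
            moreAvailable = batched
                    ? emitNextBatch(records, output)
                    : emitNextSingle(records, output);
        }
        if (emitted[0] != recordCount) {
            throw new IllegalStateException("not all records were emitted");
        }
        return iterations;
    }
}
```

With five records, `mailboxIterations(5, false)` returns 5 while `mailboxIterations(5, true)` returns 1. The saving comes entirely from where the loop sits, without changing the output's contract.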
As I mentioned before, `DataOutput#emitRecord` can be called repeatedly by the caller. If it returned a boolean telling the caller whether it can continue emitting records, `DataOutput#emitRecord` would have to know about the caller's two loops (the mailbox loop and the loop inside `emitNext`). However, it shouldn't care about the caller's logic, and it shouldn't know about `canEmitBatchOfRecords`. It should only be responsible for emitting records.
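For contrast, a sketch of the rejected alternative, with hypothetical names (`ContinuationDataOutput`, `BatchAwareOutput`, `RejectedDesignDemo`) and `java.util.function.BooleanSupplier` standing in for `canEmitBatchOfRecords`. To return a meaningful boolean, the output has to be handed the caller's batch condition, which is the coupling described above.

```java
import java.util.function.BooleanSupplier;

// Rejected alternative (hypothetical): emitRecord's return value tells the
// caller whether it may keep emitting in the same batch.
interface ContinuationDataOutput<T> {
    boolean emitRecord(T record);
}

// To answer that question, the output must receive the caller's batch
// condition, even though emitting a record does not depend on it.
final class BatchAwareOutput<T> implements ContinuationDataOutput<T> {
    private final BooleanSupplier canEmitBatchOfRecords; // caller's concern, leaked in
    private int emittedCount;

    BatchAwareOutput(BooleanSupplier canEmitBatchOfRecords) {
        this.canEmitBatchOfRecords = canEmitBatchOfRecords;
    }

    @Override
    public boolean emitRecord(T record) {
        emittedCount++; // the output's real job
        return canEmitBatchOfRecords.getAsBoolean(); // the caller's job
    }

    int emittedCount() {
        return emittedCount;
    }
}

final class RejectedDesignDemo {
    // The caller's loop is now steered by the output's return value.
    static int emitUntilStopped(int available, int batchLimit) {
        int[] inBatch = {0};
        BatchAwareOutput<Integer> output =
                new BatchAwareOutput<>(() -> ++inBatch[0] < batchLimit);
        for (int i = 0; i < available; i++) {
            if (!output.emitRecord(i)) {
                break;
            }
        }
        return output.emittedCount();
    }
}
```

`emitUntilStopped(10, 3)` stops after 3 records because the output, not the caller, made that decision. Any other `DataOutput` implementation would also have to carry or fake such a condition.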
So I think the logic related to `canEmitBatchOfRecords` should be maintained inside `NetworkInput` and `SourceOperator`. I have a POC branch that does this [1].
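A sketch of the proposed direction, under similar assumptions. `PlainDataOutput`, `BatchingInput`, and `ProposedDesignDemo` are hypothetical, and `BooleanSupplier` is a stand-in for `canEmitBatchOfRecords`, not necessarily its real type in the POC branch. The output keeps a `void` `emitRecord`, and only the input checks the batch condition between calls. In the demo, the condition is a placeholder that turns false on every third check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// The output keeps the plain contract: emit a record, nothing else.
interface PlainDataOutput<T> {
    void emitRecord(T record);
}

// Hypothetical input: both the batch loop and the batch condition live here.
final class BatchingInput<T> {
    private final Deque<T> buffered;
    private final BooleanSupplier canEmitBatchOfRecords;

    BatchingInput(Deque<T> buffered, BooleanSupplier canEmitBatchOfRecords) {
        this.buffered = buffered;
        this.canEmitBatchOfRecords = canEmitBatchOfRecords;
    }

    // Emits records until the buffer is empty or the input's own batch
    // condition says stop. Returns true if more records remain.
    boolean emitNext(PlainDataOutput<T> output) {
        while (!buffered.isEmpty()) {
            output.emitRecord(buffered.poll());
            if (!canEmitBatchOfRecords.getAsBoolean()) {
                break;
            }
        }
        return !buffered.isEmpty();
    }
}

final class ProposedDesignDemo {
    // Returns how many records each emitNext call emitted.
    static List<Integer> batchSizes(int available, int checksPerBatch) {
        Deque<Integer> records = new ArrayDeque<>();
        for (int i = 0; i < available; i++) {
            records.add(i);
        }
        // Placeholder condition: false on every checksPerBatch-th check.
        int[] checks = {0};
        BooleanSupplier canEmitBatchOfRecords = () -> ++checks[0] % checksPerBatch != 0;
        BatchingInput<Integer> input = new BatchingInput<>(records, canEmitBatchOfRecords);

        List<Integer> emitted = new ArrayList<>();
        PlainDataOutput<Integer> output = emitted::add;
        List<Integer> sizes = new ArrayList<>();
        boolean moreAvailable = true;
        while (moreAvailable) { // stand-in for the mailbox loop
            int before = emitted.size();
            moreAvailable = input.emitNext(output);
            sizes.add(emitted.size() - before);
        }
        return sizes;
    }
}
```

With 7 buffered records, `batchSizes(7, 3)` returns `[3, 3, 1]`. Each `emitNext` call stops when the input's condition says so, and `PlainDataOutput` never sees that condition.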
Also, since Flink 1.17 is approaching feature freeze, could I create a new PR to fix the bug and let this PR focus on the refactoring? We can continue discussing the refactoring here. What do you think?
[1] https://github.com/1996fanrui/flink/commits/30623/canEmitBatchOfRecords-POC