1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328539920
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -209,41 +176,34 @@ else if (element.isLatencyMarker()) {
 	}
 	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
+	public void processElement(final StreamRecord<IN> element) throws Exception {
+		// add element first to the queue
+		final ResultFuture<OUT> entry = addToWorkQueue(element);
+
+		final ResultHandler resultHandler = new ResultHandler(element, entry);
+		// register a timeout for the entry if timeout is configured
 		if (timeout > 0L) {
-			// register a timeout for this AsyncStreamRecordBufferEntry
-			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
-
-			final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
-				timeoutTimestamp,
-				new ProcessingTimeCallback() {
-					@Override
-					public void onProcessingTime(long timestamp) throws Exception {
-						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
-					}
-				});
-
-			// Cancel the timer once we've completed the stream record buffer entry. This will remove
-			// the register trigger task
-			streamRecordBufferEntry.onComplete(
-				(StreamElementQueueEntry<Collection<OUT>> value) -> {
-					timerFuture.cancel(true);
-				},
-				executor);
-		}
+			final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
-		addAsyncBufferEntry(streamRecordBufferEntry);
+			final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
+				timeoutTimestamp,
+				timestamp -> userFunction.timeout(element.getValue(), resultHandler));
-		userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
+			resultHandler.setTimeoutTimer(timeoutTimer);
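The hunk replaces the old `onComplete(...)` callback that called `timerFuture.cancel(true)` with a single `resultHandler.setTimeoutTimer(timeoutTimer)` call, but the body of `ResultHandler` is not shown here. Assuming the handler keeps the timer handle and cancels it once a result arrives, the pattern looks roughly like this minimal sketch. It uses `java.util.concurrent.ScheduledExecutorService` as a stand-in for Flink's `ProcessingTimeService`; `TimedHandler`, `onTimeout()` and `hasTimedOut()` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical handler that owns its timeout timer, mirroring setTimeoutTimer(...) in the diff.
// Assumption: completion cancels the pending timer, like the removed onComplete(...) callback did.
final class TimedHandler {
    private final AtomicBoolean timedOut = new AtomicBoolean(false);
    // Written by the registering thread, read on completion; volatile for visibility.
    private volatile ScheduledFuture<?> timeoutTimer;

    void setTimeoutTimer(ScheduledFuture<?> timer) {
        this.timeoutTimer = timer;
    }

    /** Timer callback: records that the timeout fired. */
    void onTimeout() {
        timedOut.set(true);
    }

    boolean hasTimedOut() {
        return timedOut.get();
    }

    /** Called when a result arrives; cancels the timer if it has not fired yet. */
    boolean complete() {
        ScheduledFuture<?> timer = timeoutTimer;
        // cancel(true) matches the removed code; it returns false if the timer already ran.
        return timer != null && timer.cancel(true);
    }
}
```

Cancelling only removes a timer that has not fired yet. If the timeout callback is already running, or has already been turned into a letter in the mailbox queue, cancelling the `ScheduledFuture` cannot take it back, which is the gap the review comment below points at.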
Review comment:
Allowing cancellation of timer callbacks that are already in the mailbox queue
may help in this situation, but it still won't give strict guarantees in
general (the previous multithreaded code didn't give them either).
Even with a hotfix, you may still need to track, and allow, only one
result (either successful or failed) per `ResultHandler`. With the current
implementation, a badly written `AsyncFunction` may call
`resultHandler.complete()` multiple times on the same `resultHandler`
instance, and every such call would post a new mailbox letter.
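To make "cancelling a timer callback that is already in the mailbox queue" concrete, here is a minimal sketch assuming a hypothetical single-threaded mailbox (`SimpleMailbox` and `Letter` are illustrative names, not Flink's mailbox API). A cancelled letter is skipped when the mailbox thread dequeues it, but cancellation only works if it wins the race against execution: once a timeout letter has run, it cannot be undone.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical mailbox letter that can be cancelled while it is still queued.
final class Letter {
    private final Runnable action;
    // Claimed by whichever happens first: cancel() or the mailbox thread running the letter.
    private final AtomicBoolean claimed = new AtomicBoolean(false);

    Letter(Runnable action) {
        this.action = action;
    }

    /** Returns true only if the letter was cancelled before it started running. */
    boolean cancel() {
        return claimed.compareAndSet(false, true);
    }

    /** Runs the action unless the letter was cancelled first; returns whether it ran. */
    boolean runIfNotCancelled() {
        if (!claimed.compareAndSet(false, true)) {
            return false;
        }
        action.run();
        return true;
    }
}

// Hypothetical single-threaded mailbox: letters run in FIFO order on the
// thread that calls drain(), standing in for the operator's mailbox thread.
final class SimpleMailbox {
    private final Queue<Letter> queue = new ArrayDeque<>();

    /** Enqueues an action and returns a handle that can cancel it. */
    synchronized Letter post(Runnable action) {
        Letter letter = new Letter(action);
        queue.add(letter);
        return letter;
    }

    /** Runs every queued letter, skipping cancelled ones; returns how many ran. */
    int drain() {
        int ran = 0;
        while (true) {
            Letter next;
            synchronized (this) {
                next = queue.poll();
            }
            if (next == null) {
                return ran;
            }
            if (next.runIfNotCancelled()) {
                ran++;
            }
        }
    }
}
```

If a result arrives while the timeout letter is still queued, cancelling it suppresses the timeout. If the letter has already run, `cancel()` returns false and the operator still has to cope with both a timeout and a late result, so this alone gives no strict guarantee.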
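One way to allow only one result per `ResultHandler` is a compare-and-set guard: only the first `complete()` or `completeExceptionally()` call posts a mailbox letter, and later calls are ignored. This is a minimal sketch, not the code from this pull request: `OnceResultHandler` is a hypothetical class, the mailbox is modelled as a plain `Consumer<Runnable>`, and the boolean return values are an illustrative choice (Flink's `ResultFuture.complete` returns `void`).

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical result handler that accepts at most one outcome (result or error).
final class OnceResultHandler<OUT> {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    // Stand-in for the operator's mailbox: accepting a Runnable enqueues a letter.
    private final Consumer<Runnable> mailbox;
    // Stand-ins for the downstream handling of a result or an error.
    private final Consumer<Collection<OUT>> onResult;
    private final Consumer<Throwable> onError;

    OnceResultHandler(Consumer<Runnable> mailbox,
                      Consumer<Collection<OUT>> onResult,
                      Consumer<Throwable> onError) {
        this.mailbox = mailbox;
        this.onResult = onResult;
        this.onError = onError;
    }

    /** Posts a letter for the first outcome only; returns false for duplicate calls. */
    boolean complete(Collection<OUT> results) {
        if (!completed.compareAndSet(false, true)) {
            return false; // an outcome was already delivered; ignore this call
        }
        mailbox.accept(() -> onResult.accept(results));
        return true;
    }

    /** Same guard for failures, e.g. a timeout racing with a late success. */
    boolean completeExceptionally(Throwable error) {
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        mailbox.accept(() -> onError.accept(error));
        return true;
    }
}
```

Because the timeout path and the success path share the same flag, a timeout and a late `complete()` cannot both be emitted, and repeated `complete()` calls from a misbehaving `AsyncFunction` post no extra letters.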
----------------------------------------------------------------
This is an automated message from the Apache Git Service.