vahmed-hamdy commented on code in PR #24839: URL: https://github.com/apache/flink/pull/24839#discussion_r1614905336
########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java: ########## @@ -533,4 +607,77 @@ private int getNextBatchSizeLimit() { protected Consumer<Exception> getFatalExceptionCons() { return fatalExceptionCons; } + + /** An implementation of {@link ResultHandler} that supports timeout. */ + private class AsyncSinkWriterResultHandler implements ResultHandler<RequestEntryT> { + private final ScheduledFuture<?> scheduledFuture; + private final long requestTimestamp; + private final int batchSize; + private final AtomicBoolean isCompleted = new AtomicBoolean(false); + private final List<RequestEntryT> batchEntries; + + public AsyncSinkWriterResultHandler( + long requestTimestamp, List<RequestEntryT> batchEntries, RequestInfo requestInfo) { + this.scheduledFuture = + timeService.registerTimer( + timeService.getCurrentProcessingTime() + requestTimeoutMS, + instant -> this.timeout()); + this.requestTimestamp = requestTimestamp; + this.batchEntries = batchEntries; + this.batchSize = requestInfo.getBatchSize(); + } + + @Override + public void complete() { + if (isCompleted.compareAndSet(false, true)) { + scheduledFuture.cancel(false); + mailboxExecutor.execute( + () -> completeRequest(Collections.emptyList(), batchSize, requestTimestamp), + "Mark in-flight request as completed successfully", + batchSize); + } + } + + @Override + public void completeExceptionally(Exception e) { + if (isCompleted.compareAndSet(false, true)) { + scheduledFuture.cancel(false); + mailboxExecutor.execute( + () -> getFatalExceptionCons().accept(e), + "Mark in-flight request as failed with fatal exception", + e.getMessage()); + } + } + + @Override + public void retryForEntries(List<RequestEntryT> requestEntriesToRetry) { + if (isCompleted.compareAndSet(false, true)) { + scheduledFuture.cancel(false); + mailboxExecutor.execute( + () -> completeRequest(requestEntriesToRetry, batchSize, requestTimestamp), + "Mark in-flight request as completed with %d failed request entries", Review Comment: I think it is already supported by [mailbox executor](https://github.com/apache/flink/blob/5b535e1410782a2d42127e00d77815d6ab7068ed/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java#L120) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org