imaffe commented on a change in pull request #17643:
URL: https://github.com/apache/flink/pull/17643#discussion_r744654262
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
##########
@@ -80,8 +80,16 @@ public PulsarUnorderedPartitionSplitReader(
@Override
protected Message<byte[]> pollMessage(Duration timeout)
throws ExecutionException, InterruptedException, TimeoutException {
- Message<byte[]> message =
- pulsarConsumer.receiveAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ CompletableFuture<Message<byte[]>> messageCompletableFuture = pulsarConsumer.receiveAsync();
Review comment:
> > I don't think this fixed the data loss issue. Pulsar [internally wraps](https://github.com/apache/pulsar/blob/f0413b1adad395319e5595a863764f8af943dbe8/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L422) the CompletableFuture in `receiveAsync()` with a `CompletableFutureCancellationHandler`. The pending receiver would be automatically removed when the future meets a `TimeoutException`.
> > That means we won't lose data as long as we don't manually cancel the future.
> > @AHeise Can you confirm this?
>
> I think you are wrong. According to the source code, the pending receiver is not removed by `CompletableFutureCancellationHandler` in this case. What you used is `CompletableFuture.get(timeout)`, not `CompletableFuture.orTimeout(timeout)`. If you don't manually cancel the future, the connector loses track of the `future`, but it is still in the `pending receiver` queue of the internal Pulsar client, so the data is lost. Please take a look.
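
Yeah, using `CompletableFuture#get(timeout, unit)` won't complete the future with a `TimeoutException`. It only throws one to the caller and leaves the future itself pending.
`CompletableFutureCancellationHandler` executes its cancellation logic only when the future itself completes exceptionally with a `CancellationException` or a `TimeoutException`: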

```java
private BiConsumer<Object, ? super Throwable> whenCompleteFunction() {
    return (v, throwable) -> {
        if (throwable instanceof CancellationException || throwable instanceof TimeoutException) {
            completionStatus = CompletionStatus.CANCELLED;
        } else {
            completionStatus = CompletionStatus.DONE;
        }
        runCancelActionOnceIfCancelled();
    };
}
```
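
You can check the difference between the two timeout styles with the JDK alone. This sketch uses a hypothetical class name and assumes Java 9+ for `orTimeout`. It registers a `whenComplete` callback that mirrors the handler's `CancellationException`/`TimeoutException` check and reports whether that check matched:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class GetVsOrTimeout {

    // Registers a callback that mimics the handler's check, then times out either via
    // get(timeout) or via orTimeout(...). Returns true if the callback saw a
    // CancellationException or TimeoutException on the future itself.
    static boolean cancelActionWouldRun(boolean useOrTimeout) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicBoolean sawCancelOrTimeout = new AtomicBoolean(false);
        CountDownLatch callbackRan = new CountDownLatch(1);
        future.whenComplete((v, t) -> {
            sawCancelOrTimeout.set(
                    t instanceof CancellationException || t instanceof TimeoutException);
            callbackRan.countDown();
        });

        if (useOrTimeout) {
            // orTimeout (Java 9+) completes the future itself with a TimeoutException.
            future.orTimeout(20, TimeUnit.MILLISECONDS);
        } else {
            try {
                // get(timeout) throws TimeoutException to the caller only;
                // the future itself stays incomplete.
                future.get(20, TimeUnit.MILLISECONDS);
            } catch (TimeoutException expected) {
                // The caller gives up here, but nothing completes the future.
            }
        }

        // Give the callback time to run; in the get() case it never does.
        callbackRan.await(500, TimeUnit.MILLISECONDS);
        return sawCancelOrTimeout.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("get(timeout): " + cancelActionWouldRun(false)); // prints "get(timeout): false"
        System.out.println("orTimeout:    " + cancelActionWouldRun(true));  // prints "orTimeout:    true"
    }
}
```

Only the `orTimeout` path reaches the callback's cancel branch. That is why a caller relying on `get(timeout)` never triggers the handler's cancel action.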
```
I agree with @syhily: it's kind of a bug in the Pulsar client, since users are very likely to call `get()` with a timeout in many cases.
Just wondering, has an issue been created in the Pulsar repo?
--
This is an automated message from the Apache Git Service.