tisonkun commented on code in PR #21074:
URL: https://github.com/apache/flink/pull/21074#discussion_r998038872
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java:
##########
@@ -151,42 +146,23 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
// We would just ignore the sending exception. This may cause data loss.
builder.sendAsync();
} else {
- // Waiting for permits to write message.
- requirePermits();
- mailboxExecutor.execute(
- () -> enqueueMessageSending(topic, builder),
- "Failed to send message to Pulsar");
- }
- }
-
- private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
- throws ExecutionException, InterruptedException {
- // Block the mailbox executor for yield method.
- builder.sendAsync()
- .whenComplete(
- (id, ex) -> {
- this.releasePermits();
- if (ex != null) {
- throw new FlinkRuntimeException(
- "Failed to send data to Pulsar " + topic, ex);
- } else {
- LOG.debug(
- "Sent message to Pulsar {} with message id {}", topic, id);
- }
- })
- .get();
- }
-
- private void requirePermits() throws InterruptedException {
- while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
- LOG.info("Waiting for the available permits.");
- mailboxExecutor.yield();
+ // Increase the pending message count.
+ pendingMessages.incrementAndGet();
+ builder.sendAsync()
+ .whenComplete(
+ (id, ex) -> {
+ pendingMessages.decrementAndGet();
+ if (ex != null) {
+ throw new FlinkRuntimeException(
+ "Failed to send data to Pulsar " + topic, ex);
Review Comment:
We could handle the exception in the mailbox executor, as `KafkaWriter` does.
Otherwise, the exception thrown inside this `whenComplete` callback isn't handled by any thread.
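
To illustrate the concern, here is a minimal, self-contained sketch of the hand-off pattern. It does not use Flink or Pulsar classes: `SimpleMailbox` is a hypothetical stand-in for the mailbox executor, `MailboxErrorHandoff` is a hypothetical stand-in for the writer, and a plain `CompletableFuture<String>` plays the role of the future returned by `sendAsync()`. An exception thrown inside a `whenComplete` callback is only captured by the future that `whenComplete` returns, so if that future is discarded the failure is silently lost. Enqueueing the rethrow as a mail makes it surface on the thread that drains the mailbox.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names here are illustrative, not the PR's actual code.
final class MailboxErrorHandoff {

    /** Hypothetical stand-in for a mailbox executor: mails run on the draining thread. */
    static final class SimpleMailbox {
        private final ConcurrentLinkedQueue<Runnable> mails = new ConcurrentLinkedQueue<>();

        void execute(Runnable mail) {
            mails.add(mail);
        }

        /** Runs all queued mails on the calling (task) thread. */
        void drain() {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
        }
    }

    private final SimpleMailbox mailbox;
    final AtomicLong pendingMessages = new AtomicLong();

    MailboxErrorHandoff(SimpleMailbox mailbox) {
        this.mailbox = mailbox;
    }

    /** sendFuture stands in for the future returned by an asynchronous send. */
    void send(String topic, CompletableFuture<String> sendFuture) {
        pendingMessages.incrementAndGet();
        sendFuture.whenComplete(
                (id, ex) -> {
                    pendingMessages.decrementAndGet();
                    if (ex != null) {
                        // Do not throw here: the callback's own exception would be lost.
                        // Enqueue the failure so the task thread rethrows it.
                        mailbox.execute(
                                () -> {
                                    throw new RuntimeException(
                                            "Failed to send data to Pulsar " + topic, ex);
                                });
                    }
                });
    }

    public static void main(String[] args) {
        SimpleMailbox mailbox = new SimpleMailbox();
        MailboxErrorHandoff writer = new MailboxErrorHandoff(mailbox);
        CompletableFuture<String> future = new CompletableFuture<>();
        writer.send("demo-topic", future);
        future.completeExceptionally(new IllegalStateException("simulated send failure"));
        try {
            mailbox.drain();
        } catch (RuntimeException e) {
            System.out.println("Surfaced on task thread: " + e.getMessage());
        }
    }
}
```

In the real writer, the equivalent step would presumably be a call to the writer's `mailboxExecutor.execute(...)` from inside the callback, which is the approach the comment attributes to `KafkaWriter`.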