mukesh154 opened a new issue, #23934: URL: https://github.com/apache/pulsar/issues/23934
### Search before asking - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation I created a Debezium PostgreSQL source connector, and when an entry in one of the tables exceeded 5MB, the source threw the following error with "flush failed." However, the error's cause is only displayed as "Sink Error," which doesn't provide sufficient context for troubleshooting. Although there is a preceding log indicating that the message size exceeds the maximum allowed size, the overall log lacks detailed information. Currently, the log includes a generic message: `Caused by: java.lang.Exception: Sink Error` This message provides minimal context and does not help identify the root cause of the "Sink Error." More detailed logging would significantly improve the debugging process. Including information such as the size of the message that caused the failure, details about the associated producer or consumer, and the specific part of the sink process where the error occurred would be invaluable for faster and more effective troubleshooting. Here is the complete stacktrace: ``` [org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes] 2025-02-04T05:05:26,014+0000 [public/default/postgres-dbz-0] ERROR org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource - execution exception while get flushFuture java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005) ~[?:?] at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource.read(AbstractKafkaConnectSource.java:191) ~[pulsar-io-kafka-connect-adaptor-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:491) ~[?:?] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:307) ~[?:?] at java.lang.Thread.run(Thread.java:829) ~[?:?] Caused by: java.lang.Exception: Sink Error at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource$AbstractKafkaSourceRecord.fail(AbstractKafkaConnectSource.java:326) ~[pulsar-io-kafka-connect-adaptor-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getPublishErrorHandler$1(PulsarSink.java:191) ~[?:?] at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?] at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) ~[?:?] at org.apache.pulsar.client.impl.ProducerImpl$1.sendComplete(ProducerImpl.java:350) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.client.impl.BatchMessageContainerImpl.discard(BatchMessageContainerImpl.java:193) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.client.impl.BatchMessageContainerImpl.createOpSendMsg(BatchMessageContainerImpl.java:214) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:2034) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$14(ProducerImpl.java:1654) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5] at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) ~[pulsar-common-2.10.5.5.jar:2.10.5.5] at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:159) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403) ~[netty-transport-classes-epoll-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.93.Final.jar:4.1.93.Final] ... 1 more ``` ### Solution Enhance the Sink Error log to include additional information such as if the error is related to specific conditions like exceeding message size or connection issues, mention that directly in the log message. For example, if the failure was caused by a connection timeout or message size violation, log it clearly. ### Alternatives _No response_ ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
