mukesh154 opened a new issue, #23934:
URL: https://github.com/apache/pulsar/issues/23934

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   I created a Debezium PostgreSQL source connector. When an entry in one of 
the tables exceeded 5 MB, the source failed with the error below ("flush 
failed"). However, the cause is reported only as "Sink Error," which does not 
give enough context for troubleshooting. A preceding log line does indicate 
that the message size exceeds the maximum allowed size, but the exception 
itself carries none of that detail.
   
   Currently, the log includes a generic message:
   `Caused by: java.lang.Exception: Sink Error`
   This message does not identify the root cause of the "Sink Error." More 
detailed logging would make debugging considerably easier. Useful details 
include the size of the message that caused the failure, the associated 
producer or consumer, and the part of the sink process where the error 
occurred.
   
   Here is the complete stacktrace:
   ```
   [org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes]
   2025-02-04T05:05:26,014+0000 [public/default/postgres-dbz-0] ERROR org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource - execution exception while get flushFuture
   java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005) ~[?:?]
        at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource.read(AbstractKafkaConnectSource.java:191) ~[pulsar-io-kafka-connect-adaptor-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:491) ~[?:?]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:307) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
   Caused by: java.lang.Exception: Sink Error
        at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource$AbstractKafkaSourceRecord.fail(AbstractKafkaConnectSource.java:326) ~[pulsar-io-kafka-connect-adaptor-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getPublishErrorHandler$1(PulsarSink.java:191) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) ~[?:?]
        at org.apache.pulsar.client.impl.ProducerImpl$1.sendComplete(ProducerImpl.java:350) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.client.impl.BatchMessageContainerImpl.discard(BatchMessageContainerImpl.java:193) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.client.impl.BatchMessageContainerImpl.createOpSendMsg(BatchMessageContainerImpl.java:214) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:2034) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$14(ProducerImpl.java:1654) ~[pulsar-client-original-2.10.5.5.jar:2.10.5.5]
        at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) ~[pulsar-common-2.10.5.5.jar:2.10.5.5]
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:159) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403) ~[netty-transport-classes-epoll-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.93.Final.jar:4.1.93.Final]
        ... 1 more
   ```
   
   ### Solution
   
   Enhance the "Sink Error" log so that it states the underlying cause. If the 
failure is tied to a specific condition, such as a message exceeding the 
maximum size or a connection timeout, say so directly in the log message.
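
To illustrate the request, here is a minimal, self-contained sketch that uses only the JDK. It is not Pulsar code: `SinkErrorSketch`, `MessageTooLargeException`, `failGeneric`, `failWithCause`, and `rootCauseOf` are hypothetical names. It also assumes the failure callback could be handed the original publish exception, which the stack trace above does not confirm. The sketch contrasts completing a future with a bare "Sink Error" against completing it with an exception that keeps the original cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SinkErrorSketch {

    /** Hypothetical stand-in for the client's "message too large" exception. */
    public static class MessageTooLargeException extends Exception {
        public MessageTooLargeException(String message) {
            super(message);
        }
    }

    /** Mirrors what the trace shows: the future fails with a bare "Sink Error". */
    public static void failGeneric(CompletableFuture<Void> flushFuture) {
        flushFuture.completeExceptionally(new Exception("Sink Error"));
    }

    /** Assumed improvement: keep the original cause and surface its message. */
    public static void failWithCause(CompletableFuture<Void> flushFuture, Throwable cause) {
        flushFuture.completeExceptionally(
                new Exception("Sink Error: " + cause.getMessage(), cause));
    }

    /** Waits on the future and returns the innermost exception, or null on success. */
    public static Throwable rootCauseOf(CompletableFuture<Void> flushFuture) {
        try {
            flushFuture.get();
            return null;
        } catch (ExecutionException e) {
            // Walk the cause chain down to the exception that has no further cause.
            Throwable t = e;
            while (t.getCause() != null) {
                t = t.getCause();
            }
            return t;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for flush", e);
        }
    }

    public static void main(String[] args) {
        Throwable publishError =
                new MessageTooLargeException("Message size is bigger than 5242880 bytes");

        CompletableFuture<Void> generic = new CompletableFuture<>();
        failGeneric(generic);
        System.out.println("generic root cause:  " + rootCauseOf(generic));

        CompletableFuture<Void> detailed = new CompletableFuture<>();
        failWithCause(detailed, publishError);
        System.out.println("detailed root cause: " + rootCauseOf(detailed));
    }
}
```

With the cause attached, anything that logs the `ExecutionException` would also print a `Caused by:` entry for the size violation, instead of stopping at "Sink Error".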
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

