GitHub user htbmw added a comment to the discussion: Should Kafka Source
Connector itself after unrecoverable error?

I work together with @relief-melone. 
I was tasked with testing the fix to see if this solves our problem.

Here is what I did:

- I cloned this repo and verified that the code changes for the fix are visible
in the source code of my local clone.
- I then compiled the required NAR files and replaced the previous NAR files in
the Docker image.
- Then I deployed a pod in our K8s cluster with the new Docker image that
contains the newly compiled NAR files (Kafka source) and let it run for a
couple of days.
- I checked this morning and the pod's logs were again showing the familiar
error, but the pod had not crashed and restarted as we had hoped it would.
- I further tested to check if it could consume messages from Kafka in this 
state, but it couldn't, meaning that the source is still in a state where it 
cannot recover and is basically halted without consuming any messages.
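
To illustrate what I think the last two bullets show: an error that is caught
and logged inside a background thread does not, by itself, end the process, so
nothing tells K8s to restart the pod. Below is a minimal sketch in plain Java.
It is not the connector's actual code; the class and method names
(BackgroundFailureSketch, consumeOnce, runSwallowing, runReporting) are
hypothetical, and the error is simulated.

```java
import java.util.concurrent.atomic.AtomicReference;

class BackgroundFailureSketch {

    /** Stands in for a poll/commit step that hits an unrecoverable error. */
    static void consumeOnce() {
        throw new IllegalStateException("simulated commit failure");
    }

    /** Starts the worker and waits for it to finish. */
    static void startAndWait(Thread worker) {
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Variant 1: the worker logs the error and exits.
     * The caller has no way to learn about the failure, so this returns null.
     */
    static Throwable runSwallowing() {
        startAndWait(new Thread(() -> {
            try {
                consumeOnce();
            } catch (RuntimeException e) {
                System.out.println("Stopping worker: " + e.getMessage());
            }
        }, "worker-swallowing"));
        return null; // nothing was handed back to the caller
    }

    /**
     * Variant 2: the worker records the error in a shared reference,
     * so the caller can see it and decide to fail the whole process.
     */
    static Throwable runReporting() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        startAndWait(new Thread(() -> {
            try {
                consumeOnce();
            } catch (RuntimeException e) {
                failure.set(e);
            }
        }, "worker-reporting"));
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("swallowing: caller sees " + runSwallowing());
        System.out.println("reporting: caller sees " + runReporting());
    }
}
```

In the first variant the process keeps running after the worker has stopped,
which looks like what we observe. Whether the fix is meant to behave like the
second variant is something I cannot tell from the logs alone.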

Is there any other way I can test this?
Is there something I could have done wrong during compilation / testing?
Any suggestions as to what I can try / test differently?

Here is the excerpt from the log for the source with the fix:
What is interesting is that it shows "Stopping Kafka Source", but it does not 
cause the pod to crash.


2023-06-11T22:20:29,237+0000 [Kafka Source Thread] ERROR org.apache.pulsar.io.kafka.KafkaAbstractSource - [Kafka Source Thread] Error while consuming records
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1441) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1341) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1260) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1235) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1174) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1502) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1401) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1358) ~[kafka-clients-3.4.0.jar:?]
        at org.apache.pulsar.io.kafka.KafkaAbstractSource.lambda$start$0(KafkaAbstractSource.java:181) ~[QwaU9xgIUZTeaeOtqJikUw/:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-06-11T22:20:29,238+0000 [Kafka Source Thread] INFO  org.apache.pulsar.io.kafka.KafkaAbstractSource - Stopping kafka source
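
The exception message above names two Kafka consumer settings,
max.poll.interval.ms and max.poll.records. As a rough sketch of what overriding
them could look like, the snippet below builds a plain java.util.Properties
object with those two keys. The class and method names (PollTuningSketch,
consumerOverrides) and the values are only illustrative assumptions, and the
sketch does not cover how such overrides reach the consumer inside the Pulsar
Kafka source.

```java
import java.util.Properties;

class PollTuningSketch {

    /**
     * Builds consumer overrides for the two settings named in the
     * CommitFailedException message. The keys are the standard Kafka
     * consumer config names; the values are chosen by the caller.
     */
    static Properties consumerOverrides(int maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        // Maximum allowed time between two poll() calls before a rebalance.
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        // Maximum number of records returned by a single poll().
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // Kafka's defaults are 300000 ms and 500 records; these values are
        // illustrative, not a recommendation.
        Properties props = consumerOverrides(600_000, 100);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Tuning these would at best make the CommitFailedException less frequent. It
would not change the fact that the source stops consuming without the pod
restarting once the error does occur.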

GitHub link: 
https://github.com/apache/pulsar/discussions/19880#discussioncomment-6161712
