GitHub user htbmw added a comment to the discussion: Should Kafka Source
Connector itself after unrecoverable error?
I work together with @relief-melone.
I was tasked with testing the fix to see if this solves our problem.
Here is what I did:
- I cloned this repo and verified that the code changes for the fix are visible
in the source code of my local repo.
- I then compiled the required nar files and replaced the previous nar files in
the Docker image.
- Then I deployed a pod in our K8s cluster with the new Docker image that
contains the newly compiled nar files (kafka source) and let it run for a
couple of days.
- When I checked this morning, the pod's logs were again showing the familiar
error, but the pod had not crashed and restarted as we hoped it would.
- I also tested whether it could still consume messages from Kafka in this
state (see the sketch after this list), but it could not, meaning the source is
still in a state from which it cannot recover and is effectively halted, not
consuming any messages.
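For completeness, here is roughly the shape of the consumption check, as a
minimal sketch rather than our exact test code. The broker URLs, topic names,
and subscription name below are placeholders; the idea is simply to produce a
record to the Kafka topic the source reads from and then try to receive it from
the Pulsar topic the source writes to.

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class SourceConsumptionCheck {
    public static void main(String[] args) throws Exception {
        // 1) Produce a test record to the Kafka topic the source is subscribed to.
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");               // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("input-topic", "key", "ping")).get();
        }

        // 2) Try to receive that record from the Pulsar topic the source writes to.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://pulsar-broker:6650")           // placeholder
                .build();
             Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("persistent://public/default/output-topic")   // placeholder
                .subscriptionName("fix-verification")
                .subscribe()) {
            Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
            if (msg == null) {
                System.out.println("No message arrived - source appears halted");
            } else {
                System.out.println("Got: " + new String(msg.getValue()));
                consumer.acknowledge(msg);
            }
        }
    }
}

With a healthy source the record should show up on the Pulsar topic within a
few seconds; in our case nothing arrived within the timeout.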
Is there any other way I can test this?
Is there something I could have done wrong during compilation / testing?
Any suggestions for what I could try or test differently?
What is interesting is that the log shows "Stopping kafka source", but this
does not cause the pod to crash (a sketch of why that might be follows the
excerpt). Here is the excerpt from the log for the source with the fix:
2023-06-11T22:20:29,237+0000 [Kafka Source Thread] ERROR org.apache.pulsar.io.kafka.KafkaAbstractSource - [Kafka Source Thread] Error while consuming records
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1441) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1341) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1260) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1235) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1174) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1502) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1401) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1358) ~[kafka-clients-3.4.0.jar:?]
    at org.apache.pulsar.io.kafka.KafkaAbstractSource.lambda$start$0(KafkaAbstractSource.java:181) ~[QwaU9xgIUZTeaeOtqJikUw/:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-06-11T22:20:29,238+0000 [Kafka Source Thread] INFO org.apache.pulsar.io.kafka.KafkaAbstractSource - Stopping kafka source
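To make the point about the missing crash concrete, below is a minimal sketch
of the general pattern that would produce a log sequence like the one above. It
is not the actual KafkaAbstractSource code; it only illustrates that when the
runner thread's own catch block handles the failure by logging and stopping,
the thread ends but the JVM (and therefore the pod) keeps running, so
Kubernetes never restarts it.

// Minimal sketch (NOT the actual KafkaAbstractSource implementation) of a
// consume loop whose failure handling only logs and stops the runner thread.
public class RunnerThreadSketch {
    private volatile boolean running = true;

    public void start() {
        Thread runner = new Thread(() -> {
            try {
                while (running) {
                    pollAndCommit(); // may throw, e.g. a CommitFailedException
                }
            } catch (Exception e) {
                // Corresponds to "Error while consuming records" in the log.
                System.err.println("Error while consuming records: " + e);
                stop(); // corresponds to "Stopping kafka source"
            }
            // The runner thread simply ends here; no exception reaches the
            // surrounding framework and the process stays alive.
        }, "Kafka Source Thread");
        runner.start();
    }

    private void pollAndCommit() throws Exception {
        // Placeholder for consumer.poll(...) + consumer.commitSync(...).
        throw new Exception("Commit cannot be completed ...");
    }

    private void stop() {
        System.out.println("Stopping kafka source");
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        new RunnerThreadSketch().start();
        Thread.sleep(60_000); // the main thread (the pod) is still alive a minute later
    }
}

If the intention of the fix is that the whole connector instance terminates on
an unrecoverable error, the failure presumably has to be propagated beyond the
runner thread to whatever supervises the instance; from the log above it looks
like only the thread is stopped.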
GitHub link:
https://github.com/apache/pulsar/discussions/19880#discussioncomment-6161712