[
https://issues.apache.org/jira/browse/CAMEL-18627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
rupam updated CAMEL-18627:
--------------------------
Description:
In the updateTaskState() method, if the consumer is resumed and a partition has
no previously committed offset, a NullPointerException is thrown when the code
calls ((OffsetAndMetadata) v).offset(), because v is null for that partition.
{code:java}
private void updateTaskState() {
switch (state) {
case PAUSE_REQUESTED:
LOG.info("Pausing the consumer as a response to a pause request");
consumer.pause(consumer.assignment());
state = State.PAUSED;
break;
case RESUME_REQUESTED:
LOG.info("Resuming the consumer as a response to a resume request");
if (consumer.committed(consumer.assignment()) != null) {
consumer.committed(consumer.assignment()).forEach((k, v) -> {
final TopicPartition tp = ((TopicPartition) k);
LOG.info("Resuming from offset {} for the topic {} with partition {}",
((OffsetAndMetadata) v).offset(), tp.topic(),
tp.partition());
consumer.seek(tp, ((OffsetAndMetadata) v).offset());
}
);
}
consumer.resume(consumer.assignment());
state = State.RUNNING;
break;
default:
break;
} }
{code}
was:
In the startPolling() method, if the consumer is suspending, then the
unsubscribe method is called which permanently closes the consumer
{code:java}
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
while (isKafkaConsumerRunnable() && isConnected() &&
pollExceptionStrategy.canContinue()) {
ConsumerRecords<Object, Object> allRecords =
consumer.poll(pollDuration);
if (consumerListener != null) {
if (!consumerListener.afterConsume(consumer)) {
continue;
}
}
ProcessingResult result =
recordProcessorFacade.processPolledRecords(allRecords);
if (result.isBreakOnErrorHit()) {
LOG.debug("We hit an error ... setting flags to force reconnect");
// force re-connect
setReconnect(true);
setConnected(false);
}
updateTaskState();
}
if (!isConnected()) {
LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
commitManager.commit();
}
safeUnsubscribe();
} catch (InterruptException e) {
{code}
> camel-kafka: Kafka resume throws null pointer exception if no partition
> offset exists
> -------------------------------------------------------------------------------------
>
> Key: CAMEL-18627
> URL: https://issues.apache.org/jira/browse/CAMEL-18627
> Project: Camel
> Issue Type: Task
> Components: camel-kafka
> Affects Versions: 3.18.3
> Reporter: rupam
> Assignee: Otavio Rodolfo Piske
> Priority: Minor
> Fix For: 3.20.0
>
>
>
> In the updateTaskState() method, if the consumer is resumed and a partition has
> no previously committed offset, a NullPointerException is thrown when the code
> calls ((OffsetAndMetadata) v).offset(), because v is null for that partition.
>
> {code:java}
> private void updateTaskState() {
> switch (state) {
> case PAUSE_REQUESTED:
> LOG.info("Pausing the consumer as a response to a pause request");
> consumer.pause(consumer.assignment());
> state = State.PAUSED;
> break;
> case RESUME_REQUESTED:
> LOG.info("Resuming the consumer as a response to a resume request");
> if (consumer.committed(consumer.assignment()) != null) {
> consumer.committed(consumer.assignment()).forEach((k, v) -> {
> final TopicPartition tp = ((TopicPartition) k);
> LOG.info("Resuming from offset {} for the topic {} with partition {}",
> ((OffsetAndMetadata) v).offset(), tp.topic(),
> tp.partition());
> consumer.seek(tp, ((OffsetAndMetadata) v).offset());
> }
> );
> }
> consumer.resume(consumer.assignment());
> state = State.RUNNING;
> break;
> default:
> break;
> } }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)