Hi Team,
We have a Beam pipeline that intermittently throws the exception below.
We are using Beam 2.29.0 with Flink runner 1.12.2.
From the logs, we observed that the pipeline keeps checking the offset against a
particular range (e.g. [234400206, 9223372036854775807)) and fails if the
offset is out of that range. After a restart the offset increases
automatically, and after a couple of restarts, once the offset falls within the
range, the pipeline runs fine. Can you help us with the questions below?
1. Why is the offset going out of range?
2. Is there any configuration we need to set to avoid this?
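For reference when reading the range in the log: the upper bound 9223372036854775807 is Java's Long.MAX_VALUE, so the range is effectively open-ended. The [from, to) notation is half-open, meaning the start offset is included and the end is excluded. Below is a minimal standalone sketch of that membership check. The class and method names are hypothetical, and it does not use Beam's own OffsetRange class.

```java
// Hypothetical illustration of a half-open offset range [from, to), as printed
// in the exception message. This is not Beam's OffsetRange class.
public class OffsetRangeBounds {

    // The upper bound seen in the log; it equals Long.MAX_VALUE.
    static final long UPPER_BOUND_FROM_LOG = 9223372036854775807L;

    // Half-open membership test: from is included, to is excluded.
    static boolean contains(long from, long to, long offset) {
        return offset >= from && offset < to;
    }

    public static void main(String[] args) {
        long from = 234400206L;
        System.out.println(UPPER_BOUND_FROM_LOG == Long.MAX_VALUE);     // true
        System.out.println(contains(from, Long.MAX_VALUE, 234400206L)); // true
        System.out.println(contains(from, Long.MAX_VALUE, 234400205L)); // false
    }
}
```

Under this reading, an offset would in practice fall outside such a range only if it were below the start offset.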
Here is our IO transform:
PCollection<KafkaRecord<String, byte[]>> linesFromStreamTopica =
    pKafkaDescriptor.apply("reading from kafka",
        KafkaIO.readSourceDescriptors().<String, byte[]>read()
            .withBootstrapServers(bootstrapServers)
            .withKeyDeserializer(org.apache.kafka.common.serialization.StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerFactoryFn(enhanceConsumer(KafkaConsumer::new, "vanilla", meterRegHolder))
            .withConsumerConfigUpdates(optionalArgsConsumer)
            .withConsumerConfigUpdates(forcedArgsConsumer)
            .withProcessingTime()
            .withReadCommitted()
    );
Exception:
2021-11-19 13:46:13
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1205)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1320)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1309)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [234400206, 9223372036854775807).}
    ... 11 more
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [234400206, 9223372036854775807).
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:174)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1318)
    ... 10 more
Caused by: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [234400206, 9223372036854775807).
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
    at org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:113)
    at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:80)
    at org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:249)
    at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:523)
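The Beam source shows where this message comes from. OffsetRangeTracker.checkDone() raises it when the range is non-empty and tryClaim() was never called on the tracker. In other words, no offset was attempted at all; it is not raised because a specific offset fell outside the range. Below is a hypothetical, simplified model of that first precondition that reproduces the message text from the trace. SimpleOffsetTracker is our own name, not a Beam class, and Beam's real tracker performs further checks that are not modelled here.

```java
// Hypothetical, simplified model of the first checkDone() precondition seen in
// the trace. It mirrors the message text but is NOT Beam's OffsetRangeTracker.
public class CheckDoneModel {

    static final class SimpleOffsetTracker {
        private final long from;
        private final long to;            // exclusive upper bound
        private Long lastAttemptedOffset; // stays null until tryClaim is called

        SimpleOffsetTracker(long from, long to) {
            this.from = from;
            this.to = to;
        }

        // Records the attempted offset; in this model the claim succeeds only
        // if the offset lies inside [from, to).
        boolean tryClaim(long offset) {
            lastAttemptedOffset = offset;
            return offset >= from && offset < to;
        }

        // Only the "nothing was ever attempted" check is modelled here.
        void checkDone() {
            if (from == to) {
                return; // an empty range needs no claims
            }
            if (lastAttemptedOffset == null) {
                throw new IllegalStateException(
                    "Last attempted offset should not be null. No work was claimed in non-empty range ["
                        + from + ", " + to + ").");
            }
        }
    }

    public static void main(String[] args) {
        // No tryClaim before checkDone: reproduces the message from the trace.
        SimpleOffsetTracker unclaimed = new SimpleOffsetTracker(234400206L, Long.MAX_VALUE);
        try {
            unclaimed.checkDone();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // After at least one claim attempt, this check passes.
        SimpleOffsetTracker claimed = new SimpleOffsetTracker(234400206L, Long.MAX_VALUE);
        claimed.tryClaim(234400206L);
        claimed.checkDone();
        System.out.println("checkDone passed after a claim");
    }
}
```

In this model the failure depends only on whether any claim was attempted before checkDone(), not on the value of the start offset.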