Hi Team,

we have a Beam pipeline that intermittently throws the exception shown below. We are using Beam 2.29.0 with the Flink runner 1.12.2.

What we observe in the logs is that the pipeline keeps checking offsets against a particular restriction range (for example [234400206, 9223372036854775807)) and fails when the offset falls outside that range. After a restart the offset advances automatically, and after a couple of restarts, once the offset falls inside the range again, the pipeline runs fine. Can you help us with the questions below?


  1.  Why is the offset going out of range? (Our reading of the error is sketched right after these questions.)
  2.  Is there any configuration we need to set to avoid this? (The consumer configuration we currently pass is sketched after the KafkaIO transform below.)
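
To make sure we are reading the stack trace correctly: the snippet below is only our own illustration of Beam's OffsetRangeTracker API, not code from our pipeline. It reproduces the same IllegalStateException when checkDone() runs on a non-empty restriction in which no offset was ever claimed, which appears to be what happens inside the Kafka read:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

public class CheckDoneIllustration {
  public static void main(String[] args) {
    // Same shape of restriction as in our logs: [234400206, Long.MAX_VALUE).
    OffsetRangeTracker tracker =
        new OffsetRangeTracker(new OffsetRange(234_400_206L, Long.MAX_VALUE));

    // The element finishes without any tryClaim(...) call, so checkDone()
    // fails with "Last attempted offset should not be null. No work was
    // claimed in non-empty range ...".
    tracker.checkDone();
  }
}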


Here is our IO transform:

        PCollection<KafkaRecord<String, byte[]>> linesFromStreamTopica =
            pKafkaDescriptor.apply("reading from kafka",
                KafkaIO.<String, byte[]>readSourceDescriptors()
                    .withBootstrapServers(bootstrapServers)
                    .withKeyDeserializer(org.apache.kafka.common.serialization.StringDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    .withConsumerFactoryFn(enhanceConsumer(KafkaConsumer::new, "vanilla", meterRegHolder))
                    .withConsumerConfigUpdates(optionalArgsConsumer)
                    .withConsumerConfigUpdates(forcedArgsConsumer)
                    .withProcessingTime()
                    .withReadCommitted());
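
For completeness, optionalArgsConsumer and forcedArgsConsumer are plain Map<String, Object> overrides that we merge into the consumer properties. The keys and values below are illustrative placeholders only (our real maps are built from job configuration); we include them just to show the kind of settings we touch:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative placeholders only -- not our exact production values.
public class ConsumerConfigSketch {
  static Map<String, Object> optionalArgsConsumer() {
    Map<String, Object> cfg = new HashMap<>();
    cfg.put(ConsumerConfig.GROUP_ID_CONFIG, "our-consumer-group");  // placeholder
    cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");     // placeholder
    return cfg;
  }

  static Map<String, Object> forcedArgsConsumer() {
    Map<String, Object> cfg = new HashMap<>();
    cfg.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);       // placeholder
    cfg.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);           // placeholder
    return cfg;
  }
}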


Exception:
2021-11-19 13:46:13
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1205)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1320)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1309)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [234400206, 9223372036854775807).}
        ... 11 more
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [234400206, 9223372036854775807).
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:174)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:995)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1318)
        ... 10 more
Caused by: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [234400206, 9223372036854775807).
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
        at org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:113)
        at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:80)
        at org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:249)
        at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:523)
