[ 
https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557320#comment-17557320
 ] 

Qingsheng Ren commented on FLINK-28185:
---------------------------------------

I prefer to throw an exception with a clearer message in this case. If we 
silently set the starting offset of empty partitions to earliest, the timestamps 
of newly incoming data could be lower than the specified starting timestamp, 
which goes against the semantics of {{OffsetsInitializer.timestamp()}}. I'd like 
to leave the decision to users, because they can always implement a custom 
{{OffsetsInitializer}}. 
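The preferred behaviour can be sketched without any Flink or Kafka dependency: check each partition's timestamp-lookup result for the {{-1}} placeholder and fail with a message that names the partition and the requested timestamp. The class and method names are hypothetical, and plain "topic-partition" strings stand in for Kafka's {{TopicPartition}}. This is an illustration under those assumptions, not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's API: validate the per-partition result of a
// timestamp lookup and fail with a descriptive message instead of letting a
// -1 placeholder surface as "Invalid negative offset".
public class StartingOffsetValidator {

    // Placeholder assumed for a partition with no record at or after the timestamp.
    static final long NO_OFFSET = -1L;

    static Map<String, Long> requireOffsets(Map<String, Long> offsetsForTimestamp, long startingTimestamp) {
        Map<String, Long> validated = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : offsetsForTimestamp.entrySet()) {
            long offset = entry.getValue();
            if (offset == NO_OFFSET) {
                // Name the partition and timestamp so the user knows what to fix.
                throw new IllegalStateException(
                        "Partition " + entry.getKey()
                                + " has no record with timestamp >= " + startingTimestamp
                                + " (it may be empty, e.g. after retention); use a custom"
                                + " OffsetsInitializer to choose a starting offset for it.");
            }
            validated.put(entry.getKey(), offset);
        }
        return validated;
    }
}
```

Failing fast keeps the semantics of the timestamp initializer intact and leaves the choice of a fallback to the user, as argued above.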

> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> --------------------------------------------------------------------
>
>                 Key: FLINK-28185
>                 URL: https://issues.apache.org/jira/browse/FLINK-28185
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0
>         Environment: Flink 1.15.0
> Kafka 2.8.1
>            Reporter: Peter Schrott
>            Priority: Minor
>         Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png
>
>
> When using {{OffsetsInitializer.timestamp(.)}} on a topic with empty 
> partitions (little traffic + low retention), an {{IllegalArgumentException: 
> Invalid negative offset}} occurs. See the stacktrace below.
> The problem is that the admin client returns -1 as timestamp and 
> offset for empty partitions in {{KafkaAdminClient.listOffsets(.)}} [1]. 
> Please compare the attached screenshot. The exception is thrown when the 
> {{OffsetAndTimestamp}} object is created from the admin client response.
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition 
> splits due to 
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
>     at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>     at 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:36)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
>     ... 8 common frames omitted
> 15:25:58.025 INFO  [flink-akka.actor.default-dispatcher-11] 
> o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator 
> 351e440289835f2ff3e6fee31bf6e13c).
>     at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
>     at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
>     at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
>     at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:329)
>     at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to initialize 
> partition splits due to 
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
>     at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     ... 8 common frames omitted
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>     at 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:36)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
>     at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
>     ... 8 common frames omitted {code}
> *Expected Result:*
> The consumer is initialized, and records of partitions that contain data 
> (timestamp >= given timestamp) are consumed. Newly incoming data on "empty" 
> partitions are also consumed.
> *Actual Result:*
> The consumer is not initialized. No data are consumed.
>  
> [1] 
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L604]
>  
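The root cause and the expected result described in the issue can be tied together in a Flink-free sketch. It assumes, as the report describes, that the lookup yields {{-1}} for both offset and timestamp of an empty partition. The conversion step drops those placeholders before calling a constructor that rejects negative offsets (mirroring the {{OffsetAndTimestamp}} check seen in the stack trace), so such partitions are simply absent, much like {{KafkaConsumer.offsetsForTimes}} returns {{null}} for partitions without a matching record. The starting-offset step then falls back to each partition's end offset, so data arriving later on an "empty" partition is still consumed. {{OffsetTs}}, {{RawResult}} and the method names are hypothetical stand-ins, not Kafka or Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free sketch; none of these names are Kafka or Flink classes.
public class TimestampStartSketch {

    // Mirrors the offset check of Kafka's OffsetAndTimestamp seen in the stack trace.
    static final class OffsetTs {
        final long offset;
        final long timestamp;

        OffsetTs(long offset, long timestamp) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Raw per-partition lookup result; assumed to be -1/-1 for an empty partition.
    static final class RawResult {
        final long offset;
        final long timestamp;

        RawResult(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Step 1: skip -1 placeholders instead of passing them to the validating
    // constructor, so partitions without a matching record are simply absent.
    static Map<String, OffsetTs> offsetsForTimes(Map<String, RawResult> raw) {
        Map<String, OffsetTs> found = new HashMap<>();
        for (Map.Entry<String, RawResult> e : raw.entrySet()) {
            RawResult r = e.getValue();
            if (r.offset >= 0) {
                found.put(e.getKey(), new OffsetTs(r.offset, r.timestamp));
            }
        }
        return found;
    }

    // Step 2: start at the offset found for the timestamp, or at the partition's
    // end offset when nothing was found, so records appended later are still read.
    static Map<String, Long> startingOffsets(Map<String, RawResult> raw, Map<String, Long> endOffsets) {
        Map<String, OffsetTs> found = offsetsForTimes(raw);
        Map<String, Long> starting = new HashMap<>();
        for (String partition : raw.keySet()) {
            OffsetTs hit = found.get(partition);
            if (hit != null) {
                starting.put(partition, hit.offset);
            } else {
                Long end = endOffsets.get(partition);
                if (end == null) {
                    throw new IllegalArgumentException("Missing end offset for " + partition);
                }
                starting.put(partition, end);
            }
        }
        return starting;
    }
}
```

Starting at the end offset is only one possible policy. As the comment above points out, it admits records whose timestamps are lower than the requested starting timestamp, so failing fast or letting users supply a custom {{OffsetsInitializer}} remain valid alternatives.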



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
