[ 
https://issues.apache.org/jira/browse/FLINK-39193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-39193:
-----------------------------
    Description: 
When OffsetsInitializer.offsets(...) is given offsets for only some partitions 
of a topic, Flink does not just use those explicit offsets and ignore the rest. 
Instead, SpecifiedOffsetsInitializer falls back to committed-offset lookup for 
the missing partitions. In our environment, that fallback path can throw:

{{java.lang.NoClassDefFoundError: 
org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec}}

This is a generic Kafka source issue, not specific to any one integration 
style. It can affect both static and dynamic source setups anywhere sparse 
specific startup offsets are used.

Minimal repro shape:

TopicPartition tp0 = new TopicPartition("test-topic", 0);TopicPartition tp1 = 
new TopicPartition("test-topic", 1);OffsetsInitializer initializer = 
OffsetsInitializer.offsets(Map.of(tp0, 111L));

initializer.getPartitionOffsets(
List.of(tp0, tp1),
partitionOffsetsRetriever);

Observed behavior:
 * the explicit offset for tp0 is used
 * the omitted partition tp1 triggers committed-offset lookup
 * that path can fail with NoClassDefFoundError instead of producing a clear 
validation error or deterministic fallback

 

  was:
When OffsetsInitializer.offsets(...) is given offsets for only some partitions 
of a topic, Flink does not just use those explicit offsets and ignore the rest. 
Instead, SpecifiedOffsetsInitializer falls back to committed-offset lookup for 
the missing partitions. In our environment, that fallback path can throw:


{{java.lang.NoClassDefFoundError: 
org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec}}
This is a generic Kafka source issue, not specific to any one integration 
style. It can affect both static and dynamic source setups anywhere sparse 
specific startup offsets are used.

Minimal repro shape:
{{TopicPartition tp0 = new TopicPartition("test-topic", 0);TopicPartition tp1 = 
new TopicPartition("test-topic", 1);OffsetsInitializer initializer =    
OffsetsInitializer.offsets(Map.of(tp0, 111L));

initializer.getPartitionOffsets(
    List.of(tp0, tp1),
    partitionOffsetsRetriever);}}
Observed behavior:
 * the explicit offset for tp0 is used
 * the omitted partition tp1 triggers committed-offset lookup
 * that path can fail with NoClassDefFoundError instead of producing a clear 
validation error or deterministic fallback

 


> SpecifiedOffsetsInitializer can fail if the caller passes a non-empty but 
> incomplete specific-offset map.
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39193
>                 URL: https://issues.apache.org/jira/browse/FLINK-39193
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-4.0.1
>            Reporter: Bowen Li
>            Priority: Major
>
> When OffsetsInitializer.offsets(...) is given offsets for only some 
> partitions of a topic, Flink does not just use those explicit offsets and 
> ignore the rest. Instead, SpecifiedOffsetsInitializer falls back to 
> committed-offset lookup for the missing partitions. In our environment, that 
> fallback path can throw:
> {{java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec}}
> This is a generic Kafka source issue, not specific to any one integration 
> style. It can affect both static and dynamic source setups anywhere sparse 
> specific startup offsets are used.
> Minimal repro shape:
> TopicPartition tp0 = new TopicPartition("test-topic", 0);TopicPartition tp1 = 
> new TopicPartition("test-topic", 1);OffsetsInitializer initializer = 
> OffsetsInitializer.offsets(Map.of(tp0, 111L));
> initializer.getPartitionOffsets(
> List.of(tp0, tp1),
> partitionOffsetsRetriever);
> Observed behavior:
>  * the explicit offset for tp0 is used
>  * the omitted partition tp1 triggers committed-offset lookup
>  * that path can fail with NoClassDefFoundError instead of producing a clear 
> validation error or deterministic fallback
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to