Thanks, see https://github.com/apache/beam/issues/22631 and
https://github.com/apache/beam/pull/22633

On Mon, Aug 8, 2022 at 21:30, John Casey via dev <[email protected]>
wrote:

> Looking at your message again, that would imply that the
> configuredKafkaCommit() method shouldn't inspect isolation.level.
>
> On Mon, Aug 8, 2022 at 3:27 PM John Casey <[email protected]> wrote:
>
>> .withReadCommitted() doesn't commit offsets for the messages it reads.
>> Instead, it specifies that the Kafka consumer should only read messages
>> that have themselves been committed to Kafka (for transactional producers,
>> messages whose transaction has been committed).
>>
>> It's intended for exactly-once applications.
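To make the distinction concrete, here is a toy, single-partition model of what `isolation.level` controls. It is a simplified sketch, not Kafka's implementation: the class, enum and method names are hypothetical, control markers are ignored, and a still-open transaction is modelled as the point a read_committed consumer cannot read past. Nothing in it commits a consumer offset; that is a separate mechanism (enable.auto.commit or commitOffsetsInFinalize()).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Kafka's isolation.level for a single partition.
// All names here are hypothetical; this is not Kafka's or Beam's API.
public class IsolationLevelSketch {

    public enum TxnState { NON_TRANSACTIONAL, COMMITTED, ABORTED, OPEN }

    public static final class Rec {
        public final long offset;
        public final TxnState state;

        public Rec(long offset, TxnState state) {
            this.offset = offset;
            this.state = state;
        }
    }

    /** Records a consumer would see; note that no offset is committed anywhere here. */
    public static List<Rec> visible(List<Rec> log, boolean readCommitted) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) {
            if (!readCommitted) {
                out.add(r); // read_uncommitted: sees aborted and in-flight records too
            } else if (r.state == TxnState.OPEN) {
                break; // read_committed stops before the first still-open transaction
            } else if (r.state != TxnState.ABORTED) {
                out.add(r); // non-transactional or committed-transaction records
            }
        }
        return out;
    }

    public static List<Rec> sampleLog() {
        return List.of(
            new Rec(0, TxnState.NON_TRANSACTIONAL),
            new Rec(1, TxnState.COMMITTED),
            new Rec(2, TxnState.ABORTED),
            new Rec(3, TxnState.OPEN),
            new Rec(4, TxnState.NON_TRANSACTIONAL));
    }

    public static void main(String[] args) {
        System.out.println(visible(sampleLog(), false).size()); // 5
        System.out.println(visible(sampleLog(), true).size());  // 2
    }
}
```

In this model the isolation level only changes which records are returned; whether the consumer's position is ever committed back to Kafka is decided elsewhere.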
>>
>>
>>
>> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <[email protected]>
>> wrote:
>>
>>> I have been reading from Kafka and trying to figure out which offset
>>> management would be the best for my use-case. During that I noticed
>>> something odd.
>>>
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>>
>>>     private boolean configuredKafkaCommit() {
>>>       return getConsumerConfig().get("isolation.level") == "read_committed"
>>>           || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>>>     }
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
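Following John's point that the check shouldn't inspect isolation.level, a minimal sketch could keep the two concerns in separate predicates. This is a hypothetical illustration, not the change made in the linked PR: the class and method names are invented, and accepting the string "true" for enable.auto.commit is an assumption. It also uses `equals()` rather than `==`, since the quoted method compares references, so its result depends on whether the stored value is the same interned String instance.

```java
import java.util.Map;

// Hypothetical helpers for illustration only; not Beam's actual code.
public class CommitConfigSketch {

    // Same key strings as Kafka's ConsumerConfig constants.
    static final String ISOLATION_LEVEL = "isolation.level";
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /** True only if the Kafka consumer itself will commit offsets. */
    public static boolean kafkaAutoCommitEnabled(Map<String, Object> config) {
        Object v = config.get(ENABLE_AUTO_COMMIT);
        // Assumption: callers may pass either Boolean.TRUE or the string "true".
        return Boolean.TRUE.equals(v)
            || (v instanceof String && Boolean.parseBoolean(((String) v).trim()));
    }

    /** True if only committed transactional records are read; says nothing about offsets. */
    public static boolean readCommitted(Map<String, Object> config) {
        // equals(), not ==, so the result does not depend on String interning.
        return "read_committed".equals(config.get(ISOLATION_LEVEL));
    }

    public static void main(String[] args) {
        Map<String, Object> readCommittedOnly = Map.of(ISOLATION_LEVEL, "read_committed");
        Map<String, Object> autoCommitOnly = Map.of(ENABLE_AUTO_COMMIT, true);

        System.out.println(readCommitted(readCommittedOnly) + " " + kafkaAutoCommitEnabled(readCommittedOnly)); // true false
        System.out.println(readCommitted(autoCommitOnly) + " " + kafkaAutoCommitEnabled(autoCommitOnly)); // false true

        // Reference comparison, as in the quoted method: a non-interned copy fails.
        Object copy = new String("read_committed");
        System.out.println(copy == "read_committed"); // false
    }
}
```

With this split, only kafkaAutoCommitEnabled() would be a candidate for deciding whether commitOffsetsInFinalize() is redundant.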
>>>
>>> The name of the method and the way it's used in the code certainly
>>> suggest that using the read_committed isolation level handles and commits
>>> Kafka offsets. That seemed strange, but I'm not a Kafka pro, so I tested it.
>>> Well, it does not.
>>>
>>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit offsets
>>> - using ONLY commitOffsetsInFinalize() does commit offsets
>>>
>>> - using ONLY withReadCommitted() does NOT commit offsets
>>>
>>> Tested on Dataflow with the 2.40.0 Java SDK, without explicitly enabling
>>> the SDF-based read.
>>>
>>> So is it a bug, or what am I missing here?
>>>
>>> If it is indeed a bug, where does it lie? Should read_committed commit
>>> offsets (although I found no explicit documentation about that anywhere),
>>> or should that isolation level not take precedence over committing offsets
>>> in finalize, meaning that method is wrong?
