[
https://issues.apache.org/jira/browse/BEAM-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384330#comment-16384330
]
Raghu Angadi commented on BEAM-3754:
------------------------------------
Just saw this. You are correct. Thanks for reporting it. _read()_ and
_readBytes()_ should never have duplicated code. I am not even sure when
{{readBytes()}} was added. Sent a fix in https://github.com/apache/beam/pull/4792
> KAFKA - Can't set commitOffsetsInFinalizeEnabled to false with
> KafkaIO.readBytes()
> ----------------------------------------------------------------------------------
>
> Key: BEAM-3754
> URL: https://issues.apache.org/jira/browse/BEAM-3754
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.3.0
> Environment: Dataflow pipeline using Kafka as a Sink
> Reporter: Benjamin BENOIST
> Assignee: Raghu Angadi
> Priority: Minor
> Labels: patch
> Original Estimate: 2h
> Time Spent: 10m
> Remaining Estimate: 1h 50m
>
> Beam v2.3 introduces finalized offsets to reduce gaps or duplicate
> processing of records when a pipeline restarts.
> _read()_ sets the _commitOffsetsInFinalizeEnabled_ property to false [by
> default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307]
> but _readBytes()_
> [doesn't|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282],
> so calling _readBytes()_ throws an exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalStateException: Missing required properties: commitOffsetsInFinalizeEnabled
>     at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
>     at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291)
> {noformat}
> The parameter can be set to true with _commitOffsetsInFinalize()_ but never
> to false.
> Using _read()_ in the definition of _readBytes()_ could prevent this kind of
> error in the future:
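> {code:java}
> public static Read<byte[], byte[]> readBytes() {
>   // Reuse read() so that readBytes() inherits every default it sets.
>   // read() returns a built Read, so the with*() methods are used here
>   // rather than the builder's set*() methods.
>   return KafkaIO.<byte[], byte[]>read()
>       .withKeyDeserializer(ByteArrayDeserializer.class)
>       .withValueDeserializer(ByteArrayDeserializer.class);
> }
> {code}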
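
The failure mode described above can be reproduced without Beam. The following is a minimal sketch, not Beam code: ReadSketch and its hand-written Builder are hypothetical stand-ins for KafkaIO.Read and its generated builder, and deserializers are modeled as plain strings. It assumes only that build() rejects a required property that was never set, as the stack trace in the report shows. A factory that duplicates the builder setup misses the default, while a factory that delegates to read() inherits it.

```java
/** Hypothetical stand-in for KafkaIO.Read; illustrates the bug, not Beam's code. */
final class ReadSketch {
  final String keyDeserializer;
  final String valueDeserializer;
  final boolean commitOffsetsInFinalizeEnabled;

  private ReadSketch(Builder b) {
    this.keyDeserializer = b.keyDeserializer;
    this.valueDeserializer = b.valueDeserializer;
    this.commitOffsetsInFinalizeEnabled = b.commitOffsetsInFinalizeEnabled;
  }

  /** Copies every property, including defaults, into a fresh builder. */
  Builder toBuilder() {
    return new Builder()
        .setKeyDeserializer(keyDeserializer)
        .setValueDeserializer(valueDeserializer)
        .setCommitOffsetsInFinalizeEnabled(commitOffsetsInFinalizeEnabled);
  }

  static final class Builder {
    private String keyDeserializer; // optional in this sketch
    private String valueDeserializer; // optional in this sketch
    private Boolean commitOffsetsInFinalizeEnabled; // required; null means "never set"

    Builder setKeyDeserializer(String v) { keyDeserializer = v; return this; }
    Builder setValueDeserializer(String v) { valueDeserializer = v; return this; }
    Builder setCommitOffsetsInFinalizeEnabled(boolean v) {
      commitOffsetsInFinalizeEnabled = v;
      return this;
    }

    ReadSketch build() {
      // Mimics the required-property check reported in the stack trace.
      if (commitOffsetsInFinalizeEnabled == null) {
        throw new IllegalStateException(
            "Missing required properties: commitOffsetsInFinalizeEnabled");
      }
      return new ReadSketch(this);
    }
  }

  /** The single place where defaults are defined. */
  static ReadSketch read() {
    return new Builder().setCommitOffsetsInFinalizeEnabled(false).build();
  }

  /** Duplicated setup: forgets the new required property, so build() throws. */
  static ReadSketch readBytesDuplicated() {
    return new Builder()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build();
  }

  /** Delegating setup: starts from read(), so new defaults are picked up. */
  static ReadSketch readBytesDelegating() {
    return read().toBuilder()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build();
  }

  public static void main(String[] args) {
    try {
      readBytesDuplicated();
    } catch (IllegalStateException e) {
      System.out.println("duplicated: " + e.getMessage());
    }
    System.out.println(
        "delegating: commitOffsetsInFinalizeEnabled="
            + readBytesDelegating().commitOffsetsInFinalizeEnabled);
  }
}
```

With the delegating variant, a required property added to read() later needs no matching change in the bytes factory.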