Benjamin BENOIST created BEAM-3754:
--------------------------------------
Summary: Can't have commitOffsetsInFinalizeEnabled set to false
with KafkaIO.readBytes()
Key: BEAM-3754
URL: https://issues.apache.org/jira/browse/BEAM-3754
Project: Beam
Issue Type: Bug
Components: io-java-kafka
Affects Versions: 2.3.0
Environment: Dataflow pipeline using Kafka as a Sink
Reporter: Benjamin BENOIST
Assignee: Raghu Angadi
Beam v2.3 introduces committing offsets to Kafka when checkpoints are
finalized, in order to reduce gaps or duplicate processing of records when a
pipeline restarts.
_read()_ sets _commitOffsetsInFinalizeEnabled_ to false [by
default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307]
but _readBytes()_
[doesn't|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282]
set it at all, so calling _readBytes()_ throws an exception:
{noformat}
Exception in thread "main" java.lang.IllegalStateException: Missing required properties: commitOffsetsInFinalizeEnabled
	at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
	at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291)
{noformat}
The parameter can be set to true with _commitOffsetsInFinalize()_ but never to
false.
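
For readers unfamiliar with the failure mode, here is a minimal sketch of how a generated builder can reject an unset boolean. It is a hand-written stand-in, not Beam's or AutoValue's actual code: _ReadConfig_, its _Builder_ and _MissingPropertyDemo_ are hypothetical names, and the boxed-field check is an assumption about how such builders usually tell "never set" apart from "set to false".

```java
// Hypothetical stand-in for a generated builder; not Beam's actual code.
final class ReadConfig {
    final String keyDeserializer;
    final boolean commitOffsetsInFinalizeEnabled;

    private ReadConfig(String keyDeserializer, boolean commitOffsetsInFinalizeEnabled) {
        this.keyDeserializer = keyDeserializer;
        this.commitOffsetsInFinalizeEnabled = commitOffsetsInFinalizeEnabled;
    }

    static final class Builder {
        private String keyDeserializer;
        // Boxed so that "never set" (null) differs from "set to false".
        private Boolean commitOffsetsInFinalizeEnabled;

        Builder setKeyDeserializer(String value) {
            this.keyDeserializer = value;
            return this;
        }

        Builder setCommitOffsetsInFinalizeEnabled(boolean value) {
            this.commitOffsetsInFinalizeEnabled = value;
            return this;
        }

        ReadConfig build() {
            // Collect the names of all properties that were never assigned.
            String missing = "";
            if (keyDeserializer == null) {
                missing += " keyDeserializer";
            }
            if (commitOffsetsInFinalizeEnabled == null) {
                missing += " commitOffsetsInFinalizeEnabled";
            }
            if (!missing.isEmpty()) {
                throw new IllegalStateException("Missing required properties:" + missing);
            }
            return new ReadConfig(keyDeserializer, commitOffsetsInFinalizeEnabled);
        }
    }
}

class MissingPropertyDemo {
    public static void main(String[] args) {
        try {
            // Mirrors the reported case: the flag is never set before build().
            new ReadConfig.Builder().setKeyDeserializer("ByteArrayDeserializer").build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Setting the flag explicitly, even to false, satisfies the check.
        ReadConfig ok = new ReadConfig.Builder()
            .setKeyDeserializer("ByteArrayDeserializer")
            .setCommitOffsetsInFinalizeEnabled(false)
            .build();
        System.out.println(ok.commitOffsetsInFinalizeEnabled);
    }
}
```

In this model, a factory that forgets one setter fails at _build()_ time, before the caller has any object on which to change the value.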
Using _read()_ in the definition of _readBytes()_ could prevent this kind of
error in the future:
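{code:java}
public static Read<byte[], byte[]> readBytes() {
  // Delegate to read() so that readBytes() inherits every default,
  // including commitOffsetsInFinalizeEnabled = false.
  return KafkaIO.<byte[], byte[]>read()
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(ByteArrayDeserializer.class);
}{code}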
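
The benefit of delegating can be shown with a simplified, self-contained model. This is only a sketch: _SourceSpec_ and its string-typed deserializer fields are hypothetical and stand in for the real transform. The point is that defaults live in one factory, so a property added there later reaches the specialised factory automatically.

```java
// Hypothetical, simplified model of a transform spec; not Beam's KafkaIO.Read.
final class SourceSpec {
    final String keyDeserializer;
    final String valueDeserializer;
    final boolean commitOffsetsInFinalizeEnabled;

    private SourceSpec(String keyDeserializer, String valueDeserializer,
                       boolean commitOffsetsInFinalizeEnabled) {
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.commitOffsetsInFinalizeEnabled = commitOffsetsInFinalizeEnabled;
    }

    // The only factory that spells out default values.
    static SourceSpec read() {
        return new SourceSpec(null, null, false);
    }

    // Each with* method returns a copy with one field replaced.
    SourceSpec withKeyDeserializer(String name) {
        return new SourceSpec(name, valueDeserializer, commitOffsetsInFinalizeEnabled);
    }

    SourceSpec withValueDeserializer(String name) {
        return new SourceSpec(keyDeserializer, name, commitOffsetsInFinalizeEnabled);
    }

    // Specialised factory: layers byte-array deserializers on top of read().
    static SourceSpec readBytes() {
        return read()
            .withKeyDeserializer("ByteArrayDeserializer")
            .withValueDeserializer("ByteArrayDeserializer");
    }

    public static void main(String[] args) {
        SourceSpec spec = readBytes();
        System.out.println(spec.keyDeserializer + ", commitOffsetsInFinalizeEnabled="
            + spec.commitOffsetsInFinalizeEnabled);
    }
}
```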