Luke Cwik created BEAM-13310:
--------------------------------
Summary: KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does
Key: BEAM-13310
URL: https://issues.apache.org/jira/browse/BEAM-13310
Project: Beam
Issue Type: Bug
Components: io-java-kafka
Reporter: Luke Cwik
When the pipeline runs with the pure SDF implementation of KafkaIO, it does not
commit offsets. When it runs with the UnboundedSource implementation through the
SDF UnboundedSourceWrapper, offsets are committed. This suggests that offset
committing works in the UnboundedSource version but not in the pure SDF version.
Sample code:
{code:java}
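import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// "options" is the reporter's own PipelineOptions instance (definition not shown).
final Pipeline p = Pipeline.create(options);
p.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getKafkaBroker())
        .withTopic(options.getTopic())
        .withConsumerConfigUpdates(
            Map.of(
                ConsumerConfig.GROUP_ID_CONFIG,
                options.getConsumerGroup(),
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                "SASL_SSL",
                SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
                SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Offsets are expected to be committed when a checkpoint is finalized.
        .commitOffsetsInFinalize()
        .withoutMetadata());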
{code}
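One way to compare the two read paths with the same pipeline may be to toggle Beam's use_deprecated_read experiment on the command line. The sketch below is plain Java with no Beam dependency. ReadPathArgs and withExperiment are hypothetical names, and it assumes that the KafkaIO version in use honors use_deprecated_read by falling back to the UnboundedSource read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: adds an experiment to pipeline arguments.
class ReadPathArgs {
    private static final String PREFIX = "--experiments=";

    // Returns a copy of args whose --experiments flag includes the given experiment.
    static String[] withExperiment(String[] args, String experiment) {
        List<String> out = new ArrayList<>();
        boolean merged = false;
        for (String arg : args) {
            if (!merged && arg.startsWith(PREFIX)) {
                String existing = arg.substring(PREFIX.length());
                if (existing.isEmpty()) {
                    // Flag present but empty: set it to the experiment.
                    out.add(PREFIX + experiment);
                } else if (Arrays.asList(existing.split(",")).contains(experiment)) {
                    // Experiment already listed: keep the flag unchanged.
                    out.add(arg);
                } else {
                    // Append to the existing comma-separated list.
                    out.add(arg + "," + experiment);
                }
                merged = true;
            } else {
                out.add(arg);
            }
        }
        if (!merged) {
            // No --experiments flag was given: add one.
            out.add(PREFIX + experiment);
        }
        return out.toArray(new String[0]);
    }
}
```

The returned array could be passed to PipelineOptionsFactory.fromArgs(...) in place of the original arguments, for example ReadPathArgs.withExperiment(args, "use_deprecated_read"). The committed offsets of the consumer group can then be compared between a run with the flag and a run without it.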