[
https://issues.apache.org/jira/browse/BEAM-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Luke Cwik updated BEAM-13310:
-----------------------------
Description:
When run using the SDF implementation, the pipeline does not commit offsets,
but when run using the SDF UnboundedSourceWrapper via `use_deprecated_read`,
it does. This implies that the UnboundedSource version commits offsets
correctly but the pure SDF version does not.
Sample code:
{code:java}
final Pipeline p = Pipeline.create(options);
p.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getKafkaBroker())
        .withTopic(options.getTopic())
        .withConsumerConfigUpdates(
            Map.of(
                ConsumerConfig.GROUP_ID_CONFIG,
                options.getConsumerGroup(),
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                "SASL_SSL",
                SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
                SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
        .withoutMetadata());
{code}
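To compare the two read paths with the same pipeline, the experiment can be
toggled from the launch arguments. The following is a minimal sketch, not part
of Beam: `ReadPathArgs` and `withDeprecatedRead` are hypothetical names, and it
assumes the pipeline builds its options from command-line arguments and that
`--experiments` accepts a comma-separated list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Beam) that adds use_deprecated_read to launch arguments. */
public class ReadPathArgs {
  static final String FLAG = "--experiments=";
  static final String EXPERIMENT = "use_deprecated_read";

  /** Returns a copy of args with the experiment merged into the first --experiments flag, or appended. */
  public static String[] withDeprecatedRead(String[] args) {
    List<String> out = new ArrayList<>();
    boolean merged = false;
    for (String arg : args) {
      if (!merged && arg.startsWith(FLAG)) {
        String existing = arg.substring(FLAG.length());
        if (existing.isEmpty()) {
          out.add(FLAG + EXPERIMENT);
        } else if (Arrays.asList(existing.split(",")).contains(EXPERIMENT)) {
          out.add(arg); // experiment already enabled, keep the flag as-is
        } else {
          out.add(arg + "," + EXPERIMENT);
        }
        merged = true;
      } else {
        out.add(arg);
      }
    }
    if (!merged) {
      out.add(FLAG + EXPERIMENT); // no --experiments flag was present
    }
    return out.toArray(new String[0]);
  }

  public static void main(String[] args) {
    // Arguments for the UnboundedSource run; the pure SDF run uses args unchanged.
    System.out.println(String.join(" ", withDeprecatedRead(args)));
  }
}
```

Running the sample pipeline once with the original arguments and once with the
output of `withDeprecatedRead`, against the same consumer group, should show
whether offsets are committed only in the second case.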
was:
When run using SDF the pipeline does not commit offsets but when run using the
SDF UnboundedSourceWrapper it works. This implies that the UnboundedSource
version is able to correctly commit offsets but the pure SDF does not.
Sample code:
{code:java}
final Pipeline p = Pipeline.create(options);
p.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getKafkaBroker())
        .withTopic(options.getTopic())
        .withConsumerConfigUpdates(
            Map.of(
                ConsumerConfig.GROUP_ID_CONFIG,
                options.getConsumerGroup(),
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                "SASL_SSL",
                SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
                SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize()
        .withoutMetadata());
{code}
> KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does
> --------------------------------------------------------------------
>
> Key: BEAM-13310
> URL: https://issues.apache.org/jira/browse/BEAM-13310
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Reporter: Luke Cwik
> Priority: P2