[ 
https://issues.apache.org/jira/browse/BEAM-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-13310:
-----------------------------
    Description: 
When run using SDF, the pipeline does not commit offsets, but when run using 
the SDF UnboundedSourceWrapper via `use_deprecated_read`, it does. This 
implies that the UnboundedSource version commits offsets correctly but the 
pure SDF version does not.

Sample code:
{code:java}
        final Pipeline p = Pipeline.create(options);
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers(options.getKafkaBroker())
                .withTopic(options.getTopic())
                .withConsumerConfigUpdates(
                    Map.of(
                        ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(),
                        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
                        SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
                        SaslConfigs.SASL_JAAS_CONFIG,
                        "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .commitOffsetsInFinalize()
                .withoutMetadata());
{code}
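
For comparing the two read paths, a minimal sketch of switching the same pipeline onto the UnboundedSource path is shown here. It only manipulates the command-line arguments and assumes they are later handed to the pipeline's options parser, and that the parser accepts an extra `--experiments=` flag. The class and method names (`DeprecatedReadArgs`, `withDeprecatedRead`) are hypothetical; only the experiment name `use_deprecated_read` comes from this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeprecatedReadArgs {
    // Experiment name taken from the issue description.
    static final String EXPERIMENT = "use_deprecated_read";
    static final String FLAG_PREFIX = "--experiments=";

    /**
     * Returns a copy of args with --experiments=use_deprecated_read appended,
     * unless an existing --experiments flag already lists that experiment.
     */
    static String[] withDeprecatedRead(String[] args) {
        for (String arg : args) {
            if (arg.startsWith(FLAG_PREFIX)) {
                // An --experiments value may be a comma-separated list.
                String[] values = arg.substring(FLAG_PREFIX.length()).split(",");
                if (Arrays.asList(values).contains(EXPERIMENT)) {
                    return args.clone();
                }
            }
        }
        List<String> out = new ArrayList<>(Arrays.asList(args));
        out.add(FLAG_PREFIX + EXPERIMENT);
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Print the argument list that would be passed on to the pipeline.
        System.out.println(String.join(" ", withDeprecatedRead(args)));
    }
}
```

Running the sample once with the original arguments and once with the arguments returned by `withDeprecatedRead` should make it possible to compare the committed offsets of the consumer group under both read paths.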


  was:
When run using SDF the pipeline does not commit offsets but when run using the 
SDF UnboundedSourceWrapper it works. This implies that the UnboundedSource 
version is able to correctly commit offsets but the pure SDF does not.

Sample code:
{code:java}
        final Pipeline p = Pipeline.create(options);
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers(options.getKafkaBroker())
                .withTopic(options.getTopic())
                .withConsumerConfigUpdates(
                    Map.of(
                        ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(),
                        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
                        SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
                        SaslConfigs.SASL_JAAS_CONFIG,
                        "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .commitOffsetsInFinalize()
                .withoutMetadata());
{code}



> KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does
> --------------------------------------------------------------------
>
>                 Key: BEAM-13310
>                 URL: https://issues.apache.org/jira/browse/BEAM-13310
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Luke Cwik
>            Priority: P2
>
> When run using SDF, the pipeline does not commit offsets, but when run 
> using the SDF UnboundedSourceWrapper via `use_deprecated_read`, it does. 
> This implies that the UnboundedSource version commits offsets correctly 
> but the pure SDF version does not.
> Sample code:
> {code:java}
>         final Pipeline p = Pipeline.create(options);
>         p.apply(
>             KafkaIO.<Long, String>read()
>                 .withBootstrapServers(options.getKafkaBroker())
>                 .withTopic(options.getTopic())
>                 .withConsumerConfigUpdates(
>                     Map.of(
>                         ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(),
>                         CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
>                         SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
>                         SaslConfigs.SASL_JAAS_CONFIG,
>                         "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
>                 .withKeyDeserializer(LongDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .commitOffsetsInFinalize()
>                 .withoutMetadata());
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
