Luke Cwik created BEAM-13310:
--------------------------------

             Summary: KafkaIO SDF does not commit offsets but KafkaIO 
UnboundedSource does
                 Key: BEAM-13310
                 URL: https://issues.apache.org/jira/browse/BEAM-13310
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
            Reporter: Luke Cwik


When run using SDF the pipeline does not commit offsets but when run using the 
SDF UnboundedSourceWrapper it works. This implies that the UnboundedSource 
version is able to correctly commit offsets but the pure SDF does not.

Sample code:
{code:java}
        final Pipeline p = Pipeline.create(options);

        p.apply(

            KafkaIO.<Long, String>read()

                .withBootstrapServers(options.getKafkaBroker())

                .withTopic(options.getTopic())

                .withConsumerConfigUpdates(

                    Map.of(

                        ConsumerConfig.GROUP_ID_CONFIG, 
options.getConsumerGroup(),

                        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_SSL",

                        SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",

                        SaslConfigs.SASL_JAAS_CONFIG, 
"com.sun.security.auth.module.GssLoginModule required initiate=true;"))

                .withKeyDeserializer(LongDeserializer.class)

                .withValueDeserializer(StringDeserializer.class)

                .commitOffsetsInFinalize()

                .withoutMetadata());
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to