[ 
https://issues.apache.org/jira/browse/BEAM-12880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416446#comment-17416446
 ] 

Logeshwaran commented on BEAM-12880:
------------------------------------

Hi Alexey Romanenko,

Please refer the below apache beam dataflow code.

 PCollection<KafkaRecord<String, GenericRecord>> kafkaMessages = pipeline
.apply("ReadFromKafka", KafkaIO.<String, GenericRecord>read()
                .withConsumerConfigUpdates(ImmutableMap.of(
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
options.getConsumerAutoResetConfig(), 
                                //ConsumerConfig.GROUP_ID_CONFIG, 
options.getConsumerGroupID(),
                                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false))
                        
                .withConsumerFactoryFn(new 
ConsumerFactoryFn(sslConfig,options.getProjectID()))
                .withBootstrapServers(bootStrapURLs)
                
.withTopics(Arrays.asList(options.getInputTopics().split(commaSeperator)))
                .withKeyDeserializer(StringDeserializer.class)                  
        
                
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of("https://xxxx:8081";,
 "<Topic Name>-value", 1, null))
                );


Error log : Error message from worker: 
org.apache.kafka.common.errors.GroupAuthorizationException:
Not authorized to access group: 
initialOffset_offset_consumer_1179967555_kafka-connectivity-test

> KafkaIO Connector-updating/overwriting consumer groupid with random prefix
> --------------------------------------------------------------------------
>
>                 Key: BEAM-12880
>                 URL: https://issues.apache.org/jira/browse/BEAM-12880
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.27.0
>            Reporter: Logeshwaran
>            Priority: P2
>
> Apache beam version: 2.27
> Connector: KafkaIO
> cloud service : GCP Dataflow
> language : JAVA11
> we are trying to read the avro messages from confluent kafka topic using 
> dataflow service as a consumer. (Using KafkaIO Connector)
> While trying to access the schema registry using provided (schema url, 
> subject, version, ssl configuration(keystore,truststore..etc))  details , we 
> are getting the below error.
>  
> {color:#ef6950}Error message from worker: 
> org.apache.kafka.common.errors.GroupAuthorizationException:{color}
>  {color:#ef6950}Not authorized to access group: 
> initialOffset_offset_consumer_1179967555_kafka-connectivity-test{color}
>  
> Expected Result : Consumer groupid should not change also should able to 
> connect kafka consumer.
>  
> Actual Result: 
> Though the provided groupid was : kafka-connectivity-test, some how it is 
> changing the value to  
> initialOffset_offset_consumer_1179967555_kafka-connectivity-test.
>  
> PFA related code snippets. 
>  
> !image-2021-09-14-17-34-21-672.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to