[
https://issues.apache.org/jira/browse/BEAM-12880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416446#comment-17416446
]
Logeshwaran commented on BEAM-12880:
------------------------------------
Hi Alexey Romanenko,
Please refer the below apache beam dataflow code.
PCollection<KafkaRecord<String, GenericRecord>> kafkaMessages = pipeline
.apply("ReadFromKafka", KafkaIO.<String, GenericRecord>read()
.withConsumerConfigUpdates(ImmutableMap.of(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
options.getConsumerAutoResetConfig(),
//ConsumerConfig.GROUP_ID_CONFIG,
options.getConsumerGroupID(),
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false))
.withConsumerFactoryFn(new
ConsumerFactoryFn(sslConfig,options.getProjectID()))
.withBootstrapServers(bootStrapURLs)
.withTopics(Arrays.asList(options.getInputTopics().split(commaSeperator)))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of("https://xxxx:8081",
"<Topic Name>-value", 1, null))
);
Error log : Error message from worker:
org.apache.kafka.common.errors.GroupAuthorizationException:
Not authorized to access group:
initialOffset_offset_consumer_1179967555_kafka-connectivity-test
> KafkaIO Connector-updating/overwriting consumer groupid with random prefix
> --------------------------------------------------------------------------
>
> Key: BEAM-12880
> URL: https://issues.apache.org/jira/browse/BEAM-12880
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.27.0
> Reporter: Logeshwaran
> Priority: P2
>
> Apache beam version: 2.27
> Connector: KafkaIO
> cloud service : GCP Dataflow
> language : JAVA11
> we are trying to read the avro messages from confluent kafka topic using
> dataflow service as a consumer. (Using KafkaIO Connector)
> While trying to access the schema registry using provided (schema url,
> subject, version, ssl configuration(keystore,truststore..etc)) details , we
> are getting the below error.
>
> {color:#ef6950}Error message from worker:
> org.apache.kafka.common.errors.GroupAuthorizationException:{color}
> {color:#ef6950}Not authorized to access group:
> initialOffset_offset_consumer_1179967555_kafka-connectivity-test{color}
>
> Expected Result : Consumer groupid should not change also should able to
> connect kafka consumer.
>
> Actual Result:
> Though the provided groupid was : kafka-connectivity-test, some how it is
> changing the value to
> initialOffset_offset_consumer_1179967555_kafka-connectivity-test.
>
> PFA related code snippets.
>
> !image-2021-09-14-17-34-21-672.png!
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)