brigen opened a new issue, #16181:
URL: https://github.com/apache/pulsar/issues/16181

   I get the following exception when using the kafka-source connector with a schema registry that requires basic auth:
   com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
   
   #### Steps to reproduce
   Start the connector against a Kafka broker, with KafkaAvroDeserializer as the value deserializer so that it fetches schemas from the schema registry. Protect the schema registry with basic auth.
   Use className: org.apache.pulsar.io.kafka.KafkaBytesSource
   Config file of the pulsar-io-kafka connector:
       bootstrapServers: "localhost:9092"
       topic: abcV1
       groupId: "group.V1.consumer-1"
       valueDeserializationClass: io.confluent.kafka.serializers.KafkaAvroDeserializer
       consumerConfigProperties:
         client.id: "avs.sit"
         security.protocol: "SASL_SSL"
         sasl.mechanism: "PLAIN"
         acks: "all"
         client.dns.lookup: use_all_dns_ips
         sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"password\";"
         value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
         basic.auth.credentials.source: USER_INFO
         specific.avro.reader: true
         schema.registry.url: https://your-schema-registry-url
         basic.auth.user.info: USER:PASSWORD
   
   
   #### System configuration
   **Pulsar version**: 2.9.2
   
   I already dug into the source code, and the problem appears to be here, in KafkaBytesSource.java, line 110:
   
   private void initSchemaCache(Properties props) {
       KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
       List<String> urls = config.getSchemaRegistryUrls();
       int maxSchemaObject = config.getMaxSchemasPerSubject();
       SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
       log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}", urls, maxSchemaObject);
       schemaCache = new AvroSchemaCache(schemaRegistryClient);
   }
   
   CachedSchemaRegistryClient is constructed here without the connector properties, so it creates a RestService that has no properties either. The default RestService never calls configure, so even though we pass basic.auth.user.info, it never reaches the RestService, and the call to the schema registry is made without an Authorization header.
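
   To make the missing piece concrete, here is a minimal, standard-library-only sketch of the Authorization header that a registry behind basic auth would expect, derived from the two properties in the config above. The class and method names (BasicAuthHeaderSketch, authorizationHeader) are hypothetical and are not part of Pulsar or the Confluent client; this only illustrates the expected header and is not the connector's actual code path.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Pulsar or Confluent.
public class BasicAuthHeaderSketch {

    // Returns the HTTP Basic Authorization header value for the given
    // properties, or empty when USER_INFO credentials are not configured.
    static Optional<String> authorizationHeader(Properties props) {
        String source = props.getProperty("basic.auth.credentials.source");
        String userInfo = props.getProperty("basic.auth.user.info");
        if (!"USER_INFO".equals(source) || userInfo == null) {
            return Optional.empty();
        }
        // HTTP Basic auth: "Basic " + base64("user:password")
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("basic.auth.credentials.source", "USER_INFO");
        props.setProperty("basic.auth.user.info", "USER:PASSWORD");
        System.out.println(
                authorizationHeader(props).orElse("<no Authorization header>"));
    }
}
```

   If the Confluent client version in use provides a CachedSchemaRegistryClient constructor that also accepts the configuration map, passing config.originals() as that extra argument in initSchemaCache might be one way to get these settings to the RestService. That is an assumption to check against the library version, not a confirmed fix.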

