brigen opened a new issue, #16181:
URL: https://github.com/apache/pulsar/issues/16181
Got com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.RuntimeException:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Unauthorized; error code: 401
when trying to use kafka-source connector with schema registry that requires
basic auth
#### Steps to reproduce
Start the connector that connects to a kafka-broker, but with
KafkaAvroDeserializer which goes to schema-registry. Put a basic auth to the
schema-registry
use className: org.apache.pulsar.io.kafka.KafkaBytesSource
config file of pulsar-io-kafka connector:
bootstrapServers: "localhost:9092"
topic: abcV1
groupId: "group.V1.consumer-1"
valueDeserializationClass:
io.confluent.kafka.serializers.KafkaAvroDeserializer
consumerConfigProperties:
client.id: "avs.sit"
security.protocol: "SASL_SSL"
sasl.mechanism: "PLAIN"
acks: "all"
client.dns.lookup: use_all_dns_ips
sasl.jaas.config:
"org.apache.kafka.common.security.plain.PlainLoginModule required
username=\"USER\" password=\"password\";"
value.deserializer:
io.confluent.kafka.serializers.KafkaAvroDeserializer
basic.auth.credentials.source: USER_INFO
specific.avro.reader: true
schema.registry.url: https://your-schmea-registry-url
basic.auth.user.info: USER:PASSOWRD
#### System configuration
**Pulsar version**: 2.9.2
Already digged the source code and saw that the problem might be here:
in the KafkaBytesSource.java line 110
private void initSchemaCache(Properties props) {
KafkaAvroDeserializerConfig config = new
KafkaAvroDeserializerConfig(props);
List<String> urls = config.getSchemaRegistryUrls();
int maxSchemaObject = config.getMaxSchemasPerSubject();
SchemaRegistryClient schemaRegistryClient = new
CachedSchemaRegistryClient(urls, maxSchemaObject);
log.info("initializing SchemaRegistry Client, urls:{},
maxSchemasPerSubject: {}", urls, maxSchemaObject);
schemaCache = new AvroSchemaCache(schemaRegistryClient);
}
The cached schema registry is called without properties basically, and it
creates a RestService without properties too,
and the default RestService never calls configure, so even thou we are
passing basic.auth.user.info they never get passed to RestService so the call
towards schema-registry is made without an Authorization header
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]