[ https://issues.apache.org/jira/browse/BEAM-11851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299822#comment-17299822 ]

Tom Underhill edited comment on BEAM-11851 at 3/11/21, 7:23 PM:
----------------------------------------------------------------

 

Hi Alexey,

I'm having the same issue trying to connect to a Confluent Schema Registry (CSR) that 
requires basic authentication. In my case it's Confluent Cloud (CC) plus the Confluent 
Cloud Schema Registry. Connecting to CC itself is no problem via 
.withConsumerConfigUpdates(props), but it's unclear where, or whether, you can pass a 
separate API key and secret for the CC Schema Registry. Any pointers on how to make this 
work, or does this need a PR? (A standalone check of the registry credentials is 
sketched after the stack trace below.)

Many Thanks!
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.util.HashMap;
import java.util.Map;

public class DataflowJob {
    public static void main(String[] args) {

        Map<String, Object> props = new HashMap<>();
        props.put("auto.offset.reset", "earliest");
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("sasl.mechanism", "PLAIN");
        props.put("request.timeout.ms", 20000);
        props.put("retry.backoff.ms", 500);
        
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"<CC API KEY>\" password=\"<CC API SECRET>\";");
        props.put("security.protocol", "SASL_SSL");

        props.put("schema.registry.url", "<CC-SR-URL>");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<CC-SR-KEY:CC-SR-SECRET>");

        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("<CC-URL>")
                .withTopic("orders")
                .withConsumerConfigUpdates(props)
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(
                        ConfluentSchemaRegistryDeserializerProvider.of(
                                "<CC-SR-URL>", "orders-value"))
                .withoutMetadata()
        );

        p.run().waitUntilFinish();
    }
}
{code}
This fails with:
{code:java}
Exception in thread "main" java.lang.RuntimeException: Unable to get latest schema metadata for subject: orders-value
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:119)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:110)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:106)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1192)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1096)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:486)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:481)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1256)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1245)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:481)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:176)
	at DataflowJob.main(DataflowJob.java:30)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:515)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:507)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:116)
	... 15 more
{code}
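
To rule out a bad key/secret pair, the registry can be queried directly with the Confluent client. Below is a minimal standalone check (my own sketch, not Beam code): the placeholders match the pipeline above, and the client built from the URL alone mirrors what the stack trace shows getCoder doing.
{code:java}
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class SchemaRegistryAuthCheck {
    public static void main(String[] args) throws Exception {
        String url = "<CC-SR-URL>";      // same placeholder as in the pipeline above
        String subject = "orders-value";

        // A client configured with basic auth, the way the value deserializer
        // presumably is at runtime: this call should succeed.
        Map<String, Object> srConf = new HashMap<>();
        srConf.put("basic.auth.credentials.source", "USER_INFO");
        srConf.put("basic.auth.user.info", "<CC-SR-KEY:CC-SR-SECRET>");
        CachedSchemaRegistryClient withAuth = new CachedSchemaRegistryClient(url, 100, srConf);
        System.out.println(withAuth.getLatestSchemaMetadata(subject).getSchema());

        // A client built from the URL alone, with no way to attach credentials.
        // This reproduces the 401 in the stack trace above.
        CachedSchemaRegistryClient noAuth = new CachedSchemaRegistryClient(url, 100);
        noAuth.getLatestSchemaMetadata(subject); // expect RestClientException: 401
    }
}
{code}
If the first call succeeds and the second gets the 401, the credentials themselves are fine and the problem is that KafkaIO never hands them to the schema-registry client it builds internally.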
 



> ConfluentSchemaRegistryProvider fails when authentication is required
> ---------------------------------------------------------------------
>
>                 Key: BEAM-11851
>                 URL: https://issues.apache.org/jira/browse/BEAM-11851
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.27.0
>            Reporter: Maxim Ivanov
>            Priority: P2
>
> When configuring a KafkaIO read with ConfluentSchemaRegistryDeserializerProvider as 
> the value deserializer and the `basic.auth.credentials.source=USER_INFO` and 
> `basic.auth.user.info=user:password` properties set, it fails to start the 
> pipeline
> {code:java}
> [error] java.lang.RuntimeException: Unable to get latest schema metadata for subject: identity_users_v2-value
> [error]   at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:119)
> [error]   at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:110)
> [error]   at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:106)
> [error]   at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1147)
> [error]   at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1052)
> [error]   at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:481)
> [error]   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
> [error]   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
> [error]   at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
> [error]   at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:191)
> {code}
>  
> I suspect this is because it reconfigures the KafkaAvroDeserializer only in 
> getDeserializer, but doesn't do so in getCoder.
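
If that suspicion is right, the fix would be to let callers hand schema-registry client configs to the provider, so the client used by getCoder can authenticate as well. The sketch below shows a hypothetical shape for such an API; the four-argument of(...) overload does not exist in the affected version and is purely illustrative.
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;

public class ProposedUsageSketch {
    public static void main(String[] args) {
        // Registry credentials kept separate from the Kafka consumer properties,
        // so that both getDeserializer() and getCoder() can reach them.
        Map<String, Object> srConf = new HashMap<>();
        srConf.put("basic.auth.credentials.source", "USER_INFO");
        srConf.put("basic.auth.user.info", "<SR-KEY:SR-SECRET>");

        // HYPOTHETICAL overload (url, subject, version, client configs): this
        // does not exist in Beam 2.27 and is exactly what the issue asks for.
        ConfluentSchemaRegistryDeserializerProvider.of(
                "<SR-URL>", "identity_users_v2-value", null, srConf);
    }
}
{code}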


