[https://issues.apache.org/jira/browse/BEAM-11851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299822#comment-17299822]
Tom Underhill edited comment on BEAM-11851 at 3/11/21, 7:23 PM:
----------------------------------------------------------------
Hi Alexey,
I'm hitting the same issue when connecting to a schema registry that requires basic
authentication, in my case Confluent Cloud plus Confluent Cloud Schema Registry.
Connecting to the Kafka cluster itself works fine via
.withConsumerConfigUpdates(props), but it's unclear where (or whether) you can pass
separate API keys and secrets for the schema registry. Any pointers on how to make
this work, or does this need a PR? I've put a possible workaround sketch after the
stack trace below.
Many thanks!
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.util.HashMap;
import java.util.Map;

public class DataflowJob {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put("auto.offset.reset", "earliest");
    props.put("ssl.endpoint.identification.algorithm", "https");
    props.put("sasl.mechanism", "PLAIN");
    props.put("request.timeout.ms", 20000);
    props.put("retry.backoff.ms", 500);
    // Credentials for the Kafka brokers (Confluent Cloud cluster API key/secret).
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"<CC API KEY>\" password=\"<CC API SECRET>\";");
    props.put("security.protocol", "SASL_SSL");
    // Credentials for the schema registry -- these never seem to reach the
    // registry client that ConfluentSchemaRegistryDeserializerProvider creates.
    props.put("schema.registry.url", "<CC-SR-URL>");
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "<CC-SR-KEY:CC-SR-SECRET>");

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);
    p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("<CC-URL>")
        .withTopic("orders")
        .withConsumerConfigUpdates(props)
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of("<CC-SR-URL>", "orders-value"))
        .withoutMetadata());
    p.run().waitUntilFinish();
  }
}
{code}
fails with
{code:java}
Exception in thread "main" java.lang.RuntimeException: Unable to get latest schema metadata for subject: orders-value
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:119)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:110)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:106)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1192)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1096)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:486)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:481)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1256)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1245)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:481)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:176)
	at DataflowJob.main(DataflowJob.java:30)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:515)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:507)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:116)
	... 15 more
{code}
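In the meantime, a possible workaround I'm considering (an untested sketch on my
side): pre-fetch the schema with a CachedSchemaRegistryClient that actually carries
the basic-auth configs, then bypass the provider with withValueDeserializerAndCoder.
The raw Class cast is needed because KafkaAvroDeserializer implements
Deserializer<Object>; placeholders are the same as above.
{code:java}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import java.util.Map;

// Inside main(), after building "props" as above; main needs "throws Exception"
// because getLatestSchemaMetadata declares checked exceptions.

// A registry client that actually carries the basic-auth credentials.
CachedSchemaRegistryClient srClient = new CachedSchemaRegistryClient(
    "<CC-SR-URL>", 100,
    Map.of(
        "basic.auth.credentials.source", "USER_INFO",
        "basic.auth.user.info", "<CC-SR-KEY:CC-SR-SECRET>"));

// Fetch the writer schema up front instead of letting getCoder() try it
// with an unauthenticated client.
Schema schema = new Schema.Parser()
    .parse(srClient.getLatestSchemaMetadata("orders-value").getSchema());

// KafkaAvroDeserializer is configured from the consumer config at runtime, so
// the schema.registry.url and basic.auth.* entries in "props" reach it; the
// coder is supplied explicitly, so Beam never has to call the registry itself.
p.apply(KafkaIO.<Long, GenericRecord>read()
    .withBootstrapServers("<CC-URL>")
    .withTopic("orders")
    .withConsumerConfigUpdates(props)
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializerAndCoder(
        (Class) KafkaAvroDeserializer.class, AvroCoder.of(schema))
    .withoutMetadata());
{code}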
> ConfluentSchemaRegistryProvider fails when authentication is required
> ---------------------------------------------------------------------
>
> Key: BEAM-11851
> URL: https://issues.apache.org/jira/browse/BEAM-11851
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.27.0
> Reporter: Maxim Ivanov
> Priority: P2
>
> When configuring KafkaIO.read with ConfluentSchemaRegistryDeserializerProvider as
> the value deserializer and the properties
> `basic.auth.credentials.source=USER_INFO` and
> `basic.auth.user.info=user:password`, it fails to start the pipeline:
> {code:java}
> [error] java.lang.RuntimeException: Unable to get latest schema metadata for subject: identity_users_v2-value
> [error] 	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:119)
> [error] 	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:110)
> [error] 	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:106)
> [error] 	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1147)
> [error] 	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1052)
> [error] 	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:481)
> [error] 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
> [error] 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
> [error] 	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
> [error] 	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:191)
> {code}
>
> I suspect this is because the provider configures the KafkaAvroDeserializer (and
> its registry client) only in getDeserializer, but does not do so in getCoder.
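That suspicion is consistent with the 401 in both traces: a CachedSchemaRegistryClient
built from the URL alone sends no credentials, while the same lookup succeeds once
the basic-auth configs are passed to the client's constructor. A minimal
self-contained check (hypothetical class name; placeholder URL and credentials, with
the subject from the report):
{code:java}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import java.util.Map;

public class RegistryAuthCheck {
  public static void main(String[] args) throws Exception {
    String url = "<CC-SR-URL>";
    String subject = "identity_users_v2-value";

    // Like the provider's internal client: URL only, no credentials. Against a
    // registry requiring basic auth this throws
    // RestClientException: Unauthorized; error code: 401.
    CachedSchemaRegistryClient anonymous = new CachedSchemaRegistryClient(url, 10);
    try {
      anonymous.getLatestSchemaMetadata(subject);
    } catch (Exception e) {
      System.out.println("unauthenticated lookup failed: " + e.getMessage());
    }

    // The same lookup with the basic-auth configs applied to the client.
    CachedSchemaRegistryClient authenticated = new CachedSchemaRegistryClient(
        url, 10,
        Map.of(
            "basic.auth.credentials.source", "USER_INFO",
            "basic.auth.user.info", "user:password"));
    System.out.println(authenticated.getLatestSchemaMetadata(subject).getSchema());
  }
}
{code}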