[ 
https://issues.apache.org/jira/browse/FLINK-22763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samuel Fiddis updated FLINK-22763:
----------------------------------
    Description: 
In PyFlink, attempting to connect to an avro-confluent Kafka stream where the 
Confluent Schema Registry requires authorization does not work.

 Table API definition:
{code:java}
ddl_kafka_avro_confluent_source = f"""
  CREATE TABLE gtt_records(
    **table columns**
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'topic.avro-v1',
    'properties.bootstrap.servers' = 'pkc-ldvj1.ap-southeast-2.aws.confluent.cloud:9092',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{KAFKA_API_KEY}" password="{KAFKA_API_SECRET}";',
    'properties.basic.auth.credentials.source' = 'USER_INFO',
    'properties.basic.auth.user.info' = '{SR_API_KEY}:{SR_API_SECRET}',

    'key.format' = 'avro-confluent',
    'key.avro-confluent.schema-registry.url' = 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
    'key.fields' = '**key fields**',

    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
    'value.fields-include' = 'ALL',

    'key.avro-confluent.schema-registry.subject' = 'data.google-travel-time.avro-v1-key',
    'value.avro-confluent.schema-registry.subject' = 'data.google-travel-time.avro-v1-value'
) """{code}
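Note that the schema-registry credentials above are passed in the URL's userinfo component (`https://key:secret@host`). As a minimal standard-library sketch (placeholder values, not the real keys), this is how such a URL decomposes:

```python
from urllib.parse import urlsplit

# Placeholder credentials, same shape as in the DDL above.
url = "https://SR_API_KEY:SR_API_SECRET@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud"
parts = urlsplit(url)

print(parts.username)  # SR_API_KEY
print(parts.password)  # SR_API_SECRET
print(parts.hostname)  # psrc-lzvd0.ap-southeast-2.aws.confluent.cloud
```

The expectation is that the registry client would honour this userinfo pair (or the `properties.basic.auth.*` options); the 401 below suggests neither reaches the underlying Schema Registry client.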
 

Attempting to run a job with this table as a source results in a 401 error for 
the Confluent Schema Registry:

 
{code:java}
2021-05-19 04:50:21,830 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, gtt_records]], fields=[unique, direction, window_ts, road_number, link_number, carriageway, version_no, window_local_date, window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) -> Sink: Sink(table=[default_catalog.default_database.kafka_messages], fields=[unique, direction, window_ts, road_number, link_number, carriageway, version_no, window_local_date, window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) (1/1)#0 (7eddc3a42dbcad0fc313bb6bdfa2c922) switched from RUNNING to FAILED with failure cause: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:119)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.io.IOException: Could not find schema with id 100001 in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:73)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 9 more
Caused by: org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
    at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
    ... 11 more
{code}
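For context, the `Unauthorized; error code: 401` indicates the shaded registry client never attaches HTTP Basic credentials to its request. As a hedged illustration (standard library only, placeholder key/secret), this is the `Authorization` header value the registry would expect when `basic.auth.credentials.source` is `USER_INFO`:

```python
import base64

def basic_auth_header(user_info: str) -> str:
    """Build the HTTP Basic Authorization header value for a
    '<key>:<secret>' pair, as supplied via
    'properties.basic.auth.user.info' in the table DDL."""
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    return f"Basic {token}"

# Placeholder credentials, not real keys:
print(basic_auth_header("SR_API_KEY:SR_API_SECRET"))
# -> Basic U1JfQVBJX0tFWTpTUl9BUElfU0VDUkVU
```

The avro-confluent format would need to forward such credentials (however they are configured) to the Schema Registry client; as reported here, it does not.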

  was:
In PyFlink, attempting to connect to a avro-confluent kafka stream where the 
Confluent Schema Registry requires authorization does not work.

 Table API definition:
{code:java}
ddl_kafka_avro_confluent_source = f""" 
  CREATE TABLE gtt_records( 
    **table columsn**
  ) WITH ( 
    'connector' = 'kafka', 
    'topic' = 'topic.avro-v1', 
    'properties.bootstrap.servers' = 
'pkc-ldvj1.ap-southeast-2.aws.confluent.cloud:9092', 
    'properties.security.protocol' = 'SASL_SSL', 
    'properties.sasl.mechanism' = 'PLAIN', 
    'properties.sasl.jaas.config' = 
'org.apache.kafka.common.security.plain.PlainLoginModule required 
username="{KAFKA_API_KEY}" password="{KAFKA_API_SECRET";', 
    'properties.basic.auth.credentials.source' = 'USER_INFO',   
    'properties.basic.auth.user.info' = '{SR_API_KEY}:{SR_API_SECRET}',

    'key.format' = 'avro-confluent', 
    'key.avro-confluent.schema-registry.url' = 
'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
 
    'key.fields' = '**key fields**', 

    'value.format' = 'avro-confluent', 
    'value.avro-confluent.schema-registry.url' = 
'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
 
    'value.fields-include' = 'ALL', 

    'key.avro-confluent.schema-registry.subject' = 
'data.google-travel-time.avro-v1-key', 
    'value.avro-confluent.schema-registry.subject' = 
'data.google-travel-time.avro-v1-value' 
) """{code}
 

Attempting to run a job with this table as a source results in a 401 error for 
the Confluent Schema Registery:

 
{code:java}
2021-05-19 04:50:21,830 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, gtt_records]], fields=[unique, direction, window_ts, 
road_number, link_number, carriageway, version_no, window_local_date, 
window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) 
-> Sink: Sink(table=[default_catalog.default_database.kafka_messages], 
fields=[unique, direction, window_ts, road_number, link_number, carriageway, 
version_no, window_local_date, window_local_time, poll_ts, duration, 
traffic_duration, distance, link_length]) (1/1)#0 
(7eddc3a42dbcad0fc313bb6bdfa2c922) switched from RUNNING to FAILED with failure 
cause: java.io.IOException: Failed to deserialize Avro record.    at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:119)
    at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)Caused
 by: java.io.IOException: Could not find schema with id 100001 in registry    
at 
org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
    at 
org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:73)
    at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 9 moreCaused by: 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
 Unauthorized; error code: 401    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
    at 
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
    at 
org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
    ... 11 more
{code}


> avro-confluent format does not allow for authorization credentials to be 
> supplied to Confluent Schema Registry
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22763
>                 URL: https://issues.apache.org/jira/browse/FLINK-22763
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile), Table SQL / API
>    Affects Versions: 1.13.0
>            Reporter: Samuel Fiddis
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
