[ 
https://issues.apache.org/jira/browse/FLINK-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031782#comment-17031782
 ] 

Seth Wiesman commented on FLINK-15941:
--------------------------------------

[~dwysakowicz] I have been bitten by this in the past. 

One really important point: the CachedSchemaRegistryClient caches schemas by
object identity [1]. This means that if you are using generic records, every
lookup will be a cache miss, since each record carries its own instance of the
schema. Specific records, by contrast, use schemas that are generated as
singletons.
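To make the identity-versus-equality point concrete, here is a dependency-free sketch. `FakeSchema`, `IdentityCachingClient` and `IdentityCacheDemo` are hypothetical stand-ins, not the Avro or Confluent classes. The only assumption carried over from the linked source is that the client-side cache is keyed by object identity, which is modelled here with `java.util.IdentityHashMap`. The registry itself is simulated with an ordinary `HashMap`, so equal schemas always get the same ID.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

class IdentityCacheDemo {
    public static void main(String[] args) {
        IdentityCachingClient client = new IdentityCachingClient();

        // Specific-record style: one singleton schema instance reused for every record.
        FakeSchema singleton = new FakeSchema("{\"type\":\"string\"}");
        for (int i = 0; i < 3; i++) {
            client.register(singleton);
        }
        System.out.println(client.remoteCalls()); // prints 1

        // Generic-record style: every record carries its own, equal schema instance.
        for (int i = 0; i < 3; i++) {
            client.register(new FakeSchema("{\"type\":\"string\"}"));
        }
        System.out.println(client.remoteCalls()); // prints 4
    }
}

// Hypothetical stand-in for an Avro Schema: equal definitions compare equal
// via equals()/hashCode(), but each `new` creates a distinct object.
final class FakeSchema {
    private final String definition;

    FakeSchema(String definition) {
        this.definition = definition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof FakeSchema && ((FakeSchema) o).definition.equals(definition);
    }

    @Override
    public int hashCode() {
        return definition.hashCode();
    }
}

// Hypothetical client whose local cache is keyed by object identity (the
// assumption taken from the linked source); each miss costs one simulated HTTP call.
final class IdentityCachingClient {
    private final Map<FakeSchema, Integer> localCache = new IdentityHashMap<>();
    // Simulated registry: equal schemas always map to the same id.
    private final Map<FakeSchema, Integer> registry = new HashMap<>();
    private int remoteCalls = 0;

    int register(FakeSchema schema) {
        Integer id = localCache.get(schema);
        if (id == null) {
            remoteCalls++; // cache miss: pretend we made an HTTP request
            id = registry.get(schema);
            if (id == null) {
                id = registry.size() + 1;
                registry.put(schema, id);
            }
            localCache.put(schema, id);
        }
        return id;
    }

    int remoteCalls() {
        return remoteCalls;
    }
}
```

The registry hands back the same ID in both phases. Only the number of round trips differs, because the identity-keyed cache cannot recognise an equal but distinct schema instance.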

This is a long-winded way of saying that the registry coder will make an HTTP
call for every record if generic records are used. If this is happening for
specific records too, then it is definitely a bug.


[1] https://github.com/confluentinc/schema-registry/blob/bb307ec7636b2602849a14b6d16a98cd5e34c2e9/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L144

> ConfluentSchemaRegistryCoder should not perform HTTP requests for all request
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-15941
>                 URL: https://issues.apache.org/jira/browse/FLINK-15941
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> ConfluentSchemaRegistryCoder should cache ids of schemas that it has already 
> seen.
> I think it should be as simple as changing
> {code}
>       @Override
>       public void writeSchema(Schema schema, OutputStream out) throws IOException {
>               try {
>                       int registeredId = schemaRegistryClient.register(subject, schema);
>                       out.write(CONFLUENT_MAGIC_BYTE);
>                       byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>                       out.write(schemaIdBytes);
>               } catch (RestClientException e) {
>                       throw new IOException("Could not register schema in registry", e);
>               }
>       }
> {code}
> to
> {code}
>       @Override
>       public void writeSchema(Schema schema, OutputStream out) throws IOException {
>               try {
>                       int registeredId = schemaRegistryClient.getId(subject, schema);
>                       out.write(CONFLUENT_MAGIC_BYTE);
>                       byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>                       out.write(schemaIdBytes);
>               } catch (RestClientException e) {
>                       throw new IOException("Could not register schema in registry", e);
>               }
>       }
> {code}
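The issue asks the coder to cache the IDs of schemas it has already seen. Because the client's own cache is keyed by identity, one way to get that benefit even for generic records is to keep a separate cache keyed by schema value. The following is a hedged sketch, not Flink's or Confluent's code. `SchemaIdLookup`, `CachingSchemaHeaderWriter` and `HeaderWriterDemo` are hypothetical names. The schema is represented by its JSON text to keep the example dependency-free. The magic byte value `0` followed by a 4-byte big-endian ID is assumed to match the Confluent wire format, mirroring the quoted `writeSchema`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

class HeaderWriterDemo {
    public static void main(String[] args) throws IOException {
        int[] lookups = {0};
        CachingSchemaHeaderWriter writer = new CachingSchemaHeaderWriter(
                "my-topic-value", // hypothetical subject name
                (subject, schemaJson) -> {
                    lookups[0]++; // counts simulated registry round trips
                    return 42;    // hypothetical id returned by the registry
                });
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < 3; i++) {
            // A fresh String per record, like a fresh schema instance per generic record.
            writer.writeSchema(new String("{\"type\":\"string\"}"), out);
        }
        System.out.println(lookups[0]);               // prints 1
        System.out.println(out.toByteArray().length); // prints 15 (3 x (1 magic byte + 4 id bytes))
    }
}

// Hypothetical abstraction over the registry call (for example register or getId).
interface SchemaIdLookup {
    int lookup(String subject, String schemaJson) throws IOException;
}

// Writes the magic byte and the 4-byte schema id, asking the registry only the
// first time a given schema (compared by value, not identity) is seen.
// Assumes single-threaded use, so a plain HashMap suffices.
final class CachingSchemaHeaderWriter {
    private static final byte CONFLUENT_MAGIC_BYTE = 0; // assumed wire-format value

    private final String subject;
    private final SchemaIdLookup registry;
    private final Map<String, Integer> idCache = new HashMap<>();

    CachingSchemaHeaderWriter(String subject, SchemaIdLookup registry) {
        this.subject = subject;
        this.registry = registry;
    }

    void writeSchema(String schemaJson, OutputStream out) throws IOException {
        Integer id = idCache.get(schemaJson);
        if (id == null) {
            id = registry.lookup(subject, schemaJson); // only on first sight of this schema
            idCache.put(schemaJson, id);
        }
        out.write(CONFLUENT_MAGIC_BYTE);
        out.write(ByteBuffer.allocate(4).putInt(id).array()); // big-endian by default
    }
}
```

With real Avro types, `org.apache.avro.Schema` overrides `equals` and `hashCode`, so keying a `HashMap` on the `Schema` itself should behave similarly. The cost is computing a structural hash for each new schema instance instead of an HTTP round trip.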



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
