[
https://issues.apache.org/jira/browse/FLINK-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033121#comment-17033121
]
Stephen Whelan commented on FLINK-15941:
----------------------------------------
This issue appears to be in _*RegistryAvroSerializationSchema*.serialize()._ It
instantiates a new {{SchemaCoder}} on every call rather than reusing the
{{schemaCoder}} instance the class already holds.
{code:java}
public byte[] serialize(T object) {
    checkAvroInitialized();

    if (object == null) {
        return null;
    } else {
        try {
            Encoder encoder = getEncoder();
            // get() creates a new instance of {@link SchemaCoder} on every call
            schemaCoderProvider.get().writeSchema(getSchema(), getOutputStream());
            getDatumWriter().write(object, encoder);
            encoder.flush();
            byte[] bytes = getOutputStream().toByteArray();
            getOutputStream().reset();
            return bytes;
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
        }
    }
}
{code}
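As a minimal sketch of why this matters (hypothetical {{Coder}}/{{Supplier}} stand-ins, not Flink's actual SchemaCoder or SchemaCoderProvider classes): calling the provider inside serialize() allocates a fresh coder per record, so any per-instance state is thrown away each time, whereas obtaining the coder once reuses it.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for Flink's SchemaCoder / SchemaCoderProvider.
public class ProviderPerCallDemo {
    static int instancesCreated = 0;

    static class Coder {
        Coder() {
            instancesCreated++; // each new coder starts with its own empty state
        }
    }

    public static void main(String[] args) {
        Supplier<Coder> provider = Coder::new;

        // The fix: obtain the coder once and reuse it for every record.
        Coder reused = provider.get();

        // The bug: calling the provider per record allocates a fresh coder
        // each time, so nothing it caches survives the call.
        for (int i = 0; i < 3; i++) {
            provider.get();
        }
        System.out.println(instancesCreated); // prints 4: one reused + three per-call
    }
}
```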
The following change solves the issue.
{code:java}
public byte[] serialize(T object) {
    checkAvroInitialized();

    if (object == null) {
        return null;
    } else {
        try {
            Encoder encoder = getEncoder();
            schemaCoder.writeSchema(getSchema(), getOutputStream());
            getDatumWriter().write(object, encoder);
            encoder.flush();
            byte[] bytes = getOutputStream().toByteArray();
            getOutputStream().reset();
            return bytes;
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
        }
    }
}
{code}
I have tested this fix with both SpecificRecord and GenericRecord. With debug
logging enabled on our Schema Registry, I see the registration requests at job
start-up and none afterwards, so the schema id is being retrieved from the
cache. For GenericRecord, we use a Row object converted to a GenericRecord.
We have had this patch running in our production environment for a few days now
with no performance issues.
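The one-request-per-distinct-schema behaviour described above can be sketched like this (hypothetical class and a plain HashMap standing in for the registry client's cache, not Flink's ConfluentSchemaRegistryCoder):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the expected caching behaviour: the first write for a given
// schema costs one registry round trip; later writes reuse the cached id.
public class SchemaIdCacheDemo {
    static int registryRequests = 0;
    static final Map<String, Integer> idCache = new HashMap<>();

    static int idFor(String schemaJson) {
        Integer id = idCache.get(schemaJson);
        if (id == null) {
            registryRequests++;     // stands in for one HTTP request
            id = registryRequests;  // pretend registry-assigned id
            idCache.put(schemaJson, id);
        }
        return id;
    }

    public static void main(String[] args) {
        idFor("user-v1");
        idFor("user-v1");   // served from cache
        idFor("user-v1");   // served from cache
        idFor("order-v1");  // new schema, one more request
        System.out.println(registryRequests); // prints 2
    }
}
```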
> ConfluentSchemaRegistryCoder should not perform HTTP requests for all request
> -----------------------------------------------------------------------------
>
> Key: FLINK-15941
> URL: https://issues.apache.org/jira/browse/FLINK-15941
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Dawid Wysakowicz
> Priority: Major
>
> ConfluentSchemaRegistryCoder should cache ids of schemas that it has already
> seen.
> I think it should be as simple as changing
> {code}
> @Override
> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>     try {
>         int registeredId = schemaRegistryClient.register(subject, schema);
>         out.write(CONFLUENT_MAGIC_BYTE);
>         byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>         out.write(schemaIdBytes);
>     } catch (RestClientException e) {
>         throw new IOException("Could not register schema in registry", e);
>     }
> }
> {code}
> to
> {code}
> @Override
> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>     try {
>         int registeredId = schemaRegistryClient.getId(subject, schema);
>         out.write(CONFLUENT_MAGIC_BYTE);
>         byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>         out.write(schemaIdBytes);
>     } catch (RestClientException e) {
>         throw new IOException("Could not register schema in registry", e);
>     }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)