Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5995#discussion_r188386920
--- Diff:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
---
@@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
// Utilities
// ------------------------------------------------------------------------
+ private static boolean isGenericRecord(Class<?> type) {
+ return !SpecificRecord.class.isAssignableFrom(type) &&
+ GenericRecord.class.isAssignableFrom(type);
+ }
+
@Override
public TypeSerializer<T> duplicate() {
- return new AvroSerializer<>(type);
+ if (schemaString != null) {
+ return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
--- End diff --
I didn't think this through well. I thought we needed to create a deep copy of
the schema, but since it is stateless I think we can just pass the existing
schema to the new serializer instead of re-parsing schemaString. My mistake.
Correct me if I am wrong.
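
To illustrate the point about sharing the schema rather than deep-copying it, here is a minimal sketch. It does not use the real Flink or Avro classes: `SketchSchema` and `SketchSerializer` are hypothetical stand-ins, and the sketch assumes the schema object is never mutated after it has been parsed.

```java
import java.util.Objects;

// Hypothetical stand-ins only; these are NOT the Flink or Avro classes.
final class DuplicateSketch {
    public static void main(String[] args) {
        SketchSerializer original = new SketchSerializer("{\"type\":\"string\"}");
        SketchSerializer copy = original.duplicate();

        // The duplicate is a distinct serializer instance...
        System.out.println(copy != original);
        // ...but both instances refer to the same schema object.
        System.out.println(copy.schema() == original.schema());
    }
}

// Stand-in for a schema that is assumed to be read-only once parsed.
final class SketchSchema {
    private final String json;

    private SketchSchema(String json) {
        this.json = json;
    }

    // Stand-in for parsing; a real parser would do far more work here.
    static SketchSchema parse(String json) {
        return new SketchSchema(Objects.requireNonNull(json));
    }

    String json() {
        return json;
    }
}

// Stand-in for a serializer that holds a schema.
final class SketchSerializer {
    private final SketchSchema schema;

    SketchSerializer(String schemaString) {
        this(SketchSchema.parse(schemaString));
    }

    private SketchSerializer(SketchSchema schema) {
        this.schema = schema;
    }

    SketchSchema schema() {
        return schema;
    }

    // Passes the existing schema reference along instead of re-parsing the string.
    SketchSerializer duplicate() {
        return new SketchSerializer(schema);
    }
}
```

Under that assumption, duplicating the serializer only needs a new serializer instance; the parse step is skipped and the schema reference is shared.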
---