This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 1196887 [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory 1196887 is described below commit 1196887407c9ef3fad7901f1c592c022b890b227 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Fri Aug 30 10:42:58 2019 +0200 [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory Before, we were using SpecificData.getSchema(type) which was not working for types that were generated using Avrohugger (for Scala) because the SCHEMA was generated in the companion object. Now we use a method that must be available on all SpecificRecord(s). We still use the old method as a fallback if we cannot instantiate or call getSchema() on the instance. --- .../flink/formats/avro/typeutils/AvroFactory.java | 36 +++++++++++++++++++++- .../avro/typeutils/AvroSerializerSnapshot.java | 2 +- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java index 9a8bdcb..4916a90 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java @@ -37,6 +37,8 @@ import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Optional; @@ -50,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal final class AvroFactory<T> { + private static final Logger LOG = LoggerFactory.getLogger(AvroFactory.class); + private final DataOutputEncoder encoder = new DataOutputEncoder(); private final DataInputDecoder decoder = new DataInputDecoder(); @@ -94,7 +98,7 @@ final class AvroFactory<T> { @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) { SpecificData specificData = new SpecificData(cl); - Schema newSchema = specificData.getSchema(type); + Schema newSchema = extractAvroSpecificSchema(type, specificData); return new AvroFactory<>( specificData, @@ -130,6 +134,36 @@ final class AvroFactory<T> { ); } + /** + * Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this either via {@link + * SpecificData} or by instantiating a record and extracting the schema from the instance. + */ + static <T> Schema extractAvroSpecificSchema( + Class<T> type, + SpecificData specificData) { + Optional<Schema> newSchemaOptional = tryExtractAvroSchemaViaInstance(type); + return newSchemaOptional.orElseGet(() -> specificData.getSchema(type)); + } + + /** + * Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this by creating an + * instance of the class using the zero-argument constructor and calling {@link + * SpecificRecord#getSchema()} on it. + */ + @SuppressWarnings("unchecked") + private static Optional<Schema> tryExtractAvroSchemaViaInstance(Class<?> type) { + try { + SpecificRecord instance = (SpecificRecord) type.newInstance(); + return Optional.ofNullable(instance.getSchema()); + } catch (InstantiationException | IllegalAccessException e) { + LOG.warn( + "Could not extract schema from Avro-generated SpecificRecord class {}: {}.", + type, + e); + return Optional.empty(); + } + } + private AvroFactory( GenericData avroData, Schema schema, diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java index a5f47d1..316a162 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java @@ -186,7 +186,7 @@ public class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> { } if (isSpecificRecord(runtimeType)) { SpecificData d = new SpecificData(cl); - return d.getSchema(runtimeType); + return AvroFactory.extractAvroSpecificSchema(runtimeType, d); } ReflectData d = new ReflectData(cl); return d.getSchema(runtimeType);