This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1196887  [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory
1196887 is described below

commit 1196887407c9ef3fad7901f1c592c022b890b227
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Fri Aug 30 10:42:58 2019 +0200

    [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory
    
    Before, we were using SpecificData.getSchema(type) which was not working
    for types that were generated using Avrohugger (for Scala) because
    the SCHEMA was generated in the companion object. Now we use a method
    that must be available on all SpecificRecord(s).
    
    We still use the old method as a fallback if we cannot instantiate or
    call getSchema() on the instance.
---
 .../flink/formats/avro/typeutils/AvroFactory.java  | 36 +++++++++++++++++++++-
 .../avro/typeutils/AvroSerializerSnapshot.java     |  2 +-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
index 9a8bdcb..4916a90 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -37,6 +37,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
@@ -50,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 final class AvroFactory<T> {
 
+       private static final Logger LOG = LoggerFactory.getLogger(AvroFactory.class);
+
        private final DataOutputEncoder encoder = new DataOutputEncoder();
        private final DataInputDecoder decoder = new DataInputDecoder();
 
@@ -94,7 +98,7 @@ final class AvroFactory<T> {
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
        private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
                SpecificData specificData = new SpecificData(cl);
-               Schema newSchema = specificData.getSchema(type);
+               Schema newSchema = extractAvroSpecificSchema(type, specificData);
 
                return new AvroFactory<>(
                        specificData,
@@ -130,6 +134,36 @@ final class AvroFactory<T> {
                );
        }
 
+       /**
+        * Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this either via {@link
+        * SpecificData} or by instantiating a record and extracting the schema from the instance.
+        */
+       static <T> Schema extractAvroSpecificSchema(
+                       Class<T> type,
+                       SpecificData specificData) {
+               Optional<Schema> newSchemaOptional = tryExtractAvroSchemaViaInstance(type);
+               return newSchemaOptional.orElseGet(() -> specificData.getSchema(type));
+       }
+
+       /**
+        * Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this by creating an
+        * instance of the class using the zero-argument constructor and calling {@link
+        * SpecificRecord#getSchema()} on it.
+        */
+       @SuppressWarnings("unchecked")
+       private static Optional<Schema> tryExtractAvroSchemaViaInstance(Class<?> type) {
+               try {
+                       SpecificRecord instance = (SpecificRecord) type.newInstance();
+                       return Optional.ofNullable(instance.getSchema());
+               } catch (InstantiationException | IllegalAccessException e) {
+                       LOG.warn(
+                                       "Could not extract schema from Avro-generated SpecificRecord class {}: {}.",
+                                       type,
+                                       e);
+                       return Optional.empty();
+               }
+       }
+
        private AvroFactory(
                GenericData avroData,
                Schema schema,
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
index a5f47d1..316a162 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
@@ -186,7 +186,7 @@ public class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
                }
                if (isSpecificRecord(runtimeType)) {
                        SpecificData d = new SpecificData(cl);
-                       return d.getSchema(runtimeType);
+                       return AvroFactory.extractAvroSpecificSchema(runtimeType, d);
                }
                ReflectData d = new ReflectData(cl);
                return d.getSchema(runtimeType);

Reply via email to