This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch parquet-1.13.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/parquet-1.13.x by this push:
     new 5b62b4350 PARQUET-2292: Improve default SpecificRecord model selection for Avro{Write,Read}Support (#1091)
5b62b4350 is described below

commit 5b62b4350eb59cd7b069d9e70342ec314288eac4
Author: Claire McGinty <[email protected]>
AuthorDate: Fri May 5 22:55:06 2023 -0400

    PARQUET-2292: Improve default SpecificRecord model selection for Avro{Write,Read}Support (#1091)
    
    This commit contains the following patches:
    
    * PARQUET-2265: Don't set default Model in AvroParquetWriter (#1049)
    - Don't set default Model in AvroParquetWriter
    - Test that data model is parsed from Configuration
    
    * PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field (#1078)
---
 parquet-avro/pom.xml                               |  24 +++
 .../org/apache/parquet/avro/AvroParquetWriter.java |   2 +-
 .../org/apache/parquet/avro/AvroReadSupport.java   |  24 ++-
 .../apache/parquet/avro/AvroRecordConverter.java   |  78 ++++++++
 .../org/apache/parquet/avro/AvroWriteSupport.java  |  24 ++-
 parquet-avro/src/test/avro/logicalType.avsc        |  14 ++
 .../parquet/avro/TestAvroRecordConverter.java      | 202 +++++++++++++++++++++
 .../org/apache/parquet/avro/TestReadWrite.java     |  85 +++++++++
 .../apache/parquet/avro/TestSpecificReadWrite.java |  42 +++++
 pom.xml                                            |   1 +
 10 files changed, 491 insertions(+), 5 deletions(-)
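To illustrate the effect of the change summarized above (a usage sketch only, not part of the patch): an AvroParquetWriter built for a generated SpecificRecord that uses logical types no longer needs an explicit .withDataModel(...) call, because the default model is now reflected from the record class's MODEL$ field instead of falling back to a bare SpecificData.get(). LogicalTypesTest is the class generated from the test schema added below; the example class name and output path are hypothetical.

    import java.io.IOException;
    import java.time.Instant;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class WriteLogicalTypesExample {
      public static void main(String[] args) throws IOException {
        Path path = new Path("/tmp/logical-types.parquet");   // hypothetical location
        try (ParquetWriter<LogicalTypesTest> writer = AvroParquetWriter
            .<LogicalTypesTest>builder(path)
            .withSchema(LogicalTypesTest.SCHEMA$)              // no .withDataModel(...) call
            .build()) {
          // the derived model carries the timestamp-millis conversion, so Instant values round-trip
          writer.write(LogicalTypesTest.newBuilder().setTimestamp(Instant.now()).build());
        }
      }
    }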

diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 52a6f0706..14a24e823 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -104,6 +104,30 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.23.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-core</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito2</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index 9d514673e..94d8167b0 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -160,7 +160,7 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
 
   public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
     private Schema schema = null;
-    private GenericData model = SpecificData.get();
+    private GenericData model = null;
 
     private Builder(Path file) {
       super(file);
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index eca14413a..8f268a145 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Avro implementation of {@link ReadSupport} for avro generic, specific, and
@@ -37,6 +39,8 @@ import org.apache.parquet.schema.MessageType;
  */
 public class AvroReadSupport<T> extends ReadSupport<T> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroReadSupport.class);
+
   public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
   private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
 
@@ -134,7 +138,7 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
       avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
     }
 
-    GenericData model = getDataModel(configuration);
+    GenericData model = getDataModel(configuration, avroSchema);
     String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY);
     if (compatEnabled != null && Boolean.valueOf(compatEnabled)) {
       return newCompatMaterializer(parquetSchema, avroSchema, model);
@@ -149,10 +153,26 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
         parquetSchema, avroSchema, model);
   }
 
-  private GenericData getDataModel(Configuration conf) {
+  private GenericData getDataModel(Configuration conf, Schema schema) {
     if (model != null) {
       return model;
     }
+
+    if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
+      GenericData modelForSchema;
+      try {
+        modelForSchema = AvroRecordConverter.getModelForSchema(schema);
+      } catch (Exception e) {
+        LOG.warn(String.format("Failed to derive data model for Avro schema %s. Parquet will use default " +
+          "SpecificData model for reading from source.", schema), e);
+        modelForSchema = null;
+      }
+
+      if (modelForSchema != null) {
+        return modelForSchema;
+      }
+    }
+
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
     return ReflectionUtils.newInstance(suppClass, conf).get();
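One nuance of the fallback order added above, shown here as an illustrative sketch rather than part of the patch: the schema-derived model is consulted only when no AvroDataSupplier is configured, so an explicitly configured supplier still takes precedence over MODEL$ reflection. ReflectDataSupplier is an existing supplier in this package; SomeRecord and path are placeholders.

    Configuration conf = new Configuration();
    // With a supplier set, getDataModel(conf, schema) skips the schema-derived model entirely
    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);

    try (ParquetReader<SomeRecord> reader = AvroParquetReader
        .<SomeRecord>builder(path)          // path: placeholder org.apache.hadoop.fs.Path
        .withConf(conf)
        .build()) {
      SomeRecord first = reader.read();
    }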
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index fee7df727..cc17df582 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -30,12 +30,15 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.LinkedHashMap;
+import java.util.Objects;
+
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Conversion;
 import org.apache.avro.LogicalType;
@@ -57,6 +60,8 @@ import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
 import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
@@ -73,6 +78,8 @@ import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
  */
 class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRecordConverter.class);
+
   private static final String STRINGABLE_PROP = "avro.java.string";
   private static final String JAVA_CLASS_PROP = "java-class";
   private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
@@ -169,6 +176,77 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+    final Class<?> clazz;
+
+    if (schema != null && (schema.getType() == Schema.Type.RECORD || schema.getType() == Schema.Type.UNION)) {
+      clazz = SpecificData.get().getClass(schema);
+    } else {
+      return null;
+    }
+
+    // If clazz == null, the underlying Avro class for the schema is not on the classpath
+    if (clazz == null) {
+      return null;
+    }
+
+    final SpecificData model;
+    try {
+      final Field modelField = clazz.getDeclaredField("MODEL$");
+      modelField.setAccessible(true);
+
+      model = (SpecificData) modelField.get(null);
+    } catch (NoSuchFieldException e) {
+      LOG.info(String.format(
+        "Generated Avro class %s did not contain a MODEL$ field. Parquet will 
use default SpecificData model for " +
+          "reading and writing.", clazz));
+      return null;
+    } catch (IllegalAccessException e) {
+      LOG.warn(String.format(
+        "Field `MODEL$` in class %s was inaccessible. Parquet will use default 
SpecificData model for " +
+          "reading and writing.", clazz), e);
+      return null;
+    }
+
+    final String avroVersion = getRuntimeAvroVersion();
+    // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by default
+    if (avroVersion != null && (avroVersion.startsWith("1.8.") || avroVersion.startsWith("1.7."))) {
+      final Field conversionsField;
+      try {
+        conversionsField = clazz.getDeclaredField("conversions");
+      } catch (NoSuchFieldException e) {
+        // Avro classes without logical types (denoted by the "conversions" field) can be returned as-is
+        return model;
+      }
+
+      final Conversion<?>[] conversions;
+      try {
+        conversionsField.setAccessible(true);
+        conversions = (Conversion<?>[]) conversionsField.get(null);
+      } catch (IllegalAccessException e) {
+        LOG.warn(String.format("Field `conversions` in class %s was 
inaccessible. Parquet will use default " +
+          "SpecificData model for reading and writing.", clazz));
+        return null;
+      }
+
+      for (int i = 0; i < conversions.length; i++) {
+        if (conversions[i] != null) {
+          model.addLogicalTypeConversion(conversions[i]);
+        }
+      }
+    }
+
+    return model;
+  }
+
+  static String getRuntimeAvroVersion() {
+    return Schema.Parser.class.getPackage().getImplementationVersion();
+  }
+
   // this was taken from Avro's ReflectData
   private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
                                                        boolean excludeJava) {
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 9a7ef6c90..564e74539 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -43,6 +43,8 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Avro implementation of {@link WriteSupport} for generic, specific, and
@@ -51,6 +53,8 @@ import org.apache.parquet.Preconditions;
  */
 public class AvroWriteSupport<T> extends WriteSupport<T> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroWriteSupport.class);
+
   public static final String AVRO_DATA_SUPPLIER = "parquet.avro.write.data.supplier";
 
   public static void setAvroDataSupplier(
@@ -131,7 +135,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     }
 
     if (model == null) {
-      this.model = getDataModel(configuration);
+      this.model = getDataModel(configuration, rootAvroSchema);
     }
 
     boolean writeOldListStructure = configuration.getBoolean(
@@ -400,7 +404,23 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     return Binary.fromCharSequence(value.toString());
   }
 
-  private static GenericData getDataModel(Configuration conf) {
+  private static GenericData getDataModel(Configuration conf, Schema schema) {
+    if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
+      GenericData modelForSchema;
+      try {
+        modelForSchema = AvroRecordConverter.getModelForSchema(schema);
+      } catch (Exception e) {
+        LOG.warn(String.format("Failed to derive data model for Avro schema %s. Parquet will use default " +
+          "SpecificData model for writing to sink.", schema), e);
+        modelForSchema = null;
+      }
+
+
+      if (modelForSchema != null) {
+        return modelForSchema;
+      }
+    }
+
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
     return ReflectionUtils.newInstance(suppClass, conf).get();
diff --git a/parquet-avro/src/test/avro/logicalType.avsc b/parquet-avro/src/test/avro/logicalType.avsc
new file mode 100644
index 000000000..fbec10a8d
--- /dev/null
+++ b/parquet-avro/src/test/avro/logicalType.avsc
@@ -0,0 +1,14 @@
+{
+    "type": "record",
+    "name": "LogicalTypesTest",
+    "namespace": "org.apache.parquet.avro",
+    "doc": "Record for testing logical types",
+    "fields": [
+        {
+          "name": "timestamp",
+          "type": {
+            "type": "long", "logicalType": "timestamp-millis"
+          }
+        }
+    ]
+}
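The new tests rely on the SpecificRecord class generated from this schema (assumed here to come from the module's existing Avro code generation over src/test/avro); with java.time conversions its timestamp-millis field surfaces as java.time.Instant, which is how the tests below construct records:

    LogicalTypesTest record = LogicalTypesTest.newBuilder()
        .setTimestamp(Instant.now())   // timestamp-millis <-> java.time.Instant
        .build();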
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
new file mode 100644
index 000000000..8339285ba
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.specific.SpecificData;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AvroRecordConverter.class)
+public class TestAvroRecordConverter {
+
+  @Before
+  public void setup() {
+    // Default to calling real methods unless overridden in specific test
+    PowerMockito.mockStatic(AvroRecordConverter.class, CALLS_REAL_METHODS);
+  }
+
+  @Test
+  public void testModelForSpecificRecordWithLogicalTypes() {
+    SpecificData model = AvroRecordConverter.getModelForSchema(LogicalTypesTest.SCHEMA$);
+
+    // Test that model is generated correctly
+    Conversion<?> conversion = model.getConversionByClass(Instant.class);
+    assertEquals(TimeConversions.TimestampMillisConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testModelForSpecificRecordWithoutLogicalTypes() {
+    SpecificData model = AvroRecordConverter.getModelForSchema(Car.SCHEMA$);
+
+    assertTrue(model.getConversions().isEmpty());
+  }
+
+  @Test
+  public void testModelForGenericRecord() {
+    SpecificData model = AvroRecordConverter.getModelForSchema(
+      Schema.createRecord(
+        "someSchema",
+        "doc",
+        "some.namespace",
+        false,
+        Lists.newArrayList(new Schema.Field("strField", Schema.create(Schema.Type.STRING)))));
+
+    // There is no class "someSchema" on the classpath, so should return null
+    assertNull(model);
+  }
+
+  // Test logical type support for older Avro versions
+  @Test
+  public void testGetModelAvro1_7() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro17GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testGetModelAvro1_8() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.8.2");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro18GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testGetModelAvro1_9() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.9.2");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro19GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testGetModelAvro1_10() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.10.2");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro110GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  // Test Avro record class stubs, generated using different versions of the Avro compiler
+  public abstract static class Avro110GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro110GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    static {
+      MODEL$.addLogicalTypeConversion(new org.apache.avro.Conversions.DecimalConversion());
+    }
+  }
+
+  public abstract static class Avro19GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro19GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    static {
+      MODEL$.addLogicalTypeConversion(new org.apache.avro.Conversions.DecimalConversion());
+    }
+  }
+
+  public abstract static class Avro18GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro18GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
+
+    private static final org.apache.avro.Conversion<?>[] conversions =
+      new org.apache.avro.Conversion<?>[] {
+        DECIMAL_CONVERSION,
+        null
+      };
+
+    @Override
+    public org.apache.avro.Conversion<?> getConversion(int field) {
+      return conversions[field];
+    }
+  }
+
+  public abstract static class Avro17GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro17GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
+
+    private static final org.apache.avro.Conversion<?>[] conversions =
+      new org.apache.avro.Conversion<?>[] {
+        DECIMAL_CONVERSION,
+        null
+      };
+
+    @Override
+    public org.apache.avro.Conversion<?> getConversion(int field) {
+      return conversions[field];
+    }
+  }
+}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 66f166cfd..6484ab4a6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -27,6 +27,8 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,7 +37,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
+import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -775,6 +780,86 @@ public class TestReadWrite {
     }
   }
 
+  public static class CustomDataModel implements AvroDataSupplier {
+    @Override
+    public GenericData get() {
+      GenericData genericData = new GenericData();
+      genericData.addLogicalTypeConversion(new Conversion<LocalDate>() {
+        private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+        @Override
+        public Class<LocalDate> getConvertedType() {
+          return LocalDate.class;
+        }
+
+        @Override
+        public String getLogicalTypeName() {
+          return "date";
+        }
+
+        public LocalDate fromInt(Integer localDate, Schema schema, LogicalType type) {
+          return LocalDate.parse(String.valueOf(localDate), dateTimeFormatter);
+        }
+
+        public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+          return Integer.parseInt(dateTimeFormatter.format(date));
+        }
+      });
+      return genericData;
+    }
+  }
+  @Test
+  public void testParsesDataModelFromConf() throws Exception {
+    Schema datetimeSchema = Schema.createRecord("myrecord", null, null, false);
+    Schema date = LogicalTypes.date().addToSchema(
+      Schema.create(Schema.Type.INT));
+    datetimeSchema.setFields(Collections.singletonList(
+      new Schema.Field("date", date, null, null)));
+
+    File file = temp.newFile("datetime.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+    List<GenericRecord> expected = Lists.newArrayList();
+
+    Configuration conf = new Configuration();
+    AvroWriteSupport.setAvroDataSupplier(conf, CustomDataModel.class);
+
+    // .withDataModel is not set; AvroWriteSupport should parse it from the Configuration
+    try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
+      .<GenericRecord>builder(path)
+      .withConf(conf)
+      .withSchema(datetimeSchema)
+      .build()) {
+
+      GenericRecordBuilder builder = new GenericRecordBuilder(datetimeSchema);
+      for (int i = 0; i < 100; i += 1) {
+        builder.set("date", LocalDate.now().minusDays(i));
+
+        GenericRecord rec = builder.build();
+        expected.add(rec);
+        writer.write(builder.build());
+      }
+    }
+    List<GenericRecord> records = Lists.newArrayList();
+
+    AvroReadSupport.setAvroDataSupplier(conf, CustomDataModel.class);
+
+    try(ParquetReader<GenericRecord> reader = AvroParquetReader
+      .<GenericRecord>builder(path)
+      .disableCompatibility()
+      .withConf(conf)
+      .build()) {
+      GenericRecord rec;
+      while ((rec = reader.read()) != null) {
+        records.add(rec);
+      }
+    }
+
+    Assert.assertTrue("date field should be a LocalDate instance",
+      records.get(0).get("date") instanceof LocalDate);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
   private File createTempFile() throws IOException {
     File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
     tmp.deleteOnExit();
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index 46e2f2c23..49ed27b1b 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -30,14 +30,19 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
+import org.apache.parquet.avro.LogicalTypesTest;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -237,6 +242,43 @@ public class TestSpecificReadWrite {
     }
   }
 
+  @Test
+  public void testParsesSpecificDataModel() throws IOException {
+    // SpecificRecord contains a logical type and will fail to decode unless its SpecificData model is parsed
+    List<LogicalTypesTest> records = IntStream
+      .range(0, 25)
+      .mapToObj(i -> LogicalTypesTest.newBuilder().setTimestamp(Instant.now()).build())
+      .collect(Collectors.toList());
+
+    // Test that SpecificData model is parsed in AvroParquetWriter
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    try(
+      ParquetWriter<LogicalTypesTest> writer = AvroParquetWriter.<LogicalTypesTest>builder(path)
+        .withSchema(LogicalTypesTest.SCHEMA$)
+        .withConf(new Configuration(false))
+        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+        .build()
+    ) {
+      for (LogicalTypesTest record : records) {
+        writer.write(record);
+      }
+    }
+
+    // Test that SpecificData model is parsed in AvroParquetReader
+    final List<LogicalTypesTest> output = new ArrayList<>();
+    try (ParquetReader<org.apache.parquet.avro.LogicalTypesTest> reader = new AvroParquetReader<>(testConf, path)) {
+      for (LogicalTypesTest record = reader.read(); record != null; record = reader.read()) {
+        output.add(record);
+      }
+    }
+
+    assertEquals(records, output);
+  }
+
   private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
     return writeCarsToParquetFile(num, compression, enableDictionary, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
diff --git a/pom.xml b/pom.xml
index 3f2911b4c..eb568e097 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
     <guava.version>27.0.1-jre</guava.version>
     <brotli-codec.version>0.1.1</brotli-codec.version>
     <mockito.version>1.10.19</mockito.version>
+    <powermock.version>2.0.2</powermock.version>
     <net.openhft.version>0.9</net.openhft.version>
     <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
     <yetus.audience-annotations.version>0.13.0</yetus.audience-annotations.version>
