This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch parquet-1.13.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/parquet-1.13.x by this push:
new 5b62b4350 PARQUET-2292: Improve default SpecificRecord model selection
for Avro{Write,Read}Support (#1091)
5b62b4350 is described below
commit 5b62b4350eb59cd7b069d9e70342ec314288eac4
Author: Claire McGinty <[email protected]>
AuthorDate: Fri May 5 22:55:06 2023 -0400
PARQUET-2292: Improve default SpecificRecord model selection for
Avro{Write,Read}Support (#1091)
This commit contains the following patches:
* PARQUET-2265: Don't set default Model in AvroParquetWriter (#1049)
- Don't set default Model in AvroParquetWriter
- Test that data model is parsed from Configuration
* PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field
(#1078)
---
parquet-avro/pom.xml | 24 +++
.../org/apache/parquet/avro/AvroParquetWriter.java | 2 +-
.../org/apache/parquet/avro/AvroReadSupport.java | 24 ++-
.../apache/parquet/avro/AvroRecordConverter.java | 78 ++++++++
.../org/apache/parquet/avro/AvroWriteSupport.java | 24 ++-
parquet-avro/src/test/avro/logicalType.avsc | 14 ++
.../parquet/avro/TestAvroRecordConverter.java | 202 +++++++++++++++++++++
.../org/apache/parquet/avro/TestReadWrite.java | 85 +++++++++
.../apache/parquet/avro/TestSpecificReadWrite.java | 42 +++++
pom.xml | 1 +
10 files changed, 491 insertions(+), 5 deletions(-)
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 52a6f0706..14a24e823 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -104,6 +104,30 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.23.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index 9d514673e..94d8167b0 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -160,7 +160,7 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
private Schema schema = null;
- private GenericData model = SpecificData.get();
+ private GenericData model = null;
private Builder(Path file) {
super(file);
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index eca14413a..8f268a145 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Avro implementation of {@link ReadSupport} for avro generic, specific, and
@@ -37,6 +39,8 @@ import org.apache.parquet.schema.MessageType;
*/
public class AvroReadSupport<T> extends ReadSupport<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AvroReadSupport.class);
+
public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
@@ -134,7 +138,7 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
avroSchema = new
AvroSchemaConverter(configuration).convert(parquetSchema);
}
- GenericData model = getDataModel(configuration);
+ GenericData model = getDataModel(configuration, avroSchema);
String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY);
if (compatEnabled != null && Boolean.valueOf(compatEnabled)) {
return newCompatMaterializer(parquetSchema, avroSchema, model);
@@ -149,10 +153,26 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
parquetSchema, avroSchema, model);
}
- private GenericData getDataModel(Configuration conf) {
+ private GenericData getDataModel(Configuration conf, Schema schema) {
if (model != null) {
return model;
}
+
+ if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
+ GenericData modelForSchema;
+ try {
+ modelForSchema = AvroRecordConverter.getModelForSchema(schema);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to derive data model for Avro schema
%s. Parquet will use default " +
+ "SpecificData model for reading from source.", schema), e);
+ modelForSchema = null;
+ }
+
+ if (modelForSchema != null) {
+ return modelForSchema;
+ }
+ }
+
Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class,
AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index fee7df727..cc17df582 100644
---
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -30,12 +30,15 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
+import java.util.Objects;
+
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
@@ -57,6 +60,8 @@ import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
import static
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
@@ -73,6 +78,8 @@ import static
org.apache.parquet.schema.Type.Repetition.REQUIRED;
*/
class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
+ private static final Logger LOG =
LoggerFactory.getLogger(AvroRecordConverter.class);
+
private static final String STRINGABLE_PROP = "avro.java.string";
private static final String JAVA_CLASS_PROP = "java-class";
private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
@@ -169,6 +176,77 @@ class AvroRecordConverter<T> extends
AvroConverters.AvroGroupConverter {
}
}
+ /**
+ * Returns the specific data model for a given SpecificRecord schema by
reflecting the underlying
+ * Avro class's `MODEL$` field, or Null if the class is not on the classpath
or reflection fails.
+ */
+ static SpecificData getModelForSchema(Schema schema) {
+ final Class<?> clazz;
+
+ if (schema != null && (schema.getType() == Schema.Type.RECORD ||
schema.getType() == Schema.Type.UNION)) {
+ clazz = SpecificData.get().getClass(schema);
+ } else {
+ return null;
+ }
+
+ // If clazz == null, the underlying Avro class for the schema is not on
the classpath
+ if (clazz == null) {
+ return null;
+ }
+
+ final SpecificData model;
+ try {
+ final Field modelField = clazz.getDeclaredField("MODEL$");
+ modelField.setAccessible(true);
+
+ model = (SpecificData) modelField.get(null);
+ } catch (NoSuchFieldException e) {
+ LOG.info(String.format(
+ "Generated Avro class %s did not contain a MODEL$ field. Parquet will
use default SpecificData model for " +
+ "reading and writing.", clazz));
+ return null;
+ } catch (IllegalAccessException e) {
+ LOG.warn(String.format(
+ "Field `MODEL$` in class %s was inaccessible. Parquet will use default
SpecificData model for " +
+ "reading and writing.", clazz), e);
+ return null;
+ }
+
+ final String avroVersion = getRuntimeAvroVersion();
+ // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by
default
+ if (avroVersion != null && (avroVersion.startsWith("1.8.") ||
avroVersion.startsWith("1.7."))) {
+ final Field conversionsField;
+ try {
+ conversionsField = clazz.getDeclaredField("conversions");
+ } catch (NoSuchFieldException e) {
+ // Avro classes without logical types (denoted by the "conversions"
field) can be returned as-is
+ return model;
+ }
+
+ final Conversion<?>[] conversions;
+ try {
+ conversionsField.setAccessible(true);
+ conversions = (Conversion<?>[]) conversionsField.get(null);
+ } catch (IllegalAccessException e) {
+ LOG.warn(String.format("Field `conversions` in class %s was
inaccessible. Parquet will use default " +
+ "SpecificData model for reading and writing.", clazz));
+ return null;
+ }
+
+ for (int i = 0; i < conversions.length; i++) {
+ if (conversions[i] != null) {
+ model.addLogicalTypeConversion(conversions[i]);
+ }
+ }
+ }
+
+ return model;
+ }
+
+ static String getRuntimeAvroVersion() {
+ return Schema.Parser.class.getPackage().getImplementationVersion();
+ }
+
// this was taken from Avro's ReflectData
private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
boolean excludeJava) {
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 9a7ef6c90..564e74539 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -43,6 +43,8 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Avro implementation of {@link WriteSupport} for generic, specific, and
@@ -51,6 +53,8 @@ import org.apache.parquet.Preconditions;
*/
public class AvroWriteSupport<T> extends WriteSupport<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AvroWriteSupport.class);
+
public static final String AVRO_DATA_SUPPLIER =
"parquet.avro.write.data.supplier";
public static void setAvroDataSupplier(
@@ -131,7 +135,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
}
if (model == null) {
- this.model = getDataModel(configuration);
+ this.model = getDataModel(configuration, rootAvroSchema);
}
boolean writeOldListStructure = configuration.getBoolean(
@@ -400,7 +404,23 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
return Binary.fromCharSequence(value.toString());
}
- private static GenericData getDataModel(Configuration conf) {
+ private static GenericData getDataModel(Configuration conf, Schema schema) {
+ if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
+ GenericData modelForSchema;
+ try {
+ modelForSchema = AvroRecordConverter.getModelForSchema(schema);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to derive data model for Avro schema
%s. Parquet will use default " +
+ "SpecificData model for writing to sink.", schema), e);
+ modelForSchema = null;
+ }
+
+
+ if (modelForSchema != null) {
+ return modelForSchema;
+ }
+ }
+
Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class,
AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
diff --git a/parquet-avro/src/test/avro/logicalType.avsc
b/parquet-avro/src/test/avro/logicalType.avsc
new file mode 100644
index 000000000..fbec10a8d
--- /dev/null
+++ b/parquet-avro/src/test/avro/logicalType.avsc
@@ -0,0 +1,14 @@
+{
+ "type": "record",
+ "name": "LogicalTypesTest",
+ "namespace": "org.apache.parquet.avro",
+ "doc": "Record for testing logical types",
+ "fields": [
+ {
+ "name": "timestamp",
+ "type": {
+ "type": "long", "logicalType": "timestamp-millis"
+ }
+ }
+ ]
+}
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
new file mode 100644
index 000000000..8339285ba
--- /dev/null
+++
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.specific.SpecificData;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AvroRecordConverter.class)
+public class TestAvroRecordConverter {
+
+ @Before
+ public void setup() {
+ // Default to calling real methods unless overridden in specific test
+ PowerMockito.mockStatic(AvroRecordConverter.class, CALLS_REAL_METHODS);
+ }
+
+ @Test
+ public void testModelForSpecificRecordWithLogicalTypes() {
+ SpecificData model =
AvroRecordConverter.getModelForSchema(LogicalTypesTest.SCHEMA$);
+
+ // Test that model is generated correctly
+ Conversion<?> conversion = model.getConversionByClass(Instant.class);
+ assertEquals(TimeConversions.TimestampMillisConversion.class,
conversion.getClass());
+ }
+
+ @Test
+ public void testModelForSpecificRecordWithoutLogicalTypes() {
+ SpecificData model = AvroRecordConverter.getModelForSchema(Car.SCHEMA$);
+
+ assertTrue(model.getConversions().isEmpty());
+ }
+
+ @Test
+ public void testModelForGenericRecord() {
+ SpecificData model = AvroRecordConverter.getModelForSchema(
+ Schema.createRecord(
+ "someSchema",
+ "doc",
+ "some.namespace",
+ false,
+ Lists.newArrayList(new Schema.Field("strField",
Schema.create(Schema.Type.STRING)))));
+
+ // There is no class "someSchema" on the classpath, so should return null
+ assertNull(model);
+ }
+
+ // Test logical type support for older Avro versions
+ @Test
+ public void testGetModelAvro1_7() {
+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
+
+ // Test that model is generated correctly
+ final SpecificData model =
AvroRecordConverter.getModelForSchema(Avro17GeneratedClass.SCHEMA$);
+ Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+ assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+ }
+
+ @Test
+ public void testGetModelAvro1_8() {
+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.8.2");
+
+ // Test that model is generated correctly
+ final SpecificData model =
AvroRecordConverter.getModelForSchema(Avro18GeneratedClass.SCHEMA$);
+ Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+ assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+ }
+
+ @Test
+ public void testGetModelAvro1_9() {
+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.9.2");
+
+ // Test that model is generated correctly
+ final SpecificData model =
AvroRecordConverter.getModelForSchema(Avro19GeneratedClass.SCHEMA$);
+ Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+ assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+ }
+
+ @Test
+ public void testGetModelAvro1_10() {
+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.10.2");
+
+ // Test that model is generated correctly
+ final SpecificData model =
AvroRecordConverter.getModelForSchema(Avro110GeneratedClass.SCHEMA$);
+ Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+ assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+ }
+
+ // Test Avro record class stubs, generated using different versions of the
Avro compiler
+ public abstract static class Avro110GeneratedClass extends
org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = 5558880508010468207L;
+ public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro110GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA$;
+ }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ static {
+ MODEL$.addLogicalTypeConversion(new
org.apache.avro.Conversions.DecimalConversion());
+ }
+ }
+
+ public abstract static class Avro19GeneratedClass extends
org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = 5558880508010468207L;
+ public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro19GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA$;
+ }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ static {
+ MODEL$.addLogicalTypeConversion(new
org.apache.avro.Conversions.DecimalConversion());
+ }
+ }
+
+ public abstract static class Avro18GeneratedClass extends
org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = 5558880508010468207L;
+ public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro18GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA$;
+ }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ protected static final org.apache.avro.Conversions.DecimalConversion
DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
+
+ private static final org.apache.avro.Conversion<?>[] conversions =
+ new org.apache.avro.Conversion<?>[] {
+ DECIMAL_CONVERSION,
+ null
+ };
+
+ @Override
+ public org.apache.avro.Conversion<?> getConversion(int field) {
+ return conversions[field];
+ }
+ }
+
+ public abstract static class Avro17GeneratedClass extends
org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = 5558880508010468207L;
+ public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro17GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+ public static org.apache.avro.Schema getClassSchema() {
+ return SCHEMA$;
+ }
+
+ private static SpecificData MODEL$ = new SpecificData();
+
+ protected static final org.apache.avro.Conversions.DecimalConversion
DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
+
+ private static final org.apache.avro.Conversion<?>[] conversions =
+ new org.apache.avro.Conversion<?>[] {
+ DECIMAL_CONVERSION,
+ null
+ };
+
+ @Override
+ public org.apache.avro.Conversion<?> getConversion(int field) {
+ return conversions[field];
+ }
+ }
+}
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 66f166cfd..6484ab4a6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -27,6 +27,8 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -35,7 +37,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+
+import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -775,6 +780,86 @@ public class TestReadWrite {
}
}
+ public static class CustomDataModel implements AvroDataSupplier {
+ @Override
+ public GenericData get() {
+ GenericData genericData = new GenericData();
+ genericData.addLogicalTypeConversion(new Conversion<LocalDate>() {
+ private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMdd");
+
+ @Override
+ public Class<LocalDate> getConvertedType() {
+ return LocalDate.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "date";
+ }
+
+ public LocalDate fromInt(Integer localDate, Schema schema, LogicalType
type) {
+ return LocalDate.parse(String.valueOf(localDate), dateTimeFormatter);
+ }
+
+ public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+ return Integer.parseInt(dateTimeFormatter.format(date));
+ }
+ });
+ return genericData;
+ }
+ }
+ @Test
+ public void testParsesDataModelFromConf() throws Exception {
+ Schema datetimeSchema = Schema.createRecord("myrecord", null, null, false);
+ Schema date = LogicalTypes.date().addToSchema(
+ Schema.create(Schema.Type.INT));
+ datetimeSchema.setFields(Collections.singletonList(
+ new Schema.Field("date", date, null, null)));
+
+ File file = temp.newFile("datetime.parquet");
+ file.delete();
+ Path path = new Path(file.toString());
+ List<GenericRecord> expected = Lists.newArrayList();
+
+ Configuration conf = new Configuration();
+ AvroWriteSupport.setAvroDataSupplier(conf, CustomDataModel.class);
+
+ // .withDataModel is not set; AvroWriteSupport should parse it from the
Configuration
+ try(ParquetWriter<GenericRecord> writer = AvroParquetWriter
+ .<GenericRecord>builder(path)
+ .withConf(conf)
+ .withSchema(datetimeSchema)
+ .build()) {
+
+ GenericRecordBuilder builder = new GenericRecordBuilder(datetimeSchema);
+ for (int i = 0; i < 100; i += 1) {
+ builder.set("date", LocalDate.now().minusDays(i));
+
+ GenericRecord rec = builder.build();
+ expected.add(rec);
+ writer.write(builder.build());
+ }
+ }
+ List<GenericRecord> records = Lists.newArrayList();
+
+ AvroReadSupport.setAvroDataSupplier(conf, CustomDataModel.class);
+
+ try(ParquetReader<GenericRecord> reader = AvroParquetReader
+ .<GenericRecord>builder(path)
+ .disableCompatibility()
+ .withConf(conf)
+ .build()) {
+ GenericRecord rec;
+ while ((rec = reader.read()) != null) {
+ records.add(rec);
+ }
+ }
+
+ Assert.assertTrue("date field should be a LocalDate instance",
+ records.get(0).get("date") instanceof LocalDate);
+ Assert.assertEquals("Content should match", expected, records);
+ }
+
private File createTempFile() throws IOException {
File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index 46e2f2c23..49ed27b1b 100644
---
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -30,14 +30,19 @@ import static
org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import org.apache.parquet.avro.LogicalTypesTest;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -237,6 +242,43 @@ public class TestSpecificReadWrite {
}
}
+ @Test
+ public void testParsesSpecificDataModel() throws IOException {
+ // SpecificRecord contains a logical type and will fail to decode unless
its SpecificData model is parsed
+ List<LogicalTypesTest> records = IntStream
+ .range(0, 25)
+ .mapToObj(i ->
LogicalTypesTest.newBuilder().setTimestamp(Instant.now()).build())
+ .collect(Collectors.toList());
+
+ // Test that SpecificData model is parsed in AvroParquetWriter
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ Path path = new Path(tmp.getPath());
+
+ try(
+ ParquetWriter<LogicalTypesTest> writer =
AvroParquetWriter.<LogicalTypesTest>builder(path)
+ .withSchema(LogicalTypesTest.SCHEMA$)
+ .withConf(new Configuration(false))
+ .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+ .build()
+ ) {
+ for (LogicalTypesTest record : records) {
+ writer.write(record);
+ }
+ }
+
+ // Test that SpecificData model is parsed in AvroParquetReader
+ final List<LogicalTypesTest> output = new ArrayList<>();
+ try (ParquetReader<org.apache.parquet.avro.LogicalTypesTest> reader = new
AvroParquetReader<>(testConf, path)) {
+ for (LogicalTypesTest record = reader.read(); record != null; record =
reader.read()) {
+ output.add(record);
+ }
+ }
+
+ assertEquals(records, output);
+ }
+
private Path writeCarsToParquetFile( int num, CompressionCodecName
compression, boolean enableDictionary) throws IOException {
return writeCarsToParquetFile(num, compression, enableDictionary,
DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
}
diff --git a/pom.xml b/pom.xml
index 3f2911b4c..eb568e097 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
<guava.version>27.0.1-jre</guava.version>
<brotli-codec.version>0.1.1</brotli-codec.version>
<mockito.version>1.10.19</mockito.version>
+ <powermock.version>2.0.2</powermock.version>
<net.openhft.version>0.9</net.openhft.version>
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<yetus.audience-annotations.version>0.13.0</yetus.audience-annotations.version>