This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new b04edf49b PARQUET-2448: Fix model conversions for avro <= 1.8 generated classes (#1296)
b04edf49b is described below
commit b04edf49b302599cb106b891be1dcc60bc275f49
Author: Michel Davit <[email protected]>
AuthorDate: Wed Mar 20 06:45:40 2024 +0100
PARQUET-2448: Fix model conversions for avro <= 1.8 generated classes (#1296)
---
parquet-avro/pom.xml | 24 ++-
.../apache/parquet/avro/AvroRecordConverter.java | 85 ++++++----
parquet-avro/src/test/{resources => avro}/car.avdl | 0
parquet-avro/src/test/avro/logicalType.avsc | 46 ++++--
.../parquet/avro/TestAvroRecordConverter.java | 180 +++++++++------------
.../apache/parquet/avro/TestSpecificReadWrite.java | 8 +-
6 files changed, 186 insertions(+), 157 deletions(-)
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index c6abc74ca..ca49e166b 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -152,6 +152,16 @@
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
+ <execution>
+ <id>compile-idl</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ <configuration>
+ <stringType>String</stringType>
+ </configuration>
+ </execution>
<execution>
<id>compile-avsc</id>
<phase>generate-test-sources</phase>
@@ -159,18 +169,6 @@
<goal>schema</goal>
</goals>
</execution>
- <execution>
- <id>compile-idl</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>idl-protocol</goal>
- </goals>
- <configuration>
-
<sourceDirectory>${project.basedir}/src/test/resources</sourceDirectory>
-
<outputDirectory>${project.build.directory}/generated-test-sources</outputDirectory>
- <stringType>String</stringType>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
@@ -187,7 +185,7 @@
</goals>
<configuration>
<sources>
-
<source>${project.build.directory}/generated-test-sources</source>
+
<source>${project.build.directory}/generated-test-sources/avro</source>
</sources>
</configuration>
</execution>
diff --git
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 62d1f89fd..441428bfa 100644
---
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -38,9 +38,11 @@ import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
@@ -172,6 +174,66 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
+  /**
+   * Registers on {@code model} the logical-type conversions of every generated record class reachable from
+   * {@code schema}, descending through record fields, map values, array elements and union branches.
+   *
+   * <p>Classes generated by Avro 1.7/1.8 do not register their conversions on {@code MODEL$}; instead a record
+   * class that declares logical-type fields carries a static {@code conversions} array. A record class without
+   * that array (or one that is not on the classpath) may still nest records that have one, so the fields of
+   * every record are always visited.
+   *
+   * @param model the model to register conversions on
+   * @param schema the schema to traverse
+   * @param seenSchemas schemas already visited; prevents infinite recursion on recursive schemas
+   * @throws IllegalAccessException if a {@code conversions} field cannot be read reflectively
+   */
+  private static void addLogicalTypeConversion(SpecificData model, Schema schema, Set<Schema> seenSchemas)
+      throws IllegalAccessException {
+    // Set.add returns false when the schema was already visited (e.g. a self-referencing record).
+    if (!seenSchemas.add(schema)) {
+      return;
+    }
+
+    switch (schema.getType()) {
+      case RECORD:
+        final Class<?> clazz = model.getClass(schema);
+        if (clazz != null) {
+          try {
+            final Field conversionsField = clazz.getDeclaredField("conversions");
+            conversionsField.setAccessible(true);
+            final Conversion<?>[] conversions = (Conversion<?>[]) conversionsField.get(null);
+            for (Conversion<?> conversion : conversions) {
+              if (conversion != null) {
+                model.addLogicalTypeConversion(conversion);
+              }
+            }
+          } catch (NoSuchFieldException e) {
+            // No "conversions" array: this class declares no logical-type fields of its own.
+          }
+        }
+        // Descend even when the class had no "conversions" array or is not on the classpath:
+        // nested records may still declare logical-type fields.
+        for (Schema.Field field : schema.getFields()) {
+          addLogicalTypeConversion(model, field.schema(), seenSchemas);
+        }
+        break;
+      case MAP:
+        addLogicalTypeConversion(model, schema.getValueType(), seenSchemas);
+        break;
+      case ARRAY:
+        addLogicalTypeConversion(model, schema.getElementType(), seenSchemas);
+        break;
+      case UNION:
+        for (Schema type : schema.getTypes()) {
+          addLogicalTypeConversion(model, type, seenSchemas);
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
/**
* Returns the specific data model for a given SpecificRecord schema by
reflecting the underlying
* Avro class's `MODEL$` field, or Null if the class is not on the classpath
or reflection fails.
@@ -197,17 +244,13 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
 
       model = (SpecificData) modelField.get(null);
     } catch (NoSuchFieldException e) {
-      LOG.info(String.format(
-          "Generated Avro class %s did not contain a MODEL$ field. Parquet will use default SpecificData model for "
-              + "reading and writing.",
-          clazz));
+      LOG.info(String.format("Generated Avro class %s did not contain a MODEL$ field. ", clazz)
+          + "Parquet will use default SpecificData model for reading and writing.");
       return null;
     } catch (IllegalAccessException e) {
       LOG.warn(
-          String.format(
-              "Field `MODEL$` in class %s was inaccessible. Parquet will use default SpecificData model for "
-                  + "reading and writing.",
-              clazz),
+          String.format("Field `MODEL$` in class %s was inaccessible. ", clazz)
+              + "Parquet will use default SpecificData model for reading and writing.",
           e);
       return null;
     }
@@ -215,31 +258,16 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     final String avroVersion = getRuntimeAvroVersion();
     // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by default
     if (avroVersion != null && (avroVersion.startsWith("1.8.") || avroVersion.startsWith("1.7."))) {
-      final Field conversionsField;
       try {
-        conversionsField = clazz.getDeclaredField("conversions");
-      } catch (NoSuchFieldException e) {
-        // Avro classes without logical types (denoted by the "conversions" field) can be returned as-is
-        return model;
-      }
-
-      final Conversion<?>[] conversions;
-      try {
-        conversionsField.setAccessible(true);
-        conversions = (Conversion<?>[]) conversionsField.get(null);
+        // Conversions live in per-class "conversions" arrays, so collect them from every reachable record
+        addLogicalTypeConversion(model, schema, new HashSet<>());
       } catch (IllegalAccessException e) {
-        LOG.warn(String.format(
-            "Field `conversions` in class %s was inaccessible. Parquet will use default "
-                + "SpecificData model for reading and writing.",
-            clazz));
+        LOG.warn(
+            String.format("Logical-type conversions were inaccessible for %s. ", clazz)
+                + "Parquet will use default SpecificData model for reading and writing.",
+            e);
         return null;
       }
-
-      for (int i = 0; i < conversions.length; i++) {
-        if (conversions[i] != null) {
-          model.addLogicalTypeConversion(conversions[i]);
-        }
-      }
     }
 
     return model;
diff --git a/parquet-avro/src/test/resources/car.avdl
b/parquet-avro/src/test/avro/car.avdl
similarity index 100%
rename from parquet-avro/src/test/resources/car.avdl
rename to parquet-avro/src/test/avro/car.avdl
diff --git a/parquet-avro/src/test/avro/logicalType.avsc
b/parquet-avro/src/test/avro/logicalType.avsc
index fbec10a8d..46bac387e 100644
--- a/parquet-avro/src/test/avro/logicalType.avsc
+++ b/parquet-avro/src/test/avro/logicalType.avsc
@@ -1,14 +1,38 @@
{
- "type": "record",
- "name": "LogicalTypesTest",
- "namespace": "org.apache.parquet.avro",
- "doc": "Record for testing logical types",
- "fields": [
- {
- "name": "timestamp",
- "type": {
- "type": "long", "logicalType": "timestamp-millis"
+ "name": "LogicalTypesTest",
+ "namespace": "org.apache.parquet.avro",
+ "doc": "Record for testing logical types",
+ "type": "record",
+ "fields": [
+ {
+ "name": "timestamp",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ }
+ },
+ {
+ "name": "local_date_time",
+ "type": {
+ "name": "LocalDateTimeTest",
+ "type": "record",
+ "fields": [
+ {
+ "name": "date",
+ "type": {
+ "type": "int",
+ "logicalType": "date"
+ }
+ },
+ {
+ "name": "time",
+ "type": {
+ "type": "int",
+ "logicalType": "time-millis"
+ }
}
- }
- ]
+ ]
+ }
+ }
+ ]
}
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
index 1e26eddb1..76e4b99d0 100644
---
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
+++
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
@@ -19,17 +19,20 @@
package org.apache.parquet.avro;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import com.google.common.collect.Lists;
-import java.math.BigDecimal;
import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Collection;
import org.apache.avro.Conversion;
-import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
-import org.apache.avro.data.TimeConversions;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.specific.SpecificData;
import org.junit.Before;
import org.junit.Test;
@@ -54,14 +57,16 @@ public class TestAvroRecordConverter {
     SpecificData model = AvroRecordConverter.getModelForSchema(LogicalTypesTest.SCHEMA$);
 
     // Test that model is generated correctly
-    Conversion<?> conversion = model.getConversionByClass(Instant.class);
-    assertEquals(TimeConversions.TimestampMillisConversion.class, conversion.getClass());
+    Collection<Conversion<?>> conversions = model.getConversions();
+    assertEquals(3, conversions.size());
+    assertNotNull(model.getConversionByClass(Instant.class));
+    assertNotNull(model.getConversionByClass(LocalDate.class));
+    assertNotNull(model.getConversionByClass(LocalTime.class));
   }
 
   @Test
   public void testModelForSpecificRecordWithoutLogicalTypes() {
     SpecificData model = AvroRecordConverter.getModelForSchema(Car.SCHEMA$);
-
     assertTrue(model.getConversions().isEmpty());
   }
 
@@ -80,129 +85,98 @@ public class TestAvroRecordConverter {
 
   // Test logical type support for older Avro versions
   @Test
-  public void testGetModelAvro1_7() {
-    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
-
-    // Test that model is generated correctly
-    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro17GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
-  }
-
-  @Test
-  public void testGetModelAvro1_8() {
+  public void testModelForSpecificRecordWithLogicalTypesWithDeprecatedAvro1_8() {
     Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.8.2");
 
     // Test that model is generated correctly
-    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro18GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
-  }
-
-  @Test
-  public void testGetModelAvro1_9() {
-    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.9.2");
-
+    final SpecificData model = AvroRecordConverter.getModelForSchema(LogicalTypesTestDeprecated.SCHEMA$);
     // Test that model is generated correctly
-    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro19GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+    Collection<Conversion<?>> conversions = model.getConversions();
+    assertEquals(3, conversions.size());
+    assertNotNull(model.getConversionByClass(Instant.class));
+    assertNotNull(model.getConversionByClass(LocalDate.class));
+    assertNotNull(model.getConversionByClass(LocalTime.class));
   }
 
@Test
- public void testGetModelAvro1_10() {
-
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.10.2");
+ public void
testModelForSpecificRecordWithLogicalTypesWithDeprecatedAvro1_7() {
+
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
// Test that model is generated correctly
- final SpecificData model =
AvroRecordConverter.getModelForSchema(Avro110GeneratedClass.SCHEMA$);
- Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
- assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
- }
-
- // Test Avro record class stubs, generated using different versions of the
Avro compiler
- public abstract static class Avro110GeneratedClass extends
org.apache.avro.specific.SpecificRecordBase
- implements org.apache.avro.specific.SpecificRecord {
- private static final long serialVersionUID = 5558880508010468207L;
- public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser()
- .parse(
-
"{\"type\":\"record\",\"name\":\"Avro110GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
-
- public static org.apache.avro.Schema getClassSchema() {
- return SCHEMA$;
- }
-
- private static SpecificData MODEL$ = new SpecificData();
-
- static {
- MODEL$.addLogicalTypeConversion(new
org.apache.avro.Conversions.DecimalConversion());
- }
- }
-
- public abstract static class Avro19GeneratedClass extends
org.apache.avro.specific.SpecificRecordBase
- implements org.apache.avro.specific.SpecificRecord {
- private static final long serialVersionUID = 5558880508010468207L;
- public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser()
- .parse(
-
"{\"type\":\"record\",\"name\":\"Avro19GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
-
- public static org.apache.avro.Schema getClassSchema() {
- return SCHEMA$;
- }
-
- private static SpecificData MODEL$ = new SpecificData();
-
- static {
- MODEL$.addLogicalTypeConversion(new
org.apache.avro.Conversions.DecimalConversion());
- }
+ final SpecificData model =
AvroRecordConverter.getModelForSchema(LogicalTypesTestDeprecated.SCHEMA$);
+ // Test that model is generated correctly
+ Collection<Conversion<?>> conversions = model.getConversions();
+ assertEquals(conversions.size(), 3);
+ assertNotNull(model.getConversionByClass(Instant.class));
+ assertNotNull(model.getConversionByClass(LocalDate.class));
+ assertNotNull(model.getConversionByClass(LocalTime.class));
}
-  public abstract static class Avro18GeneratedClass extends org.apache.avro.specific.SpecificRecordBase
+  // Pseudo generated code reproducing the missing-MODEL$-conversions bug of Avro compilers <= 1.8
+  @org.apache.avro.specific.AvroGenerated
+  public abstract static class LocalDateTimeTestDeprecated extends org.apache.avro.specific.SpecificRecordBase
       implements org.apache.avro.specific.SpecificRecord {
-    private static final long serialVersionUID = 5558880508010468207L;
-    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
-        .parse(
-            "{\"type\":\"record\",\"name\":\"Avro18GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+    public static final org.apache.avro.Schema SCHEMA$ = SchemaBuilder.builder()
+        .record("LocalDateTimeTestDeprecated")
+        .namespace("org.apache.parquet.avro.TestAvroRecordConverter")
+        .fields()
+        .name("date")
+        .type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()))
+        .noDefault()
+        .name("time")
+        .type(LogicalTypes.timeMillis()
+            .addToSchema(SchemaBuilder.builder().intType()))
+        .noDefault()
+        .endRecord();
 
     public static org.apache.avro.Schema getClassSchema() {
       return SCHEMA$;
     }
 
     private static SpecificData MODEL$ = new SpecificData();
-
-    protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION =
-        new org.apache.avro.Conversions.DecimalConversion();
-
-    private static final org.apache.avro.Conversion<?>[] conversions =
-        new org.apache.avro.Conversion<?>[] {DECIMAL_CONVERSION, null};
-
-    @Override
-    public org.apache.avro.Conversion<?> getConversion(int field) {
-      return conversions[field];
-    }
+    // Avro <= 1.8 compilers do not generate this block:
+    // static {
+    //   MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.DateConversion());
+    //   MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimeMillisConversion());
+    // }
+
+    private static final org.apache.avro.Conversion<?>[] conversions = new org.apache.avro.Conversion<?>[] {
+      new org.apache.avro.data.TimeConversions.DateConversion(),
+      new org.apache.avro.data.TimeConversions.TimeMillisConversion(),
+      null
+    };
   }
 
-  public abstract static class Avro17GeneratedClass extends org.apache.avro.specific.SpecificRecordBase
+  @org.apache.avro.specific.AvroGenerated
+  public abstract static class LogicalTypesTestDeprecated extends org.apache.avro.specific.SpecificRecordBase
       implements org.apache.avro.specific.SpecificRecord {
-    private static final long serialVersionUID = 5558880508010468207L;
-    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
-        .parse(
-            "{\"type\":\"record\",\"name\":\"Avro17GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+    public static final org.apache.avro.Schema SCHEMA$ = SchemaBuilder.builder()
+        .record("LogicalTypesTestDeprecated")
+        .namespace("org.apache.parquet.avro.TestAvroRecordConverter")
+        .fields()
+        .name("timestamp")
+        .type(LogicalTypes.timestampMillis()
+            .addToSchema(SchemaBuilder.builder().longType()))
+        .noDefault()
+        .name("local_date_time")
+        .type(LocalDateTimeTestDeprecated.getClassSchema())
+        .noDefault()
+        .endRecord();
 
     public static org.apache.avro.Schema getClassSchema() {
       return SCHEMA$;
     }
 
     private static SpecificData MODEL$ = new SpecificData();
-
-    protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION =
-        new org.apache.avro.Conversions.DecimalConversion();
-
-    private static final org.apache.avro.Conversion<?>[] conversions =
-        new org.apache.avro.Conversion<?>[] {DECIMAL_CONVERSION, null};
-
-    @Override
-    public org.apache.avro.Conversion<?> getConversion(int field) {
-      return conversions[field];
-    }
+    // Avro <= 1.8 compilers do not generate this block:
+    // static {
+    //   MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.DateConversion());
+    //   MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
+    //   MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimeMillisConversion());
+    // }
+
+    private static final org.apache.avro.Conversion<?>[] conversions = new org.apache.avro.Conversion<?>[] {
+      new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), null, null
+    };
   }
 }
diff --git
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index 2355847f0..6699c72ec 100644
---
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -295,6 +297,10 @@ public class TestSpecificReadWrite {
List<LogicalTypesTest> records = IntStream.range(0, 25)
.mapToObj(i -> LogicalTypesTest.newBuilder()
.setTimestamp(Instant.now())
+ .setLocalDateTime(LocalDateTimeTest.newBuilder()
+ .setDate(LocalDate.now())
+ .setTime(LocalTime.now())
+ .build())
.build())
.collect(Collectors.toList());
@@ -305,7 +311,7 @@ public class TestSpecificReadWrite {
Path path = new Path(tmp.getPath());
try (ParquetWriter<LogicalTypesTest> writer =
AvroParquetWriter.<LogicalTypesTest>builder(path)
- .withSchema(LogicalTypesTest.SCHEMA$)
+ .withSchema(LogicalTypesTest.getClassSchema())
.withConf(new Configuration(false))
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
.build()) {