This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new b04edf49b PARQUET-2448: Fix model conversions for avro <= 1.8 
generated classes (#1296)
b04edf49b is described below

commit b04edf49b302599cb106b891be1dcc60bc275f49
Author: Michel Davit <[email protected]>
AuthorDate: Wed Mar 20 06:45:40 2024 +0100

    PARQUET-2448: Fix model conversions for avro <= 1.8 generated classes 
(#1296)
---
 parquet-avro/pom.xml                               |  24 ++-
 .../apache/parquet/avro/AvroRecordConverter.java   |  85 ++++++----
 parquet-avro/src/test/{resources => avro}/car.avdl |   0
 parquet-avro/src/test/avro/logicalType.avsc        |  46 ++++--
 .../parquet/avro/TestAvroRecordConverter.java      | 180 +++++++++------------
 .../apache/parquet/avro/TestSpecificReadWrite.java |   8 +-
 6 files changed, 186 insertions(+), 157 deletions(-)

diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index c6abc74ca..ca49e166b 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -152,6 +152,16 @@
         <artifactId>avro-maven-plugin</artifactId>
         <version>${avro.version}</version>
         <executions>
+          <execution>
+            <id>compile-idl</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+            <configuration>
+              <stringType>String</stringType>
+            </configuration>
+          </execution>
           <execution>
             <id>compile-avsc</id>
             <phase>generate-test-sources</phase>
@@ -159,18 +169,6 @@
               <goal>schema</goal>
             </goals>
           </execution>
-            <execution>
-                <id>compile-idl</id>
-                <phase>generate-test-sources</phase>
-                <goals>
-                    <goal>idl-protocol</goal>
-                </goals>
-                <configuration>
-                    
<sourceDirectory>${project.basedir}/src/test/resources</sourceDirectory>
-                    
<outputDirectory>${project.build.directory}/generated-test-sources</outputDirectory>
-                    <stringType>String</stringType>
-                </configuration>
-            </execution>
         </executions>
       </plugin>
       <plugin>
@@ -187,7 +185,7 @@
             </goals>
             <configuration>
               <sources>
-                
<source>${project.build.directory}/generated-test-sources</source>
+                
<source>${project.build.directory}/generated-test-sources/avro</source>
               </sources>
             </configuration>
           </execution>
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 62d1f89fd..441428bfa 100644
--- 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -38,9 +38,11 @@ import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Conversion;
 import org.apache.avro.LogicalType;
@@ -172,6 +174,51 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     }
   }
 
+  private static void addLogicalTypeConversion(SpecificData model, Schema 
schema, Set<Schema> seenSchemas)
+      throws IllegalAccessException {
+    if (seenSchemas.contains(schema)) {
+      return;
+    }
+    seenSchemas.add(schema);
+
+    switch (schema.getType()) {
+      case RECORD:
+        final Class<?> clazz = model.getClass(schema);
+        if (clazz != null) {
+          try {
+            final Field conversionsField = 
clazz.getDeclaredField("conversions");
+            conversionsField.setAccessible(true);
+            final Conversion<?>[] conversions = (Conversion<?>[]) 
conversionsField.get(null);
+            for (Conversion<?> conversion : conversions) {
+              if (conversion != null) {
+                model.addLogicalTypeConversion(conversion);
+              }
+            }
+
+            for (Schema.Field field : schema.getFields()) {
+              addLogicalTypeConversion(model, field.schema(), seenSchemas);
+            }
+          } catch (NoSuchFieldException e) {
+            // Avro classes without logical types (denoted by the 
"conversions" field)
+          }
+        }
+        break;
+      case MAP:
+        addLogicalTypeConversion(model, schema.getValueType(), seenSchemas);
+        break;
+      case ARRAY:
+        addLogicalTypeConversion(model, schema.getElementType(), seenSchemas);
+        break;
+      case UNION:
+        for (Schema type : schema.getTypes()) {
+          addLogicalTypeConversion(model, type, seenSchemas);
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
   /**
    * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
    * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
@@ -197,17 +244,13 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
 
       model = (SpecificData) modelField.get(null);
     } catch (NoSuchFieldException e) {
-      LOG.info(String.format(
-          "Generated Avro class %s did not contain a MODEL$ field. Parquet 
will use default SpecificData model for "
-              + "reading and writing.",
-          clazz));
+      LOG.info(String.format("Generated Avro class %s did not contain a MODEL$ 
field. ", clazz)
+          + "Parquet will use default SpecificData model for reading and 
writing.");
       return null;
     } catch (IllegalAccessException e) {
       LOG.warn(
-          String.format(
-              "Field `MODEL$` in class %s was inaccessible. Parquet will use 
default SpecificData model for "
-                  + "reading and writing.",
-              clazz),
+          String.format("Field `MODEL$` in class %s was inaccessible. ", clazz)
+              + "Parquet will use default SpecificData model for reading and 
writing.",
           e);
       return null;
     }
@@ -215,31 +258,15 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     final String avroVersion = getRuntimeAvroVersion();
     // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by 
default
     if (avroVersion != null && (avroVersion.startsWith("1.8.") || 
avroVersion.startsWith("1.7."))) {
-      final Field conversionsField;
       try {
-        conversionsField = clazz.getDeclaredField("conversions");
-      } catch (NoSuchFieldException e) {
-        // Avro classes without logical types (denoted by the "conversions" 
field) can be returned as-is
-        return model;
-      }
-
-      final Conversion<?>[] conversions;
-      try {
-        conversionsField.setAccessible(true);
-        conversions = (Conversion<?>[]) conversionsField.get(null);
+        addLogicalTypeConversion(model, schema, new HashSet<>());
       } catch (IllegalAccessException e) {
-        LOG.warn(String.format(
-            "Field `conversions` in class %s was inaccessible. Parquet will 
use default "
-                + "SpecificData model for reading and writing.",
-            clazz));
+        LOG.warn(
+            String.format("Logical-type conversions were inaccessible for %s", 
clazz)
+                + "Parquet will use default SpecificData model for reading and 
writing.",
+            e);
         return null;
       }
-
-      for (int i = 0; i < conversions.length; i++) {
-        if (conversions[i] != null) {
-          model.addLogicalTypeConversion(conversions[i]);
-        }
-      }
     }
 
     return model;
diff --git a/parquet-avro/src/test/resources/car.avdl 
b/parquet-avro/src/test/avro/car.avdl
similarity index 100%
rename from parquet-avro/src/test/resources/car.avdl
rename to parquet-avro/src/test/avro/car.avdl
diff --git a/parquet-avro/src/test/avro/logicalType.avsc 
b/parquet-avro/src/test/avro/logicalType.avsc
index fbec10a8d..46bac387e 100644
--- a/parquet-avro/src/test/avro/logicalType.avsc
+++ b/parquet-avro/src/test/avro/logicalType.avsc
@@ -1,14 +1,38 @@
 {
-    "type": "record",
-    "name": "LogicalTypesTest",
-    "namespace": "org.apache.parquet.avro",
-    "doc": "Record for testing logical types",
-    "fields": [
-        {
-          "name": "timestamp",
-          "type": {
-            "type": "long", "logicalType": "timestamp-millis"
+  "name": "LogicalTypesTest",
+  "namespace": "org.apache.parquet.avro",
+  "doc": "Record for testing logical types",
+  "type": "record",
+  "fields": [
+    {
+      "name": "timestamp",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-millis"
+      }
+    },
+    {
+      "name": "local_date_time",
+      "type": {
+        "name": "LocalDateTimeTest",
+        "type": "record",
+        "fields": [
+          {
+            "name": "date",
+            "type": {
+              "type": "int",
+              "logicalType": "date"
+            }
+          },
+          {
+            "name": "time",
+            "type": {
+              "type": "int",
+              "logicalType": "time-millis"
+            }
           }
-        }
-    ]
+        ]
+      }
+    }
+  ]
 }
diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
index 1e26eddb1..76e4b99d0 100644
--- 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
+++ 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
@@ -19,17 +19,20 @@
 package org.apache.parquet.avro;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 
 import com.google.common.collect.Lists;
-import java.math.BigDecimal;
 import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Collection;
 import org.apache.avro.Conversion;
-import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.data.TimeConversions;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.specific.SpecificData;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,14 +57,16 @@ public class TestAvroRecordConverter {
     SpecificData model = 
AvroRecordConverter.getModelForSchema(LogicalTypesTest.SCHEMA$);
 
     // Test that model is generated correctly
-    Conversion<?> conversion = model.getConversionByClass(Instant.class);
-    assertEquals(TimeConversions.TimestampMillisConversion.class, 
conversion.getClass());
+    Collection<Conversion<?>> conversions = model.getConversions();
+    assertEquals(conversions.size(), 3);
+    assertNotNull(model.getConversionByClass(Instant.class));
+    assertNotNull(model.getConversionByClass(LocalDate.class));
+    assertNotNull(model.getConversionByClass(LocalTime.class));
   }
 
   @Test
   public void testModelForSpecificRecordWithoutLogicalTypes() {
     SpecificData model = AvroRecordConverter.getModelForSchema(Car.SCHEMA$);
-
     assertTrue(model.getConversions().isEmpty());
   }
 
@@ -80,129 +85,98 @@ public class TestAvroRecordConverter {
 
   // Test logical type support for older Avro versions
   @Test
-  public void testGetModelAvro1_7() {
-    
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
-
-    // Test that model is generated correctly
-    final SpecificData model = 
AvroRecordConverter.getModelForSchema(Avro17GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
-  }
-
-  @Test
-  public void testGetModelAvro1_8() {
+  public void 
testModelForSpecificRecordWithLogicalTypesWithDeprecatedAvro1_8() {
     
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.8.2");
 
     // Test that model is generated correctly
-    final SpecificData model = 
AvroRecordConverter.getModelForSchema(Avro18GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
-  }
-
-  @Test
-  public void testGetModelAvro1_9() {
-    
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.9.2");
-
+    final SpecificData model = 
AvroRecordConverter.getModelForSchema(LogicalTypesTestDeprecated.SCHEMA$);
     // Test that model is generated correctly
-    final SpecificData model = 
AvroRecordConverter.getModelForSchema(Avro19GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+    Collection<Conversion<?>> conversions = model.getConversions();
+    assertEquals(conversions.size(), 3);
+    assertNotNull(model.getConversionByClass(Instant.class));
+    assertNotNull(model.getConversionByClass(LocalDate.class));
+    assertNotNull(model.getConversionByClass(LocalTime.class));
   }
 
   @Test
-  public void testGetModelAvro1_10() {
-    
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.10.2");
+  public void 
testModelForSpecificRecordWithLogicalTypesWithDeprecatedAvro1_7() {
+    
Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
 
     // Test that model is generated correctly
-    final SpecificData model = 
AvroRecordConverter.getModelForSchema(Avro110GeneratedClass.SCHEMA$);
-    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
-    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
-  }
-
-  // Test Avro record class stubs, generated using different versions of the 
Avro compiler
-  public abstract static class Avro110GeneratedClass extends 
org.apache.avro.specific.SpecificRecordBase
-      implements org.apache.avro.specific.SpecificRecord {
-    private static final long serialVersionUID = 5558880508010468207L;
-    public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser()
-        .parse(
-            
"{\"type\":\"record\",\"name\":\"Avro110GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
-
-    public static org.apache.avro.Schema getClassSchema() {
-      return SCHEMA$;
-    }
-
-    private static SpecificData MODEL$ = new SpecificData();
-
-    static {
-      MODEL$.addLogicalTypeConversion(new 
org.apache.avro.Conversions.DecimalConversion());
-    }
-  }
-
-  public abstract static class Avro19GeneratedClass extends 
org.apache.avro.specific.SpecificRecordBase
-      implements org.apache.avro.specific.SpecificRecord {
-    private static final long serialVersionUID = 5558880508010468207L;
-    public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser()
-        .parse(
-            
"{\"type\":\"record\",\"name\":\"Avro19GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
-
-    public static org.apache.avro.Schema getClassSchema() {
-      return SCHEMA$;
-    }
-
-    private static SpecificData MODEL$ = new SpecificData();
-
-    static {
-      MODEL$.addLogicalTypeConversion(new 
org.apache.avro.Conversions.DecimalConversion());
-    }
+    final SpecificData model = 
AvroRecordConverter.getModelForSchema(LogicalTypesTestDeprecated.SCHEMA$);
+    // Test that model is generated correctly
+    Collection<Conversion<?>> conversions = model.getConversions();
+    assertEquals(conversions.size(), 3);
+    assertNotNull(model.getConversionByClass(Instant.class));
+    assertNotNull(model.getConversionByClass(LocalDate.class));
+    assertNotNull(model.getConversionByClass(LocalTime.class));
   }
 
-  public abstract static class Avro18GeneratedClass extends 
org.apache.avro.specific.SpecificRecordBase
+  // Pseudo generated code with bug from avro compiler <= 1.8
+  @org.apache.avro.specific.AvroGenerated
+  public abstract static class LocalDateTimeTestDeprecated extends 
org.apache.avro.specific.SpecificRecordBase
       implements org.apache.avro.specific.SpecificRecord {
-    private static final long serialVersionUID = 5558880508010468207L;
-    public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser()
-        .parse(
-            
"{\"type\":\"record\",\"name\":\"Avro18GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+    public static final org.apache.avro.Schema SCHEMA$ = 
SchemaBuilder.builder()
+        .record("LocalDateTimeTestDeprecated")
+        .namespace("org.apache.parquet.avro.TestAvroRecordConverter")
+        .fields()
+        .name("date")
+        
.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()))
+        .noDefault()
+        .name("time")
+        .type(LogicalTypes.timeMillis()
+            .addToSchema(SchemaBuilder.builder().intType()))
+        .noDefault()
+        .endRecord();
 
     public static org.apache.avro.Schema getClassSchema() {
       return SCHEMA$;
     }
 
     private static SpecificData MODEL$ = new SpecificData();
-
-    protected static final org.apache.avro.Conversions.DecimalConversion 
DECIMAL_CONVERSION =
-        new org.apache.avro.Conversions.DecimalConversion();
-
-    private static final org.apache.avro.Conversion<?>[] conversions =
-        new org.apache.avro.Conversion<?>[] {DECIMAL_CONVERSION, null};
-
-    @Override
-    public org.apache.avro.Conversion<?> getConversion(int field) {
-      return conversions[field];
-    }
+    // this part is missing in the generated code
+    // static {
+    //   MODEL$.addLogicalTypeConversion(new 
org.apache.avro.data.TimeConversions.DateConversion());
+    //   MODEL$.addLogicalTypeConversion(new 
org.apache.avro.data.TimeConversions.TimeMillisConversion());
+    // }
+
+    private static final org.apache.avro.Conversion<?>[] conversions = new 
org.apache.avro.Conversion<?>[] {
+      new org.apache.avro.data.TimeConversions.DateConversion(),
+      new org.apache.avro.data.TimeConversions.TimeMillisConversion(),
+      null
+    };
   }
 
-  public abstract static class Avro17GeneratedClass extends 
org.apache.avro.specific.SpecificRecordBase
+  @org.apache.avro.specific.AvroGenerated
+  public abstract static class LogicalTypesTestDeprecated extends 
org.apache.avro.specific.SpecificRecordBase
       implements org.apache.avro.specific.SpecificRecord {
-    private static final long serialVersionUID = 5558880508010468207L;
-    public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser()
-        .parse(
-            
"{\"type\":\"record\",\"name\":\"Avro17GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+    public static final org.apache.avro.Schema SCHEMA$ = 
SchemaBuilder.builder()
+        .record("LogicalTypesTestDeprecated")
+        .namespace("org.apache.parquet.avro.TestAvroRecordConverter")
+        .fields()
+        .name("timestamp")
+        .type(LogicalTypes.timestampMillis()
+            .addToSchema(SchemaBuilder.builder().longType()))
+        .noDefault()
+        .name("local_date_time")
+        .type(LocalDateTimeTestDeprecated.getClassSchema())
+        .noDefault()
+        .endRecord();
 
     public static org.apache.avro.Schema getClassSchema() {
       return SCHEMA$;
     }
 
     private static SpecificData MODEL$ = new SpecificData();
-
-    protected static final org.apache.avro.Conversions.DecimalConversion 
DECIMAL_CONVERSION =
-        new org.apache.avro.Conversions.DecimalConversion();
-
-    private static final org.apache.avro.Conversion<?>[] conversions =
-        new org.apache.avro.Conversion<?>[] {DECIMAL_CONVERSION, null};
-
-    @Override
-    public org.apache.avro.Conversion<?> getConversion(int field) {
-      return conversions[field];
-    }
+    // this part is missing in the generated code
+    // static {
+    // MODEL$.addLogicalTypeConversion(new 
org.apache.avro.data.TimeConversions.DateConversion());
+    // MODEL$.addLogicalTypeConversion(new 
org.apache.avro.data.TimeConversions.TimestampMillisConversion());
+    // MODEL$.addLogicalTypeConversion(new 
org.apache.avro.data.TimeConversions.TimeMillisConversion());
+    // }
+
+    private static final org.apache.avro.Conversion<?>[] conversions = new 
org.apache.avro.Conversion<?>[] {
+      new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), 
null, null
+    };
   }
 }
diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index 2355847f0..6699c72ec 100644
--- 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++ 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -295,6 +297,10 @@ public class TestSpecificReadWrite {
     List<LogicalTypesTest> records = IntStream.range(0, 25)
         .mapToObj(i -> LogicalTypesTest.newBuilder()
             .setTimestamp(Instant.now())
+            .setLocalDateTime(LocalDateTimeTest.newBuilder()
+                .setDate(LocalDate.now())
+                .setTime(LocalTime.now())
+                .build())
             .build())
         .collect(Collectors.toList());
 
@@ -305,7 +311,7 @@ public class TestSpecificReadWrite {
     Path path = new Path(tmp.getPath());
 
     try (ParquetWriter<LogicalTypesTest> writer = 
AvroParquetWriter.<LogicalTypesTest>builder(path)
-        .withSchema(LogicalTypesTest.SCHEMA$)
+        .withSchema(LogicalTypesTest.getClassSchema())
         .withConf(new Configuration(false))
         .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
         .build()) {

Reply via email to