This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new e8c0959ea PARQUET-2315: Expose local timestamp milli and micro to 
schema converter
e8c0959ea is described below

commit e8c0959ea157ca4f06df818f6eab03cb103b4283
Author: Stefan Ziegler <[email protected]>
AuthorDate: Thu Jun 22 17:52:10 2023 +0200

    PARQUET-2315: Expose local timestamp milli and micro to schema converter
    
    This closes #1115
---
 .../apache/parquet/avro/AvroSchemaConverter.java   | 28 +++++++++---
 .../parquet/avro/TestAvroSchemaConverter.java      | 50 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 6 deletions(-)

diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 7d1f3cab9..0314bcd71 100644
--- 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -456,8 +456,12 @@ public class AvroSchemaConverter {
       return timeType(true, MICROS);
     } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
       return timestampType(true, MILLIS);
+    } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+      return timestampType(false, MILLIS);
     } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
       return timestampType(true, MICROS);
+    } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+      return timestampType(false, MICROS);
     } else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && 
writeParquetUUID) {
       return uuidType();
     }
@@ -494,13 +498,25 @@ public class AvroSchemaConverter {
       @Override
       public Optional<LogicalType> 
visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation 
timestampLogicalType) {
         LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
-        switch (unit) {
-          case MILLIS:
-            return of(LogicalTypes.timestampMillis());
-          case MICROS:
-            return of(LogicalTypes.timestampMicros());
+        boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();
+
+        if (isAdjustedToUTC) {
+          switch (unit) {
+            case MILLIS:
+              return of(LogicalTypes.timestampMillis());
+            case MICROS:
+              return of(LogicalTypes.timestampMicros());
+          }
+          return empty(); 
+        } else {
+          switch (unit) {
+            case MILLIS:
+              return of(LogicalTypes.localTimestampMillis());
+            case MICROS:
+              return of(LogicalTypes.localTimestampMicros());
+          }
+          return empty(); 
         }
-        return empty();
       }
 
       @Override
diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 1bafdec1e..642c48ca2 100644
--- 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -712,6 +712,31 @@ public class TestAvroSchemaConverter {
     }
   }
 
+  @Test
+  public void testLocalTimestampMillisType() throws Exception {
+    Schema date = 
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 timestamp (TIMESTAMP(MILLIS,false));\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", 
TIMESTAMP_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", 
TIMESTAMP_MILLIS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+          IllegalArgumentException.class, () -> new 
AvroSchemaConverter().convert(message(type)));
+    }
+  }
+
   @Test
   public void testTimestampMicrosType() throws Exception {
     Schema date = 
LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
@@ -737,6 +762,31 @@ public class TestAvroSchemaConverter {
     }
   }
 
+  @Test
+  public void testLocalTimestampMicrosType() throws Exception {
+    Schema date = 
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 timestamp (TIMESTAMP(MICROS,false));\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", 
TIMESTAMP_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", 
TIMESTAMP_MICROS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+          IllegalArgumentException.class, () -> new 
AvroSchemaConverter().convert(message(type)));
+    }
+  }
+
   @Test
   public void testReuseNameInNestedStructure() throws Exception {
     Schema innerA1 = record("a1", "a12",

Reply via email to