This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new e8c0959ea PARQUET-2315: Expose local timestamp milli and micro to schema converter
e8c0959ea is described below
commit e8c0959ea157ca4f06df818f6eab03cb103b4283
Author: Stefan Ziegler <[email protected]>
AuthorDate: Thu Jun 22 17:52:10 2023 +0200
PARQUET-2315: Expose local timestamp milli and micro to schema converter
This closes #1115
---
.../apache/parquet/avro/AvroSchemaConverter.java | 28 +++++++++---
.../parquet/avro/TestAvroSchemaConverter.java | 50 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 6 deletions(-)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 7d1f3cab9..0314bcd71 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -456,8 +456,12 @@ public class AvroSchemaConverter {
return timeType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return timestampType(true, MILLIS);
+ } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+ return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return timestampType(true, MICROS);
+ } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+ return timestampType(false, MICROS);
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) &&
writeParquetUUID) {
return uuidType();
}
@@ -494,13 +498,25 @@ public class AvroSchemaConverter {
@Override
public Optional<LogicalType>
visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
timestampLogicalType) {
LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
- switch (unit) {
- case MILLIS:
- return of(LogicalTypes.timestampMillis());
- case MICROS:
- return of(LogicalTypes.timestampMicros());
+ boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();
+
+ if (isAdjustedToUTC) {
+ switch (unit) {
+ case MILLIS:
+ return of(LogicalTypes.timestampMillis());
+ case MICROS:
+ return of(LogicalTypes.timestampMicros());
+ }
+ return empty();
+ } else {
+ switch (unit) {
+ case MILLIS:
+ return of(LogicalTypes.localTimestampMillis());
+ case MICROS:
+ return of(LogicalTypes.localTimestampMicros());
+ }
+ return empty();
}
- return empty();
}
@Override
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 1bafdec1e..642c48ca2 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -712,6 +712,31 @@ public class TestAvroSchemaConverter {
}
}
+ @Test
+ public void testLocalTimestampMillisType() throws Exception {
+ Schema date =
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 timestamp (TIMESTAMP(MILLIS,false));\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test",
TIMESTAMP_MILLIS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test",
TIMESTAMP_MILLIS);
+ }
+
+ assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+ IllegalArgumentException.class, () -> new
AvroSchemaConverter().convert(message(type)));
+ }
+ }
+
@Test
public void testTimestampMicrosType() throws Exception {
Schema date =
LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
@@ -737,6 +762,31 @@ public class TestAvroSchemaConverter {
}
}
+ @Test
+ public void testLocalTimestampMicrosType() throws Exception {
+ Schema date =
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 timestamp (TIMESTAMP(MICROS,false));\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test",
TIMESTAMP_MICROS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test",
TIMESTAMP_MICROS);
+ }
+
+ assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+ IllegalArgumentException.class, () -> new
AvroSchemaConverter().convert(message(type)));
+ }
+ }
+
@Test
public void testReuseNameInNestedStructure() throws Exception {
Schema innerA1 = record("a1", "a12",