danny0405 commented on code in PR #8418:
URL: https://github.com/apache/hudi/pull/8418#discussion_r1165109685
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java:
##########
@@ -124,17 +124,25 @@ private FieldWriter createWriter(LogicalType t) {
return new DoubleWriter();
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) t;
- if (timestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
+ final int tsPrecision = timestampType.getPrecision();
+ if (tsPrecision == 3 || tsPrecision == 6) {
+ return new Timestamp64Writer(tsPrecision);
} else {
- return new Timestamp96Writer(timestampType.getPrecision());
+ throw new IllegalArgumentException(
+ "Parquet does not support TIMESTAMP type with precision: "
+ + tsPrecision
+ + ", it only support precisions <= 6.");
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) t;
- if (localZonedTimestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
+ final int tsLtzPrecision = localZonedTimestampType.getPrecision();
+ if (tsLtzPrecision == 3 || tsLtzPrecision == 6) {
+ return new Timestamp64Writer(tsLtzPrecision);
} else {
- return new Timestamp96Writer(localZonedTimestampType.getPrecision());
+ throw new IllegalArgumentException(
Review Comment:
Ditto, can we roll back the change?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java:
##########
@@ -283,34 +291,66 @@ public void write(ArrayData array, int ordinal) {
}
}
- /**
- * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
- *
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
Review Comment:
Can we fix the doc instead of removing it?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java:
##########
@@ -283,34 +291,66 @@ public void write(ArrayData array, int ordinal) {
}
}
- /**
- * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
- *
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
- * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
- */
private class Timestamp64Writer implements FieldWriter {
- private Timestamp64Writer() {
+
+ private final int precision;
+ private Timestamp64Writer(int precision) {
+ this.precision = precision;
}
@Override
public void write(RowData row, int ordinal) {
- recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
+ TimestampData timestampData = row.getTimestamp(ordinal, precision);
+ recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
@Override
public void write(ArrayData array, int ordinal) {
- recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+ TimestampData timestampData = array.getTimestamp(ordinal, precision);
+ recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
}
- private long timestampToInt64(TimestampData timestampData) {
- return utcTimestamp ? timestampData.getMillisecond() :
timestampData.toTimestamp().getTime();
+ /**
+ * Converts a {@code TimestampData} to its corresponding int64 value. This
function only accepts TimestampData of
+ * precision 3 or 6. Special attention will need to be given to a
TimestampData of precision = 6.
+ * <p>
+ * For example representing `1970-01-01T00:00:03.100001` of precision 6 will
have:
+ * <ul>
+ * <li>millisecond = 3100</li>
+ * <li>nanoOfMillisecond = 1000</li>
+ * </ul>
+ * As such, the int64 value will be:
+ * <p>
+ * millisecond * 1000 + nanoOfMillisecond / 1000
+ *
+ * @param timestampData TimestampData to be converted to int64 format
+ * @param precision the precision of the TimestampData
+ * @return int64 value of the TimestampData
+ */
+ private long timestampToInt64(TimestampData timestampData, int precision) {
+ if (!utcTimestamp) {
+ // toTimestamp is agnostic of precision
+ return timestampData.toTimestamp().getTime();
+ } else if (precision == 3) {
+ return timestampData.getMillisecond();
+ } else if (precision == 6) {
+ // convert timestampDat ato microseconds format
+ return timestampData.getMillisecond() * 1000 +
timestampData.getNanoOfMillisecond() / 1000;
+ }
+
+ throw new IllegalArgumentException(
+ "Unable to convert TimestampData with precision: "
Review Comment:
Usually we check the arguments at the beginning of the method. Also, can we
use the utility class `ValidationUtils`?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java:
##########
@@ -600,23 +600,29 @@ private static Type convertToParquetType(
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
- if (timestampType.getPrecision() == 3) {
+ if (timestampType.getPrecision() == 3 || timestampType.getPrecision()
== 6) {
+ TimeUnit timeunit = timestampType.getPrecision() == 3 ?
TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
repetition)
- .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
+ .as(LogicalTypeAnnotation.timestampType(true, timeunit))
.named(name);
} else {
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96,
repetition)
- .named(name);
+ throw new IllegalArgumentException(
+ "Parquet does not support TIMESTAMP type with precision: "
+ + timestampType.getPrecision()
+ + ", it only support precisions <= 6.");
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) type;
- if (localZonedTimestampType.getPrecision() == 3) {
+ if (localZonedTimestampType.getPrecision() == 3 ||
localZonedTimestampType.getPrecision() == 6) {
+ TimeUnit timeunit = localZonedTimestampType.getPrecision() == 3 ?
TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
repetition)
- .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
+ .as(LogicalTypeAnnotation.timestampType(false, timeunit))
.named(name);
} else {
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96,
repetition)
- .named(name);
+ throw new IllegalArgumentException(
+ "Parquet does not support TIMESTAMP_LTZ type with precision: "
+ + localZonedTimestampType.getPrecision()
Review Comment:
Please revert the change so that it does not throw an exception.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java:
##########
@@ -283,34 +291,66 @@ public void write(ArrayData array, int ordinal) {
}
}
- /**
- * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
- *
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
- * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
- */
private class Timestamp64Writer implements FieldWriter {
- private Timestamp64Writer() {
+
+ private final int precision;
+ private Timestamp64Writer(int precision) {
+ this.precision = precision;
}
@Override
public void write(RowData row, int ordinal) {
- recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
+ TimestampData timestampData = row.getTimestamp(ordinal, precision);
+ recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
@Override
public void write(ArrayData array, int ordinal) {
- recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+ TimestampData timestampData = array.getTimestamp(ordinal, precision);
+ recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
}
- private long timestampToInt64(TimestampData timestampData) {
- return utcTimestamp ? timestampData.getMillisecond() :
timestampData.toTimestamp().getTime();
+ /**
+ * Converts a {@code TimestampData} to its corresponding int64 value. This
function only accepts TimestampData of
+ * precision 3 or 6. Special attention will need to be given to a
TimestampData of precision = 6.
+ * <p>
+ * For example representing `1970-01-01T00:00:03.100001` of precision 6 will
have:
+ * <ul>
+ * <li>millisecond = 3100</li>
+ * <li>nanoOfMillisecond = 1000</li>
+ * </ul>
+ * As such, the int64 value will be:
+ * <p>
+ * millisecond * 1000 + nanoOfMillisecond / 1000
+ *
+ * @param timestampData TimestampData to be converted to int64 format
+ * @param precision the precision of the TimestampData
+ * @return int64 value of the TimestampData
+ */
+ private long timestampToInt64(TimestampData timestampData, int precision) {
+ if (!utcTimestamp) {
+ // toTimestamp is agnostic of precision
+ return timestampData.toTimestamp().getTime();
Review Comment:
It is not agnostic; we should also fix the longs for the different time units.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java:
##########
@@ -600,23 +600,29 @@ private static Type convertToParquetType(
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
- if (timestampType.getPrecision() == 3) {
+ if (timestampType.getPrecision() == 3 || timestampType.getPrecision()
== 6) {
+ TimeUnit timeunit = timestampType.getPrecision() == 3 ?
TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
repetition)
- .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
+ .as(LogicalTypeAnnotation.timestampType(true, timeunit))
.named(name);
} else {
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96,
repetition)
- .named(name);
+ throw new IllegalArgumentException(
+ "Parquet does not support TIMESTAMP type with precision: "
+ + timestampType.getPrecision()
Review Comment:
Please revert the change so that it does not throw an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]