liuml07 commented on code in PR #21594:
URL: https://github.com/apache/flink/pull/21594#discussion_r1067612224
##########
docs/content/docs/connectors/table/formats/avro.md:
##########
@@ -171,6 +171,11 @@ So the following table lists the type mapping from Flink type to Avro type.
<td><code>long</code></td>
<td><code>timestamp-millis</code></td>
</tr>
+ <tr>
+ <td><code>TIMESTAMP_LTZ(3)</code></td>
Review Comment:
nit: should this be `TIMESTAMP_LTZ` rather than `TIMESTAMP_LTZ(3)`?
`TIMESTAMP_LTZ(1)` and `TIMESTAMP_LTZ(2)` can also use the Avro format. The
`TIMESTAMP` row does not show a precision either, even though it supports
precisions 1, 2, and 3 for now.
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java:
##########
@@ -378,6 +383,23 @@ private Timestamp convertToTimestamp(Object object, boolean isMicros) {
return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
}
+ private LocalDateTime convertToLocalDateTime(Object object) {
+ final LocalDateTime localDateTime;
+ ZoneId zoneId = ZoneId.systemDefault();
+ final Instant localDateTimeInstant;
+ if (object instanceof Long) {
+ long localDateTimeEpochMilli = (Long) object;
+ localDateTimeInstant = Instant.ofEpochMilli(localDateTimeEpochMilli);
+ } else if (object instanceof LocalDateTime) {
+ localDateTime = (LocalDateTime) object;
+ localDateTimeInstant = localDateTime.toInstant(ZoneOffset.UTC);
+ } else {
Review Comment:
I think the object can also be an `Instant`, as in `convertToTimestamp`.
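
A minimal sketch of what an extra `Instant` branch could look like. This is not the PR's code: the class name `LocalTimestampSketch` is hypothetical, and interpreting the value in UTC (instead of the `ZoneId.systemDefault()` seen in the diff) is an assumption made only to keep the example deterministic.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical standalone sketch, not part of AvroRowDeserializationSchema.
class LocalTimestampSketch {

    // Accepts the epoch-millis Long, an Instant, or an already converted LocalDateTime.
    static LocalDateTime convertToLocalDateTime(Object object) {
        final Instant instant;
        if (object instanceof Long) {
            // Raw Avro long: milliseconds since 1970-01-01T00:00.
            instant = Instant.ofEpochMilli((Long) object);
        } else if (object instanceof Instant) {
            // The additional case suggested in this comment.
            instant = (Instant) object;
        } else if (object instanceof LocalDateTime) {
            // Already in the target representation, nothing to convert.
            return (LocalDateTime) object;
        } else {
            throw new IllegalArgumentException(
                    "Unexpected object type for local-timestamp-millis: "
                            + (object == null ? "null" : object.getClass().getName()));
        }
        // Assumption: UTC is used here; the PR may intentionally use another zone.
        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
    }
}
```

With this shape, a `Long`, an `Instant`, and a `LocalDateTime` all take the same path, and anything else fails fast with a descriptive message.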
##########
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java:
##########
@@ -193,7 +195,8 @@ void testSpecificType() throws Exception {
ROW(
FIELD("type_timestamp_millis", TIMESTAMP(3).notNull()),
FIELD("type_date", DATE().notNull()),
- FIELD("type_time_millis", TIME(3).notNull()))
+ FIELD("type_time_millis", TIME(3).notNull()),
+ FIELD("type_local_timestamp_millis", TIMESTAMP(3).notNull()))
Review Comment:
nit: move this right after
`FIELD("type_timestamp_millis", TIMESTAMP(3).notNull()),`? Same in the other
tests: this field is closely related to `timestamp`, and keeping the two
together makes the code easier to read.
##########
docs/content/docs/connectors/table/formats/avro.md:
##########
@@ -171,6 +171,11 @@ So the following table lists the type mapping from Flink type to Avro type.
<td><code>long</code></td>
<td><code>timestamp-millis</code></td>
</tr>
+ <tr>
+ <td><code>TIMESTAMP_LTZ(3)</code></td>
Review Comment:
There is also a Chinese version of this doc. I think we should add the same
content there as well: `docs/content.zh/docs/connectors/table/formats/avro.md`
##########
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##########
@@ -363,6 +368,23 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
}
Schema timestamp =
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final LocalZonedTimestampType localZonedTimestampType =
+ (LocalZonedTimestampType) logicalType;
+ precision = localZonedTimestampType.getPrecision();
+ org.apache.avro.LogicalType avrolocalLogicalType;
+ if (precision <= 3) {
+ avrolocalLogicalType = LogicalTypes.localTimestampMillis();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support LocalDateTime type "
Review Comment:
nit: it might be better to replace `LocalDateTime` with
`TIMESTAMP_WITH_LOCAL_TIME_ZONE` in the error message.
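
To illustrate, here is a rough sketch of the precision check with the reworded message. It is a simplified stand-in, not the converter itself: the class `LtzPrecisionSketch`, the method `avroLogicalTypeNameFor`, and the full wording of the message are all hypothetical, and the method returns the logical type name as a plain string so that the example does not depend on Avro.

```java
// Hypothetical standalone sketch, not part of AvroSchemaConverter.
class LtzPrecisionSketch {

    // Returns the Avro logical type name for a TIMESTAMP_WITH_LOCAL_TIME_ZONE precision.
    static String avroLogicalTypeNameFor(int precision) {
        if (precision <= 3) {
            // Millisecond precision or coarser maps to local-timestamp-millis.
            return "local-timestamp-millis";
        }
        // The message names the Flink type the user wrote, not the Java class.
        throw new IllegalArgumentException(
                "Avro does not support TIMESTAMP_WITH_LOCAL_TIME_ZONE type with precision: "
                        + precision
                        + ", it only supports precision less than or equal to 3.");
    }
}
```

Naming the SQL type in the message should make it easier for users to relate the failure to the column definition in their DDL.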