wuchong commented on a change in pull request #13834:
URL: https://github.com/apache/flink/pull/13834#discussion_r514842981
##########
File path:
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
##########
@@ -221,12 +221,22 @@ private int convertToDate(JsonNode jsonNode) {
        return (int) Date.valueOf(jsonNode.asText()).toLocalDate().toEpochDay();
    }
-   private int convertToTime(JsonNode jsonNode) {
+   private CsvToRowDataConverter convertToTime(TimeType timeType) {
+       final int precision = timeType.getPrecision();
        // csv currently is using Time.valueOf() to parse time string
-       LocalTime localTime = Time.valueOf(jsonNode.asText()).toLocalTime();
        // TODO: FLINK-17525 support millisecond and nanosecond
        // get number of milliseconds of the day
-       return localTime.toSecondOfDay() * 1000;
+       return jsonNode -> {
+           LocalTime localTime = LocalTime.parse(jsonNode.asText());
+           if (precision == 3) {
+               return localTime.toNanoOfDay() / 1000_000L;
+           } else if (precision == 0) {
+               return localTime.toSecondOfDay() * 1000L;
+           } else {
+               throw new IllegalArgumentException("Csv does not support TIME type " +
+                   "with precision: " + precision + ", it only supports precision 0 or 3.");
+           }
+       };
Review comment:
```java
final int precision = timeType.getPrecision();
if (precision > 3) {
    throw new IllegalArgumentException("CSV format does not support TIME type " +
        "with precision: " + precision + ", it only supports precision 0 ~ 3.");
}
// get number of milliseconds of the day
return jsonNode -> {
    LocalTime localTime = LocalTime.parse(jsonNode.asText());
    if (precision == 0) {
        return localTime.toSecondOfDay() * 1000L;
    } else {
        return (int) (localTime.toNanoOfDay() / 1000_000L);
    }
};
```
This way the exception is thrown in the compile phase instead of at runtime.
Besides, we can also support precisions 0 ~ 3.
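
To make the "fail early" point concrete, here is a minimal standalone sketch (assumed class and interface names, not the PR's actual code) in which the precision check runs once, when the converter is built, so an unsupported precision surfaces before any record is parsed:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.LocalTime;

public class TimeConverterSketch {

    // Stand-in for the PR's CsvToRowDataConverter interface (assumed shape).
    interface Converter {
        Object convert(JsonNode node);
    }

    // The precision check happens while building the converter,
    // not inside the per-record lambda.
    static Converter createTimeConverter(int precision) {
        if (precision > 3) {
            throw new IllegalArgumentException("CSV format does not support TIME type "
                    + "with precision: " + precision + ", it only supports precision 0 ~ 3.");
        }
        return node -> {
            LocalTime localTime = LocalTime.parse(node.asText());
            if (precision == 0) {
                return localTime.toSecondOfDay() * 1000;
            } else {
                // milliseconds of the day, truncating any sub-millisecond part
                return (int) (localTime.toNanoOfDay() / 1_000_000L);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        JsonNode node = new ObjectMapper().readTree("\"12:12:12.232421\"");
        System.out.println(createTimeConverter(3).convert(node)); // 43932232
        System.out.println(createTimeConverter(0).convert(node)); // 43932000
        try {
            createTimeConverter(9); // rejected up front, not while reading records
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```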
##########
File path:
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
##########
@@ -109,6 +109,54 @@ public void testSerializeDeserialize() throws Exception {
new byte[] {107, 3, 11});
}
+ @Test
+ public void testSerializeDeserializeForTime() throws Exception {
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.232",
+ "12:12:12.232",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
Review comment:
Why not reuse the `testNullableField` method?
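
For what it's worth, a hypothetical sketch of what reuse could look like; the `testNullableField(DataType, String, Object)` shape assumed here may not match the actual helper in `CsvRowDataSerDeSchemaTest` and would need to be checked:

```java
// Hypothetical: assumes a helper testNullableField(DataType fieldType, String csvField, Object expectedValue)
// exists in CsvRowDataSerDeSchemaTest; the real signature may differ.
testNullableField(TIME(3), "12:12:12.232", LocalTime.parse("12:12:12.232"));
testNullableField(TIME(0), "12:12:12", LocalTime.parse("12:12:12"));
```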
##########
File path:
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
##########
@@ -109,6 +109,54 @@ public void testSerializeDeserialize() throws Exception {
new byte[] {107, 3, 11});
}
+ @Test
+ public void testSerializeDeserializeForTime() throws Exception {
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.232",
+ "12:12:12.232",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.232421",
+ "12:12:12.232",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.23",
+ "12:12:12.23",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ testFieldForTime(
+ TIME(0),
+ "12:12:12.23",
+ "12:12:12",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+
+ int precision = 2;
+        String expectedMessage = String.format("Csv does not support TIME type with precision: %d, it only supports precision 0 or 3.", precision);
+ String actualMessage = null;
+ try {
+ testFieldForTime(
+ TIME(precision),
+ "12:12:12.23",
+ "12:12:12",
+                (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ } catch (Exception e) {
+ actualMessage = e.getCause().getMessage();
+ }
+ assertEquals(expectedMessage, actualMessage);
Review comment:
```suggestion
",");
fail("Exception should be thrown.");
} catch (Exception e) {
assertEquals(expectedMessage,
e.getCause().getMessage());
}
```
##########
File path:
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java
##########
@@ -128,7 +128,7 @@ private static RowFieldConverter createRowFieldConverter(LogicalType fieldType)
            case DATE:
                return (csvMapper, container, row, pos) -> convertDate(row.getInt(pos), container);
            case TIME_WITHOUT_TIME_ZONE:
-               return (csvMapper, container, row, pos) -> convertTime(row.getInt(pos), container);
+               return (csvMapper, container, row, pos) -> convertTime(row.getLong(pos), container);
Review comment:
We should still use `getInt`, because we always use `int` to represent the
number of milliseconds of the day in Flink SQL internally. See the Javadoc of
`RowData`.
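
As a side note, a small self-contained sketch (illustrative only, not PR code) of the round trip between a time string and the `int` milliseconds-of-day value that `RowData` documents for TIME fields:

```java
import java.time.LocalTime;

public class TimeMillisOfDayDemo {
    public static void main(String[] args) {
        // TIME(p) is held internally as an int: milliseconds of the day.
        int millisOfDay = (int) (LocalTime.parse("12:12:12.232").toNanoOfDay() / 1_000_000L);
        System.out.println(millisOfDay); // 43932232

        // Converting back, e.g. when serializing the value into the CSV output.
        LocalTime restored = LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
        System.out.println(restored); // 12:12:12.232
    }
}
```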
##########
File path:
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
##########
@@ -109,6 +109,54 @@ public void testSerializeDeserialize() throws Exception {
new byte[] {107, 3, 11});
}
+ @Test
+ public void testSerializeDeserializeForTime() throws Exception {
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.232",
+ "12:12:12.232",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.232421",
+ "12:12:12.232",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ testFieldForTime(
+ TIME(3),
+ "12:12:12.23",
+ "12:12:12.23",
+ (deserSchema) -> deserSchema.setNullLiteral("null"),
+ (serSchema) -> serSchema.setNullLiteral("null"),
+ ",");
+ testFieldForTime(
+ TIME(0),
Review comment:
Add more tests for `TIME(1)`, `TIME(2)`.
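
Purely as an illustration of the requested cases, following the `testFieldForTime` pattern used above; the expected serialized strings assume the serializer truncates to the declared precision, which should be verified against the actual behavior:

```java
testFieldForTime(
        TIME(1),
        "12:12:12.232",
        "12:12:12.2",   // assumed: truncated to 1 fractional digit
        (deserSchema) -> deserSchema.setNullLiteral("null"),
        (serSchema) -> serSchema.setNullLiteral("null"),
        ",");
testFieldForTime(
        TIME(2),
        "12:12:12.232",
        "12:12:12.23",  // assumed: truncated to 2 fractional digits
        (deserSchema) -> deserSchema.setNullLiteral("null"),
        (serSchema) -> serSchema.setNullLiteral("null"),
        ",");
```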
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]