This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 3e0c2a1 [FLINK-24291][table-planner] Decimal precision is lost when deserializing records from testcsv
3e0c2a1 is described below
commit 3e0c2a12d2d270162c3d8a4a77451020926293a0
Author: xuyang <[email protected]>
AuthorDate: Fri Sep 17 11:56:54 2021 +0800
[FLINK-24291][table-planner] Decimal precision is lost when deserializing records from testcsv
This closes #17308
(cherry picked from commit 4d69f7f844725534b66e8cc9f5e64d6d10226055)
---
.../planner/runtime/FileSystemITCaseBase.scala | 54 ++++++++++++++++++++++
.../filesystem/TestRowDataCsvInputFormat.java | 40 +++++++++-------
2 files changed, 78 insertions(+), 16 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index beb5dda..d80e5b4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -95,6 +95,60 @@ trait FileSystemITCaseBase {
|)
""".stripMargin
)
+
+ tableEnv.executeSql(
+ s"""
+ |create table hasDecimalFieldWithPrecisionTenAndZeroTable (
+ | x decimal(10, 0), y int
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$resultPath',
+ | ${formatProperties().mkString(",\n")}
+ |)
+ """.stripMargin
+ )
+
+ tableEnv.executeSql(
+ s"""
+ |create table hasDecimalFieldWithPrecisionThreeAndTwoTable (
+ | x decimal(3, 2), y int
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$resultPath',
+ | ${formatProperties().mkString(",\n")}
+ |)
+ """.stripMargin
+ )
+ }
+
+ @Test
+ def testSelectDecimalWithPrecisionTenAndZeroFromFileSystem(): Unit={
+ tableEnv.executeSql(
+ "insert into hasDecimalFieldWithPrecisionTenAndZeroTable(x, y) " +
+ "values(cast(2113554011 as decimal(10, 0)), 1), " +
+ "(cast(2113554022 as decimal(10,0)), 2)").await()
+
+ check(
+ "select x, y from hasDecimalFieldWithPrecisionTenAndZeroTable",
+ Seq(
+ row(2113554011, 1),
+ row(2113554022, 2)
+ ))
+ }
+
+ @Test
+ def testSelectDecimalWithPrecisionThreeAndTwoFromFileSystem(): Unit={
+ tableEnv.executeSql(
+ "insert into hasDecimalFieldWithPrecisionThreeAndTwoTable(x,y) " +
+ "values(cast(1.32 as decimal(3, 2)), 1), " +
+ "(cast(2.64 as decimal(3, 2)), 2)").await()
+
+ check(
+ "select x, y from hasDecimalFieldWithPrecisionThreeAndTwoTable",
+ Seq(
+ row(1.32, 1),
+ row(2.64, 2)
+ ))
}
@Test
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
index 2b52188..49699ec 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.filesystem;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
@@ -32,7 +31,9 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.Row;
@@ -42,6 +43,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
/** The {@link InputFormat} that output {@link RowData}. */
public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
@@ -50,7 +53,7 @@ public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
private final int[] selectFields;
private final long limit;
private final RowCsvInputFormat inputFormat;
- private final List<TypeInformation> fieldTypes;
+ private final List<DataType> fieldTypes;
private final List<String> fieldNames;
private final List<DataFormatConverters.DataFormatConverter> csvSelectConverters;
private final int[] csvFieldMapping;
@@ -70,9 +73,8 @@ public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
this.defaultPartValue = defaultPartValue;
this.selectFields = selectFields;
this.limit = limit;
- RowTypeInfo rowType = (RowTypeInfo) schema.toRowType();
- this.fieldTypes = Arrays.asList(rowType.getFieldTypes());
- this.fieldNames = Arrays.asList(rowType.getFieldNames());
+ this.fieldTypes = Arrays.asList(schema.getFieldDataTypes());
+ this.fieldNames = Arrays.asList(schema.getFieldNames());
List<String> csvFieldNames =
fieldNames.stream()
@@ -85,20 +87,23 @@ public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
selectFieldNames.stream()
.filter(name -> !partitionKeys.contains(name))
.collect(Collectors.toList());
- List<TypeInformation> csvSelectTypes =
+ List<DataType> csvSelectTypes =
csvSelectFieldNames.stream()
.map(name -> fieldTypes.get(fieldNames.indexOf(name)))
.collect(Collectors.toList());
+ RowTypeInfo rowType = (RowTypeInfo) schema.toRowType();
+ TypeInformation<?>[] fieldTypeInfos = rowType.getFieldTypes();
+ TypeInformation<?>[] csvSelectTypeInfos =
+ csvSelectFieldNames.stream()
+ .map(name -> fieldTypeInfos[fieldNames.indexOf(name)])
+ .toArray(TypeInformation<?>[]::new);
this.csvSelectConverters =
csvSelectTypes.stream()
- .map(TypeConversions::fromLegacyInfoToDataType)
.map(DataFormatConverters::getConverterForDataType)
.collect(Collectors.toList());
int[] csvSelectFields =
csvSelectFieldNames.stream().mapToInt(csvFieldNames::indexOf).toArray();
- this.inputFormat =
- new RowCsvInputFormat(
- null, csvSelectTypes.toArray(new TypeInformation[0]), csvSelectFields);
+ this.inputFormat = new RowCsvInputFormat(null, csvSelectTypeInfos, csvSelectFields);
this.inputFormat.setFilePaths(paths);
this.csvFieldMapping =
@@ -135,15 +140,18 @@ public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
this.csvRow = new Row(csvSelectConverters.size());
}
- private Object convertStringToInternal(String value, TypeInformation type) {
- if (type.equals(Types.INT)) {
+ private Object convertStringToInternal(String value, DataType dataType) {
+ final LogicalType logicalType = dataType.getLogicalType();
+ if (hasRoot(logicalType, LogicalTypeRoot.INTEGER)) {
return Integer.parseInt(value);
- } else if (type.equals(Types.LONG)) {
+ } else if (hasRoot(logicalType, LogicalTypeRoot.BIGINT)) {
return Long.parseLong(value);
- } else if (type.equals(Types.STRING)) {
+ } else if (hasRoot(logicalType, LogicalTypeRoot.CHAR)
+ || hasRoot(logicalType, LogicalTypeRoot.VARCHAR)) {
return StringData.fromString(value);
} else {
- throw new UnsupportedOperationException("Unsupported partition type: " + type);
+ throw new UnsupportedOperationException(
+ "Unsupported partition type: " + logicalType.getTypeRoot().name());
}
}