This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 3e0c2a1  [FLINK-24291][table-planner] Decimal precision is lost when deserializing records from testcsv
3e0c2a1 is described below

commit 3e0c2a12d2d270162c3d8a4a77451020926293a0
Author: xuyang <[email protected]>
AuthorDate: Fri Sep 17 11:56:54 2021 +0800

    [FLINK-24291][table-planner] Decimal precision is lost when deserializing records from testcsv
    
    This closes #17308
    
    (cherry picked from commit 4d69f7f844725534b66e8cc9f5e64d6d10226055)
---
 .../planner/runtime/FileSystemITCaseBase.scala     | 54 ++++++++++++++++++++++
 .../filesystem/TestRowDataCsvInputFormat.java      | 40 +++++++++-------
 2 files changed, 78 insertions(+), 16 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index beb5dda..d80e5b4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -95,6 +95,60 @@ trait FileSystemITCaseBase {
          |)
        """.stripMargin
     )
+
+    tableEnv.executeSql(
+      s"""
+         |create table hasDecimalFieldWithPrecisionTenAndZeroTable (
+         |  x decimal(10, 0), y int
+         |) with (
+         |  'connector' = 'filesystem',
+         |  'path' = '$resultPath',
+         |  ${formatProperties().mkString(",\n")}
+         |)
+       """.stripMargin
+    )
+
+    tableEnv.executeSql(
+      s"""
+         |create table hasDecimalFieldWithPrecisionThreeAndTwoTable (
+         |  x decimal(3, 2), y int
+         |) with (
+         |  'connector' = 'filesystem',
+         |  'path' = '$resultPath',
+         |  ${formatProperties().mkString(",\n")}
+         |)
+       """.stripMargin
+    )
+  }
+
+  @Test
+  def testSelectDecimalWithPrecisionTenAndZeroFromFileSystem(): Unit={
+    tableEnv.executeSql(
+      "insert into hasDecimalFieldWithPrecisionTenAndZeroTable(x, y) " +
+        "values(cast(2113554011 as decimal(10, 0)), 1), " +
+        "(cast(2113554022 as decimal(10,0)), 2)").await()
+
+    check(
+      "select x, y from hasDecimalFieldWithPrecisionTenAndZeroTable",
+      Seq(
+        row(2113554011, 1),
+        row(2113554022, 2)
+      ))
+  }
+
+  @Test
+  def testSelectDecimalWithPrecisionThreeAndTwoFromFileSystem(): Unit={
+    tableEnv.executeSql(
+      "insert into hasDecimalFieldWithPrecisionThreeAndTwoTable(x,y) " +
+        "values(cast(1.32 as decimal(3, 2)), 1), " +
+        "(cast(2.64 as decimal(3, 2)), 2)").await()
+
+    check(
+      "select x, y from hasDecimalFieldWithPrecisionThreeAndTwoTable",
+      Seq(
+        row(1.32, 1),
+        row(2.64, 2)
+      ))
   }
 
   @Test
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
index 2b52188..49699ec 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestRowDataCsvInputFormat.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.filesystem;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.io.RowCsvInputFormat;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
@@ -32,7 +31,9 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.Row;
 
@@ -42,6 +43,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
 /** The {@link InputFormat} that output {@link RowData}. */
 public class TestRowDataCsvInputFormat extends FileInputFormat<RowData> {
 
@@ -50,7 +53,7 @@ public class TestRowDataCsvInputFormat extends 
FileInputFormat<RowData> {
     private final int[] selectFields;
     private final long limit;
     private final RowCsvInputFormat inputFormat;
-    private final List<TypeInformation> fieldTypes;
+    private final List<DataType> fieldTypes;
     private final List<String> fieldNames;
     private final List<DataFormatConverters.DataFormatConverter> 
csvSelectConverters;
     private final int[] csvFieldMapping;
@@ -70,9 +73,8 @@ public class TestRowDataCsvInputFormat extends 
FileInputFormat<RowData> {
         this.defaultPartValue = defaultPartValue;
         this.selectFields = selectFields;
         this.limit = limit;
-        RowTypeInfo rowType = (RowTypeInfo) schema.toRowType();
-        this.fieldTypes = Arrays.asList(rowType.getFieldTypes());
-        this.fieldNames = Arrays.asList(rowType.getFieldNames());
+        this.fieldTypes = Arrays.asList(schema.getFieldDataTypes());
+        this.fieldNames = Arrays.asList(schema.getFieldNames());
 
         List<String> csvFieldNames =
                 fieldNames.stream()
@@ -85,20 +87,23 @@ public class TestRowDataCsvInputFormat extends 
FileInputFormat<RowData> {
                 selectFieldNames.stream()
                         .filter(name -> !partitionKeys.contains(name))
                         .collect(Collectors.toList());
-        List<TypeInformation> csvSelectTypes =
+        List<DataType> csvSelectTypes =
                 csvSelectFieldNames.stream()
                         .map(name -> fieldTypes.get(fieldNames.indexOf(name)))
                         .collect(Collectors.toList());
+        RowTypeInfo rowType = (RowTypeInfo) schema.toRowType();
+        TypeInformation<?>[] fieldTypeInfos = rowType.getFieldTypes();
+        TypeInformation<?>[] csvSelectTypeInfos =
+                csvSelectFieldNames.stream()
+                        .map(name -> fieldTypeInfos[fieldNames.indexOf(name)])
+                        .toArray(TypeInformation<?>[]::new);
         this.csvSelectConverters =
                 csvSelectTypes.stream()
-                        .map(TypeConversions::fromLegacyInfoToDataType)
                         .map(DataFormatConverters::getConverterForDataType)
                         .collect(Collectors.toList());
         int[] csvSelectFields =
                 
csvSelectFieldNames.stream().mapToInt(csvFieldNames::indexOf).toArray();
-        this.inputFormat =
-                new RowCsvInputFormat(
-                        null, csvSelectTypes.toArray(new TypeInformation[0]), 
csvSelectFields);
+        this.inputFormat = new RowCsvInputFormat(null, csvSelectTypeInfos, 
csvSelectFields);
         this.inputFormat.setFilePaths(paths);
 
         this.csvFieldMapping =
@@ -135,15 +140,18 @@ public class TestRowDataCsvInputFormat extends 
FileInputFormat<RowData> {
         this.csvRow = new Row(csvSelectConverters.size());
     }
 
-    private Object convertStringToInternal(String value, TypeInformation type) 
{
-        if (type.equals(Types.INT)) {
+    private Object convertStringToInternal(String value, DataType dataType) {
+        final LogicalType logicalType = dataType.getLogicalType();
+        if (hasRoot(logicalType, LogicalTypeRoot.INTEGER)) {
             return Integer.parseInt(value);
-        } else if (type.equals(Types.LONG)) {
+        } else if (hasRoot(logicalType, LogicalTypeRoot.BIGINT)) {
             return Long.parseLong(value);
-        } else if (type.equals(Types.STRING)) {
+        } else if (hasRoot(logicalType, LogicalTypeRoot.CHAR)
+                || hasRoot(logicalType, LogicalTypeRoot.VARCHAR)) {
             return StringData.fromString(value);
         } else {
-            throw new UnsupportedOperationException("Unsupported partition 
type: " + type);
+            throw new UnsupportedOperationException(
+                    "Unsupported partition type: " + 
logicalType.getTypeRoot().name());
         }
     }
 

Reply via email to