This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1b1fc417d4a7aa4aba50875084f5cf84cc58a22
Author: Jark Wu <j...@apache.org>
AuthorDate: Tue Dec 10 23:43:03 2019 +0800

    [FLINK-15124][table] Fix types with precision defined in DDL can't be executed
---
 .../flink/table/sinks/CsvTableSinkFactoryBase.java |  27 ++-
 .../apache/flink/table/sources/CsvTableSource.java |  93 +++++++++--
 .../table/sources/CsvTableSourceFactoryBase.java   |   4 +-
 .../table/planner/codegen/SinkCodeGenerator.scala  |   6 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  15 +-
 .../stream/StreamExecTableSourceScan.scala         |  15 +-
 .../flink/table/planner/sinks/TableSinkUtils.scala |   5 +-
 .../table/planner/sources/TableSourceUtil.scala    | 133 ++++++++++-----
 .../table/planner/catalog/CatalogTableITCase.scala |  92 ++++++++++-
 .../table/validation/TableSinkValidationTest.scala |   8 +-
 .../runtime/types/DataTypePrecisionFixer.java      | 170 +++++++++++++++++++
 .../table/runtime/types/PlannerTypeUtils.java      |   9 +-
 .../runtime/types/DataTypePrecisionFixerTest.java  | 183 +++++++++++++++++++++
 .../runtime/types/LogicalTypeAssignableTest.java   |   2 +-
 14 files changed, 682 insertions(+), 80 deletions(-)
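
For context, a minimal sketch of the scenario this commit addresses, assuming an
existing StreamTableEnvironment named tableEnv; the table name and path are made
up, and only the DDL shape mirrors the new CatalogTableITCase test further below.
Before this change, precisions declared in the DDL (e.g. DECIMAL(10, 2),
TIMESTAMP(3)) could be dropped once the source's produced type was derived from
legacy TypeInformation, and such pipelines could fail at planning or execution time.

    // illustrative only; tableEnv and the path are assumptions, not part of this commit
    tableEnv.sqlUpdate(
        "CREATE TABLE orders (" +
        "  price DECIMAL(10, 2)," +
        "  currency STRING," +
        "  ts TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector.type' = 'filesystem'," +
        "  'connector.path' = '/tmp/orders.csv'," +
        "  'format.type' = 'csv'" +
        ")");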

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
index ce4958c..f530fec 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.sinks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -27,8 +28,13 @@ import 
org.apache.flink.table.descriptors.FormatDescriptorValidator;
 import org.apache.flink.table.descriptors.OldCsvValidator;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.types.utils.TypeConversions;
 
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +81,8 @@ public abstract class CsvTableSinkFactoryBase implements 
TableFactory {
                properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_TYPE);
                properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
                properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+               // schema watermark
+               properties.add(SCHEMA + "." + DescriptorProperties.WATERMARK + 
".*");
                return properties;
        }
 
@@ -108,7 +116,24 @@ public abstract class CsvTableSinkFactoryBase implements 
TableFactory {
 
                CsvTableSink csvTableSink = new CsvTableSink(path, 
fieldDelimiter);
 
-               return (CsvTableSink) 
csvTableSink.configure(tableSchema.getFieldNames(), 
tableSchema.getFieldTypes());
+               // bridge to java.sql.Timestamp/Time/Date
+               TypeInformation<?>[] typeInfos = 
Arrays.stream(tableSchema.getFieldDataTypes())
+                       .map(dt -> {
+                               switch (dt.getLogicalType().getTypeRoot()) {
+                                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                               return 
dt.bridgedTo(Timestamp.class);
+                                       case TIME_WITHOUT_TIME_ZONE:
+                                               return dt.bridgedTo(Time.class);
+                                       case DATE:
+                                               return dt.bridgedTo(Date.class);
+                                       default:
+                                               return dt;
+                               }
+                       })
+                       .map(TypeConversions::fromDataTypeToLegacyInfo)
+                       .toArray(TypeInformation[]::new);
+
+               return (CsvTableSink) 
csvTableSink.configure(tableSchema.getFieldNames(), typeInfos);
        }
 
 }
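
The hunk above re-bridges each time-related field before converting it to legacy
type information. A rough Java sketch of that per-field step; the concrete field
type is only an example, and the sketch assumes the usual
org.apache.flink.table.api.DataTypes and TypeConversions imports:

    // what the stream above does for a single DDL field, shown in isolation
    DataType ddlType = DataTypes.TIMESTAMP(3);                       // precision as declared in the DDL
    DataType bridged = ddlType.bridgedTo(java.sql.Timestamp.class);  // CsvTableSink writes java.sql classes
    TypeInformation<?> legacy = TypeConversions.fromDataTypeToLegacyInfo(bridged);
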
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
index becf4de..ec12b20 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
@@ -28,13 +28,19 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import java.io.Serializable;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -44,6 +50,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+
 /**
  * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV 
files with a
  * (logically) unlimited number of fields.
@@ -129,7 +137,7 @@ public class CsvTableSource
                        String ignoreComments,
                        boolean lenient) {
                this(new CsvInputFormatConfig(
-                               path, fieldNames, fieldTypes, selectedFields,
+                               path, fieldNames, 
TypeConversions.fromLegacyInfoToDataType(fieldTypes), selectedFields,
                                fieldDelim, lineDelim, quoteCharacter, 
ignoreFirstLine,
                                ignoreComments, lenient, false));
        }
@@ -155,13 +163,23 @@ public class CsvTableSource
        }
 
        @Override
-       public TypeInformation<Row> getReturnType() {
-               return new RowTypeInfo(config.getSelectedFieldTypes(), 
config.getSelectedFieldNames());
+       public DataType getProducedDataType() {
+               return TableSchema.builder()
+                       .fields(config.getSelectedFieldNames(), 
config.getSelectedFieldDataTypes())
+                       .build()
+                       .toRowDataType();
+       }
+
+       @SuppressWarnings("unchecked")
+       private TypeInformation<Row> getProducedTypeInformation() {
+               return (TypeInformation<Row>) 
fromDataTypeToLegacyInfo(getProducedDataType());
        }
 
        @Override
        public TableSchema getTableSchema() {
-               return new TableSchema(config.fieldNames, config.fieldTypes);
+               return TableSchema.builder()
+                       .fields(config.fieldNames, config.fieldTypes)
+                       .build();
        }
 
        @Override
@@ -179,12 +197,16 @@ public class CsvTableSource
 
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment 
execEnv) {
-               return execEnv.createInput(config.createInputFormat(), 
getReturnType()).name(explainSource());
+               return execEnv
+                       .createInput(config.createInputFormat(), 
getProducedTypeInformation())
+                       .name(explainSource());
        }
 
        @Override
        public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-               return execEnv.createInput(config.createInputFormat(), 
getReturnType()).name(explainSource());
+               return execEnv
+                       .createInput(config.createInputFormat(), 
getProducedTypeInformation())
+                       .name(explainSource());
        }
 
        @Override
@@ -229,7 +251,7 @@ public class CsvTableSource
         * A builder for creating CsvTableSource instances.
         */
        public static class Builder {
-               private LinkedHashMap<String, TypeInformation<?>> schema = new 
LinkedHashMap<>();
+               private LinkedHashMap<String, DataType> schema = new 
LinkedHashMap<>();
                private Character quoteCharacter;
                private String path;
                private String fieldDelim = 
CsvInputFormat.DEFAULT_FIELD_DELIMITER;
@@ -270,22 +292,55 @@ public class CsvTableSource
                }
 
                /**
-                * Adds a field with the field name and the type information. Required. This method can be
+                * Adds a field with the field name and the data type. Required. This method can be
                 * called multiple times. The call order of this method also defines the order of the fields
                 * in a row.
                 *
                 * @param fieldName the field name
-                * @param fieldType the type information of the field
+                * @param fieldType the data type of the field
                 */
-               public Builder field(String fieldName, TypeInformation<?> 
fieldType) {
+               public Builder field(String fieldName, DataType fieldType) {
                        if (schema.containsKey(fieldName)) {
                                throw new IllegalArgumentException("Duplicate 
field name " + fieldName);
                        }
-                       schema.put(fieldName, fieldType);
+                       // CSV only supports java.sql.Timestamp/Date/Time
+                       DataType type;
+                       switch (fieldType.getLogicalType().getTypeRoot()) {
+                               case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                       type = 
fieldType.bridgedTo(Timestamp.class);
+                                       break;
+                               case TIME_WITHOUT_TIME_ZONE:
+                                       type = fieldType.bridgedTo(Time.class);
+                                       break;
+                               case DATE:
+                                       type = fieldType.bridgedTo(Date.class);
+                                       break;
+                               default:
+                                       type = fieldType;
+                       }
+                       schema.put(fieldName, type);
                        return this;
                }
 
                /**
+                * Adds a field with the field name and the type information. Required. This method can be
+                * called multiple times. The call order of this method also defines the order of the fields
+                * in a row.
+                *
+                * @param fieldName the field name
+                * @param fieldType the type information of the field
+                * @deprecated This method will be removed in future versions 
as it uses the old type system. It
+                *                 is recommended to use {@link #field(String, 
DataType)} instead which uses the new type
+                *                 system based on {@link DataTypes}. Please 
make sure to use either the old or the new
+                *                 type system consistently to avoid unintended 
behavior. See the website documentation
+                *                 for more information.
+                */
+               @Deprecated
+               public Builder field(String fieldName, TypeInformation<?> 
fieldType) {
+                       return field(fieldName, 
TypeConversions.fromLegacyInfoToDataType(fieldType));
+               }
+
+               /**
                 * Sets a quote character for String values, null by default.
                 *
                 * @param quote the quote character
@@ -345,7 +400,7 @@ public class CsvTableSource
                        return new CsvTableSource(new CsvInputFormatConfig(
                                        path,
                                        schema.keySet().toArray(new String[0]),
-                                       schema.values().toArray(new 
TypeInformation<?>[0]),
+                                       schema.values().toArray(new 
DataType[0]),
                                        IntStream.range(0, 
schema.values().size()).toArray(),
                                        fieldDelim,
                                        lineDelim,
@@ -459,7 +514,7 @@ public class CsvTableSource
 
                private final String path;
                private final String[] fieldNames;
-               private final TypeInformation<?>[] fieldTypes;
+               private final DataType[] fieldTypes;
                private final int[] selectedFields;
 
                private final String fieldDelim;
@@ -473,7 +528,7 @@ public class CsvTableSource
                CsvInputFormatConfig(
                        String path,
                        String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes,
+                       DataType[] fieldTypes,
                        int[] selectedFields,
                        String fieldDelim,
                        String lineDelim,
@@ -504,14 +559,20 @@ public class CsvTableSource
                        return selectedFieldNames;
                }
 
-               TypeInformation<?>[] getSelectedFieldTypes() {
-                       TypeInformation<?>[] selectedFieldTypes = new 
TypeInformation<?>[selectedFields.length];
+               DataType[] getSelectedFieldDataTypes() {
+                       DataType[] selectedFieldTypes = new 
DataType[selectedFields.length];
                        for (int i = 0; i < selectedFields.length; i++) {
                                selectedFieldTypes[i] = 
fieldTypes[selectedFields[i]];
                        }
                        return selectedFieldTypes;
                }
 
+               TypeInformation<?>[] getSelectedFieldTypes() {
+                       return Arrays.stream(getSelectedFieldDataTypes())
+                               .map(TypeConversions::fromDataTypeToLegacyInfo)
+                               .toArray(TypeInformation[]::new);
+               }
+
                RowCsvInputFormat createInputFormat() {
                        RowCsvInputFormat inputFormat = new RowCsvInputFormat(
                                new Path(path),
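
With the Builder changes above, a CsvTableSource can be described directly with
the new type system. A small usage sketch; the path and field names are made up
and not taken from this commit:

    CsvTableSource source = CsvTableSource.builder()
        .path("/tmp/input.csv")                    // hypothetical file
        .field("price", DataTypes.DECIMAL(10, 2))  // precision and scale are preserved
        .field("ts", DataTypes.TIMESTAMP(3))       // bridged to java.sql.Timestamp internally
        .build();
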
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
index 386c489..818f9d2 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
@@ -85,6 +85,8 @@ public abstract class CsvTableSourceFactoryBase implements 
TableFactory {
                properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_TYPE);
                properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
                properties.add(SCHEMA + ".#." + 
DescriptorProperties.TABLE_SCHEMA_NAME);
+               // schema watermark
+               properties.add(SCHEMA + "." + DescriptorProperties.WATERMARK + 
".*");
                return properties;
        }
 
@@ -122,7 +124,7 @@ public abstract class CsvTableSourceFactoryBase implements 
TableFactory {
                
params.getOptionalString(FORMAT_LINE_DELIMITER).ifPresent(csvTableSourceBuilder::lineDelimiter);
 
                for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
-                       
csvTableSourceBuilder.field(tableSchema.getFieldNames()[i], 
tableSchema.getFieldTypes()[i]);
+                       
csvTableSourceBuilder.field(tableSchema.getFieldNames()[i], 
tableSchema.getFieldDataTypes()[i]);
                }
                
params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER).ifPresent(csvTableSourceBuilder::quoteCharacter);
                
params.getOptionalString(FORMAT_COMMENT_PREFIX).ifPresent(csvTableSourceBuilder::commentPrefix);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 21ea4f6..9f1a810 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
 import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.sinks.TableSink
@@ -223,7 +224,10 @@ object SinkCodeGenerator {
           case (fieldTypeInfo, i) =>
             val requestedTypeInfo = tt.getTypeAt(i)
             validateFieldType(requestedTypeInfo)
-            if (!areTypesCompatible(
+            // it's safe to only check assignability, because the conversion from the internal type
+            // (Decimal) to the external type (BigDecimal) doesn't lose precision; the internal type
+            // already matches the expected type defined in the DDL.
+            if (!PlannerTypeUtils.isAssignable(
               fromTypeInfoToLogicalType(fieldTypeInfo),
               fromTypeInfoToLogicalType(requestedTypeInfo)) &&
               !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 1793616..61080dc 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
+
 import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
@@ -27,6 +28,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext
 import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
@@ -90,9 +92,10 @@ class BatchExecTableSourceScan(
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)
 
+    val rowType = 
FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
-      tableSourceTable.getRowType,
+      rowType,
       tableSourceTable.isStreamingMode)
 
     val inputDataType = inputTransform.getOutputType
@@ -114,11 +117,17 @@ class BatchExecTableSourceScan(
       planner.getRelBuilder
     )
     if (needInternalConversion) {
+      // the produced type may not carry the correct precision the user defined in the DDL,
+      // because it may have been converted from a legacy type. Fix the precision using the
+      // logical schema from the DDL; code generation requires the correct precision of input fields.
+      val fixedProducedDataType = 
TableSourceUtil.fixPrecisionForProducedDataType(
+        tableSource,
+        rowType)
       ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         inputTransform.asInstanceOf[Transformation[Any]],
         fieldIndexes,
-        producedDataType,
+        fixedProducedDataType,
         getRowType,
         getTable.getQualifiedName,
         config,
@@ -132,7 +141,7 @@ class BatchExecTableSourceScan(
   def needInternalConversion: Boolean = {
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
-      tableSourceTable.getRowType,
+      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
       tableSourceTable.isStreamingMode)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(tableSource.getProducedDataType)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index 67c46f0..dbf5bfd 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.{DataTypes, TableException}
 import 
org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter
 import org.apache.flink.table.dataformat.{BaseRow, DataFormatConverters}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator._
 import org.apache.flink.table.planner.delegation.StreamPlanner
@@ -43,6 +44,7 @@ import 
org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, P
 import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, 
StreamTableSource}
 import org.apache.flink.table.types.{DataType, FieldsDataType}
 import org.apache.flink.types.Row
+
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -100,9 +102,10 @@ class StreamExecTableSourceScan(
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)
 
+    val rowType = 
FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
-      tableSourceTable.getRowType,
+      rowType,
       tableSourceTable.isStreamingMode)
 
     val inputDataType = inputTransform.getOutputType
@@ -134,11 +137,17 @@ class StreamExecTableSourceScan(
         }
       val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
         classOf[AbstractProcessStreamOperator[BaseRow]])
+      // the produced type may not carry the correct precision the user defined in the DDL,
+      // because it may have been converted from a legacy type. Fix the precision using the
+      // logical schema from the DDL; code generation requires the correct precision of input fields.
+      val fixedProducedDataType = 
TableSourceUtil.fixPrecisionForProducedDataType(
+        tableSource,
+        rowType)
       val conversionTransform = ScanUtil.convertToInternalRow(
         ctx,
         inputTransform.asInstanceOf[Transformation[Any]],
         fieldIndexes,
-        producedDataType,
+        fixedProducedDataType,
         getRowType,
         getTable.getQualifiedName,
         config,
@@ -182,7 +191,7 @@ class StreamExecTableSourceScan(
   private def needInternalConversion: Boolean = {
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
-      tableSourceTable.getRowType,
+      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
       tableSourceTable.isStreamingMode)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(tableSource.getProducedDataType)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 8d8a578..7ea93a7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -56,7 +56,10 @@ object TableSinkUtils {
     if (srcLogicalTypes.length != sinkLogicalTypes.length ||
       srcLogicalTypes.zip(sinkLogicalTypes).exists {
         case (srcType, sinkType) =>
-          !PlannerTypeUtils.isInteroperable(srcType, sinkType)
+          // it's safe to only check assignability, because the conversion from the internal type
+          // (Decimal) to the external type (BigDecimal) doesn't lose precision; the internal type
+          // already matches the expected type defined in the DDL.
+          !PlannerTypeUtils.isAssignable(srcType, sinkType)
       }) {
 
       val srcFieldNames = query.getTableSchema.getFieldNames
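
The relaxed validation relies on PlannerTypeUtils.isAssignable treating types
that differ only in precision as compatible. A sketch of that assumption, with
illustrative precisions (see the PlannerTypeUtils change later in this commit):

    // same type root with different precisions no longer fails sink validation
    boolean ok = PlannerTypeUtils.isAssignable(new DecimalType(10, 2), new DecimalType(38, 18));
    // expected: true under the check as changed in this commit
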
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 49e96ea..89ef980 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -18,23 +18,20 @@
 
 package org.apache.flink.table.planner.sources
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, TableSchema, 
ValidationException, WatermarkSpec}
 import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, 
valueLiteral}
 import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, 
ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable
-import 
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
+import org.apache.flink.table.runtime.types.DataTypePrecisionFixer
 import org.apache.flink.table.sources.{DefinedFieldMapping, 
DefinedProctimeAttribute, DefinedRowtimeAttributes, RowtimeAttributeDescriptor, 
TableSource}
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, 
TimestampType, TinyIntType}
+import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, 
RowType, TimestampKind, TimestampType, TinyIntType}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
@@ -42,10 +39,12 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.types.logical.RowType.RowField
 
 import java.sql.Timestamp
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /** Util class for [[TableSource]]. */
 object TableSourceUtil {
@@ -63,7 +62,7 @@ object TableSourceUtil {
     */
   def computeIndexMapping(
       tableSource: TableSource[_],
-      rowType: RelDataType,
+      rowType: RowType,
       isStreamTable: Boolean): Array[Int] = {
 
     // get rowtime and proctime attributes
@@ -72,8 +71,8 @@ object TableSourceUtil {
     // compute mapping of selected fields and time attributes
     val names = rowType.getFieldNames.toArray
     val fieldTypes = rowType
-      .getFieldList
-      .map(f => FlinkTypeFactory.toLogicalType(f.getType))
+      .getFields
+      .map(_.getType)
       .toArray
     val mapping: Array[Int] = fieldTypes.zip(names).map {
       case (_: TimestampType, name: String)
@@ -100,18 +99,20 @@ object TableSourceUtil {
           throw new ValidationException(s"Rowtime field '$name' has invalid 
type $t. " +
             s"Rowtime attributes must be of TimestampType.")
         }
-        val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
+        val (physicalName, idx, logicalType) = resolveInputField(name, 
tableSource)
         // validate that mapped fields are of the same type
-        if (!isAssignable(fromTypeInfoToLogicalType(tpe), t)) {
+        if (!isAssignable(logicalType, t)) {
           throw new ValidationException(s"Type $t of table field '$name' does 
not " +
-            s"match with type $tpe of the field '$physicalName' of the 
TableSource return type.")
+            s"match with type $logicalType of the field '$physicalName' of the 
" +
+            "TableSource return type.")
         }
         idx
     }
-    val inputType = fromDataTypeToTypeInfo(tableSource.getProducedDataType)
+
+    val inputType = fromDataTypeToLogicalType(tableSource.getProducedDataType)
 
     // ensure that only one field is mapped to an atomic type
-    if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 
1) {
+    if (!(inputType.getTypeRoot == LogicalTypeRoot.ROW) && mapping.count(_ >= 
0) > 1) {
       throw new ValidationException(
         s"More than one table field matched to atomic input type $inputType.")
     }
@@ -119,6 +120,62 @@ object TableSourceUtil {
     mapping
   }
 
+
+  /**
+    * Fixes the precision of [[TableSource#getProducedDataType()]] using the given logical schema
+    * type. The precision of the produced data type may have been lost, because the data type may
+    * come from a legacy type (e.g. Types.BIG_DEC). However, the precision is important when
+    * converting the output of the source to internal rows.
+    *
+    * @param tableSource the table source
+    * @param logicalSchema the logical schema from DDL which carries the 
correct precisions
+    * @return the produced data type with correct precisions.
+    */
+  def fixPrecisionForProducedDataType(
+      tableSource: TableSource[_],
+      logicalSchema: RowType): DataType = {
+
+    // remove the proctime field from the logical schema, because proctime is not part of the produced data type
+    val schemaWithoutProctime = getProctimeAttribute(tableSource) match {
+      case Some(proctime) =>
+        val fields = logicalSchema
+          .getFields
+          .filter(f => !f.getName.equals(proctime))
+          .asJava
+        new RowType(logicalSchema.isNullable, fields)
+
+      case None => logicalSchema
+    }
+
+    // get the corresponding logical type according to the layout of source 
data type
+    val sourceLogicalType = 
fromDataTypeToLogicalType(tableSource.getProducedDataType)
+    def mapping(physicalName: String): String = tableSource match {
+      case ts: DefinedFieldMapping if ts.getFieldMapping != null =>
+        // revert key and value, mapping from physical field to logical field
+        val map = ts.getFieldMapping.toMap.map(_.swap)
+        map(physicalName)
+      case _ =>
+        physicalName
+    }
+    val correspondingLogicalType = sourceLogicalType match {
+      case outType: RowType =>
+        val fieldsDataType = schemaWithoutProctime.getFields.map(f => 
(f.getName, f.getType)).toMap
+        val fields = outType.getFieldNames.map(n =>
+          new RowField(n, fieldsDataType(mapping(n)))).asJava
+        new RowType(schemaWithoutProctime.isNullable, fields)
+
+      case _ =>
+        // atomic output type
+        // get the first type of logical schema list, there must be only one 
type in the list
+        schemaWithoutProctime.getFields.get(0).getType
+    }
+
+    // fixing the precision
+    tableSource
+      .getProducedDataType
+      .accept(new DataTypePrecisionFixer(correspondingLogicalType))
+  }
+
   /**
     * Returns schema of the selected fields of the given [[TableSource]].
     *
@@ -139,7 +196,7 @@ object TableSourceUtil {
 
     val fieldNames = fieldNameArray
     var fieldTypes = fieldDataTypeArray
-      .map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType)
+      .map(fromDataTypeToLogicalType)
 
     if (streaming) {
       // adjust the type of time attributes for streaming tables
@@ -184,7 +241,7 @@ object TableSourceUtil {
 
     val fieldNames = fieldNameArray
     var fieldTypes = fieldDataTypeArray
-      .map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType)
+      .map(fromDataTypeToLogicalType)
 
     // patch rowtime field according to WatermarkSpec
     fieldTypes = if (streaming) {
@@ -236,7 +293,7 @@ object TableSourceUtil {
       getSourceRowType(typeFactory, fieldNames, fieldDataTypes, 
tableSource.get,
         streaming)
     } else {
-      val fieldTypes = 
fieldDataTypes.map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType)
+      val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
       typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
     }
   }
@@ -296,11 +353,10 @@ object TableSourceUtil {
       * Creates a RelNode with a schema that corresponds on the given fields
       * Fields for which no information is available, will have default values.
       */
-    def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): 
RelNode = {
+    def createSchemaRelNode(fields: Array[(String, Int, LogicalType)]): 
RelNode = {
       val maxIdx = fields.map(_._2).max
       val idxMap: Map[Int, (String, LogicalType)] = Map(
-        fields.map(f => f._2 ->(f._1,
-            fromTypeInfoToLogicalType(f._3))): _*)
+        fields.map(f => f._2 ->(f._1, f._3)): _*)
       val (physicalFields, physicalTypes) = (0 to maxIdx)
         .map(i => idxMap.getOrElse(i, ("", new TinyIntType()))).unzip
       val physicalSchema: RelDataType = typeFactory.buildRelNodeRowType(
@@ -322,7 +378,11 @@ object TableSourceUtil {
           // push an empty values node with the physical schema on the 
relbuilder
           relBuilder.push(createSchemaRelNode(resolvedFields))
           // get extraction expression
-          resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2))
+          resolvedFields.map(
+            f => new ResolvedFieldReference(
+              f._1,
+              fromLogicalTypeToTypeInfo(f._3),
+              f._2))
         } else {
           new Array[ResolvedFieldReference](0)
         }
@@ -371,30 +431,30 @@ object TableSourceUtil {
     *
     * @param fieldName The logical field to look up.
     * @param tableSource The table source in which to look for the field.
-    * @return The name, index, and type information of the physical field.
+    * @return The name, index, and logical type of the physical field.
     */
   private def resolveInputField(
       fieldName: String,
-      tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = {
+      tableSource: TableSource[_]): (String, Int, LogicalType) = {
 
-    val returnType = fromDataTypeToTypeInfo(tableSource.getProducedDataType)
+    val returnType = fromDataTypeToLogicalType(tableSource.getProducedDataType)
 
     /** Look up a field by name in a type information */
-    def lookupField(fieldName: String, failMsg: String): (String, Int, 
TypeInformation[_]) = {
-      returnType match {
+    def lookupField(fieldName: String, failMsg: String): (String, Int, 
LogicalType) = {
 
-        case c: CompositeType[_] =>
+      returnType match {
+        case rt: RowType =>
           // get and check field index
-          val idx = c.getFieldIndex(fieldName)
+          val idx = rt.getFieldIndex(fieldName)
           if (idx < 0) {
             throw new ValidationException(failMsg)
           }
-          // return field name, index, and field type
-          (fieldName, idx, c.getTypeAt(idx))
 
-        case t: TypeInformation[_] =>
+          // return field name, index, and field type
+          (fieldName, idx, rt.getTypeAt(idx))
+        case _ =>
           // no composite type, we return the full atomic type as field
-          (fieldName, 0, t)
+          (fieldName, 0, returnType)
       }
     }
 
@@ -423,17 +483,16 @@ object TableSourceUtil {
   }
 
   /**
-    * Identifies the physical fields in the return type
-    * [[org.apache.flink.api.common.typeinfo.TypeInformation]] of a 
[[TableSource]]
+    * Identifies the physical fields in the return type of a [[TableSource]]
     * for a list of field names of the [[TableSource]]'s 
[[org.apache.flink.table.api.TableSchema]].
     *
     * @param fieldNames The field names to look up.
     * @param tableSource The table source in which to look for the field.
-    * @return The name, index, and type information of the physical field.
+    * @return The name, index, and logical type of the physical field.
     */
   private def resolveInputFields(
       fieldNames: Array[String],
-      tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = 
{
+      tableSource: TableSource[_]): Array[(String, Int, LogicalType)] = {
     fieldNames.map(resolveInputField(_, tableSource))
   }
 }
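
Reduced to a minimal Java sketch with made-up field names, the net effect of
fixPrecisionForProducedDataType is to overlay the DDL's precisions onto the
source's produced type (the real method additionally strips a proctime field
and honours DefinedFieldMapping):

    // produced type derived from legacy TypeInformation: precision defaulted
    DataType produced = DataTypes.ROW(
        DataTypes.FIELD("d", DataTypes.DECIMAL(38, 18)));
    // logical schema from the DDL: the source of truth for precision
    RowType ddlSchema = (RowType) DataTypes.ROW(
        DataTypes.FIELD("d", DataTypes.DECIMAL(10, 3))).getLogicalType();
    DataType fixed = produced.accept(new DataTypePrecisionFixer(ddlSchema));
    // fixed now reports DECIMAL(10, 3) for field "d"
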
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 5b39bcb..050fc76 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -26,19 +26,26 @@ import 
org.apache.flink.table.planner.expressions.utils.Func0
 import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
+import org.apache.flink.util.FileUtils
+
 import org.junit.Assert.{assertEquals, fail}
 import org.junit.rules.ExpectedException
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Ignore, Rule, Test}
+
+import java.io.File
 import java.util
+import java.math.{BigDecimal => JBigDecimal}
+import java.net.URI
 
 import scala.collection.JavaConversions._
 
 /** Test cases for catalog table. */
 @RunWith(classOf[Parameterized])
-class CatalogTableITCase(isStreamingMode: Boolean) {
+class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
   //~ Instance fields --------------------------------------------------------
 
   private val settings = if (isStreamingMode) {
@@ -135,11 +142,11 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
   @Test
   def testInsertInto(): Unit = {
     val sourceData = List(
-      toRow(1, "1000", 2),
-      toRow(2, "1", 3),
-      toRow(3, "2000", 4),
-      toRow(1, "2", 2),
-      toRow(2, "3000", 3)
+      toRow(1, "1000", 2, new JBigDecimal("10.001")),
+      toRow(2, "1", 3, new JBigDecimal("10.001")),
+      toRow(3, "2000", 4, new JBigDecimal("10.001")),
+      toRow(1, "2", 2, new JBigDecimal("10.001")),
+      toRow(2, "3000", 3, new JBigDecimal("10.001"))
     )
     TestCollectionTableFactory.initData(sourceData)
     val sourceDDL =
@@ -147,7 +154,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |create table t1(
         |  a int,
         |  b varchar,
-        |  c int
+        |  c int,
+        |  d DECIMAL(10, 3)
         |) with (
         |  'connector' = 'COLLECTION'
         |)
@@ -157,7 +165,8 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |create table t2(
         |  a int,
         |  b varchar,
-        |  c int
+        |  c int,
+        |  d DECIMAL(10, 3)
         |) with (
         |  'connector' = 'COLLECTION'
         |)
@@ -165,7 +174,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
     val query =
       """
         |insert into t2
-        |select t1.a, t1.b, (t1.a + 1) as c from t1
+        |select t1.a, t1.b, (t1.a + 1) as c, d from t1
       """.stripMargin
     tableEnv.sqlUpdate(sourceDDL)
     tableEnv.sqlUpdate(sinkDDL)
@@ -175,6 +184,71 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
   }
 
   @Test
+  def testReadWriteCsvUsingDDL(): Unit = {
+    val csvRecords = Seq(
+      "2.02,Euro,2019-12-12 00:00:01.001001",
+      "1.11,US Dollar,2019-12-12 00:00:02.002001",
+      "50,Yen,2019-12-12 00:00:04.004001",
+      "3.1,Euro,2019-12-12 00:00:05.005001",
+      "5.33,US Dollar,2019-12-12 00:00:06.006001"
+    )
+    val tempFilePath = createTempFile(
+      "csv-order-test",
+      csvRecords.mkString("#"))
+    val sourceDDL =
+      s"""
+       |CREATE TABLE T1 (
+       |  price DECIMAL(10, 2),
+       |  currency STRING,
+       |  ts TIMESTAMP(3),
+       |  WATERMARK FOR ts AS ts
+       |) WITH (
+       |  'connector.type' = 'filesystem',
+       |  'connector.path' = '$tempFilePath',
+       |  'format.type' = 'csv',
+       |  'format.field-delimiter' = ',',
+       |  'format.line-delimiter' = '#'
+       |)
+     """.stripMargin
+    tableEnv.sqlUpdate(sourceDDL)
+
+    val sinkFilePath = getTempFilePath("csv-order-sink")
+    val sinkDDL =
+      s"""
+        |CREATE TABLE T2 (
+        |  window_end TIMESTAMP(3),
+        |  max_ts TIMESTAMP(6),
+        |  counter BIGINT,
+        |  total_price DECIMAL(10, 2)
+        |) with (
+        |  'connector.type' = 'filesystem',
+        |  'connector.path' = '$sinkFilePath',
+        |  'format.type' = 'csv',
+        |  'format.field-delimiter' = ','
+        |)
+      """.stripMargin
+    tableEnv.sqlUpdate(sinkDDL)
+
+    val query =
+      """
+        |INSERT INTO T2
+        |SELECT
+        |  TUMBLE_END(ts, INTERVAL '5' SECOND),
+        |  MAX(ts),
+        |  COUNT(*),
+        |  MAX(price)
+        |FROM T1
+        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
+      """.stripMargin
+    tableEnv.sqlUpdate(query)
+    execJob("testJob")
+
+    val expected =
+      "2019-12-12 00:00:05.0,2019-12-12 00:00:04.004,3,50.00\n" +
+      "2019-12-12 00:00:10.0,2019-12-12 00:00:06.006,2,5.33\n"
+    assertEquals(expected, FileUtils.readFileUtf8(new File(new 
URI(sinkFilePath))))
+  }
+
+  @Test
   def testInsertSourceTableExpressionFields(): Unit = {
     val sourceData = List(
       toRow(1, "1000"),
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
index 27b0269..21c3ea3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala
@@ -91,21 +91,21 @@ class TableSinkValidationTest extends TableTestBase {
     expectedException.expectMessage(
       "Field types of query result and registered TableSink 
`default_catalog`." +
       "`default_database`.`testSink` do not match.\n" +
-      "Query result schema: [a: INT, b: BIGINT, c: STRING, d: DECIMAL(10, 
8)]\n" +
-      "TableSink schema:    [a: INT, b: BIGINT, c: STRING, d: DECIMAL(10, 7)]")
+      "Query result schema: [a: INT, b: BIGINT, c: STRING, d: BIGINT]\n" +
+      "TableSink schema:    [a: INT, b: BIGINT, c: STRING, d: INT]")
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
 
     val sourceTable = env.fromCollection(TestData.tupleData3).toTable(tEnv, 
'a, 'b, 'c)
     tEnv.registerTable("source", sourceTable)
-    val resultTable = tEnv.sqlQuery("select a, b, c, cast(b as decimal(10, 8)) 
as d from source")
+    val resultTable = tEnv.sqlQuery("select a, b, c, b as d from source")
 
     val sinkSchema = TableSchema.builder()
       .field("a", DataTypes.INT())
       .field("b", DataTypes.BIGINT())
       .field("c", DataTypes.STRING())
-      .field("d", DataTypes.DECIMAL(10, 7))
+      .field("d", DataTypes.INT())
       .build()
     val sink = new DataTypeOutputFormatTableSink(sinkSchema)
     tEnv.registerTableSink("testSink", sink)
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixer.java
new file mode 100644
index 0000000..c2674a6
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.DataTypeVisitor;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import java.util.Map;
+
+/**
+ * A data type visitor that fixes the precision of a data type using the given logical type,
+ * which carries the correct precisions. The original data type may have lost precision because
+ * of the conversion from {@link org.apache.flink.api.common.typeinfo.TypeInformation}.
+ */
+public final class DataTypePrecisionFixer implements DataTypeVisitor<DataType> 
{
+
+       private final LogicalType logicalType;
+
+       /**
+        * Creates a new instance with the given logical type.
+        * @param logicalType the logical type which carries the correct 
precisions.
+        */
+       public DataTypePrecisionFixer(LogicalType logicalType) {
+               this.logicalType = logicalType;
+       }
+
+       @Override
+       public DataType visit(AtomicDataType dataType) {
+               switch (logicalType.getTypeRoot()) {
+                       case DECIMAL:
+                               DecimalType decimalType = (DecimalType) 
logicalType;
+                               return DataTypes
+                                       // fix the precision and scale, because the precision may be lost or incorrect;
+                                       // the precision from the DDL is the only source of truth.
+                                       // we don't care about nullability for now.
+                                       .DECIMAL(decimalType.getPrecision(), 
decimalType.getScale())
+                                       // only keep the original conversion 
class
+                                       
.bridgedTo(dataType.getConversionClass());
+
+                       case TIMESTAMP_WITHOUT_TIME_ZONE :
+                               TimestampType timestampType = (TimestampType) 
logicalType;
+                               if (timestampType.getKind() == 
TimestampKind.REGULAR) {
+                                       return DataTypes
+                                               
.TIMESTAMP(timestampType.getPrecision())
+                                               
.bridgedTo(dataType.getConversionClass());
+                               } else {
+                                       // keep the original type if it is a time attribute type,
+                                       // because a time attribute can only have precision 3
+                                       // and the original type may be BIGINT.
+                                       return dataType;
+                               }
+
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                               LocalZonedTimestampType localZonedTimestampType 
= (LocalZonedTimestampType) logicalType;
+                               return DataTypes
+                                       
.TIMESTAMP_WITH_LOCAL_TIME_ZONE(localZonedTimestampType.getPrecision())
+                                       
.bridgedTo(dataType.getConversionClass());
+
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               ZonedTimestampType zonedTimestampType = 
(ZonedTimestampType) logicalType;
+                               return DataTypes
+                                       
.TIMESTAMP_WITH_TIME_ZONE(zonedTimestampType.getPrecision())
+                                       
.bridgedTo(dataType.getConversionClass());
+
+                       case TIME_WITHOUT_TIME_ZONE:
+                               TimeType timeType = (TimeType) logicalType;
+                               return DataTypes
+                                       .TIME(timeType.getPrecision())
+                                       
.bridgedTo(dataType.getConversionClass());
+
+                       default:
+                               return dataType;
+               }
+       }
+
+       @Override
+       public DataType visit(CollectionDataType collectionDataType) {
+               DataType elementType = collectionDataType.getElementDataType();
+               switch (logicalType.getTypeRoot()) {
+                       case ARRAY:
+                               ArrayType arrayType = (ArrayType) logicalType;
+                               DataType newArrayElementType = elementType
+                                       .accept(new 
DataTypePrecisionFixer(arrayType.getElementType()));
+                               return DataTypes
+                                       .ARRAY(newArrayElementType)
+                                       
.bridgedTo(collectionDataType.getConversionClass());
+
+                       case MULTISET:
+                               MultisetType multisetType = (MultisetType) 
logicalType;
+                               DataType newMultisetElementType = elementType
+                                       .accept(new 
DataTypePrecisionFixer(multisetType.getElementType()));
+                               return DataTypes
+                                       .MULTISET(newMultisetElementType)
+                                       
.bridgedTo(collectionDataType.getConversionClass());
+
+                       default:
+                               throw new 
UnsupportedOperationException("Unsupported logical type : " + logicalType);
+               }
+       }
+
+       @Override
+       public DataType visit(FieldsDataType fieldsDataType) {
+               Map<String, DataType> fieldDataTypes = 
fieldsDataType.getFieldDataTypes();
+               if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW) {
+                       RowType rowType = (RowType) logicalType;
+                       DataTypes.Field[] fields = rowType.getFields().stream()
+                               .map(f -> {
+                                       DataType fieldType = 
fieldDataTypes.get(f.getName());
+                                       DataType newFieldType =
+                                               fieldType.accept(new DataTypePrecisionFixer(f.getType()));
+                                       return DataTypes.FIELD(f.getName(), newFieldType);
+                               })
+                               .toArray(DataTypes.Field[]::new);
+                       return 
DataTypes.ROW(fields).bridgedTo(fieldsDataType.getConversionClass());
+               }
+               throw new UnsupportedOperationException("Unsupported logical 
type : " + logicalType);
+       }
+
+       @Override
+       public DataType visit(KeyValueDataType keyValueDataType) {
+               DataType keyType = keyValueDataType.getKeyDataType();
+               DataType valueType = keyValueDataType.getValueDataType();
+               if (logicalType.getTypeRoot() == LogicalTypeRoot.MAP) {
+                       MapType mapType = (MapType) logicalType;
+                       DataType newKeyType = keyType.accept(new 
DataTypePrecisionFixer(mapType.getKeyType()));
+                       DataType newValueType = valueType.accept(new 
DataTypePrecisionFixer(mapType.getValueType()));
+                       return DataTypes
+                               .MAP(newKeyType, newValueType)
+                               
.bridgedTo(keyValueDataType.getConversionClass());
+               }
+               throw new UnsupportedOperationException("Unsupported logical 
type : " + logicalType);
+       }
+}
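
A minimal sketch of the visitor on an atomic type. The DECIMAL(38, 18) starting
point is only an example of a defaulted precision (conversions from legacy types
such as Types.BIG_DEC typically lose the declared precision); the DDL-declared
DECIMAL(10, 2) is taken as the source of truth:

    DataType produced = DataTypes.DECIMAL(38, 18).bridgedTo(java.math.BigDecimal.class);
    DataType fixed = produced.accept(new DataTypePrecisionFixer(new DecimalType(10, 2)));
    // fixed is DECIMAL(10, 2) and still bridged to java.math.BigDecimal
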
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java
index 77d346b..724828e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/PlannerTypeUtils.java
@@ -115,9 +115,7 @@ public class PlannerTypeUtils {
         * Now in the conversion to the TypeInformation from DataType, type may loose some information
         * about nullable and precision. So we add this method to do a soft check.
         *
-        * <p>The difference of {@link #isInteroperable} is ignore decimal precision.
-        *
-        * <p>Now not ignore timestamp precision, because we only support one precision for timestamp type now.
+        * <p>The difference to {@link #isInteroperable} is that precisions are ignored.
         */
        public static boolean isAssignable(LogicalType t1, LogicalType t2) {
                // Soft check for CharType, it is converted to String TypeInformation and loose char information.
@@ -134,7 +132,12 @@ public class PlannerTypeUtils {
                }
 
                switch (t1.getTypeRoot()) {
+                       // ignore precision mismatches for DECIMAL, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                       // but still consider precision for other types (e.g. TIME).
+                       // TODO: add other precision types here in the future
                        case DECIMAL:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                                return true;
                        default:
                                if (t1.getChildren().isEmpty()) {
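
With the two additional case labels above, a precision mismatch on timestamp types no longer fails the soft check, while TIME precision is still compared. A small sketch of the intended behaviour (not part of this patch; the class name is an illustrative assumption), consistent with the updated LogicalTypeAssignableTest expectations at the end of this mail:

    import org.apache.flink.table.runtime.types.PlannerTypeUtils;
    import org.apache.flink.table.types.logical.TimeType;
    import org.apache.flink.table.types.logical.TimestampType;

    public class AssignableSketch {
        public static void main(String[] args) {
            // TIMESTAMP(9) vs TIMESTAMP(3): precision difference is now ignored, prints true.
            System.out.println(PlannerTypeUtils.isAssignable(new TimestampType(9), new TimestampType(3)));
            // TIME(0) vs TIME(9): TIME precision is still considered, prints false.
            System.out.println(PlannerTypeUtils.isAssignable(new TimeType(), new TimeType(9)));
        }
    }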
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
new file mode 100644
index 0000000..0730ed2
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link DataTypePrecisionFixer}.
+ */
+@RunWith(Parameterized.class)
+public class DataTypePrecisionFixerTest {
+
+       @Parameterized.Parameters(name = "{index}: [From: {0}, To: {1}]")
+       public static List<TestSpec> testData() {
+               return Arrays.asList(
+
+                       TestSpecs
+                               .fix(Types.BIG_DEC)
+                               .logicalType(new DecimalType(10, 5))
+                               .expect(DataTypes.DECIMAL(10, 5)),
+
+                       TestSpecs
+                               .fix(Types.SQL_TIMESTAMP)
+                               .logicalType(new TimestampType(9))
+                               .expect(DataTypes.TIMESTAMP(9).bridgedTo(Timestamp.class)),
+
+                       TestSpecs
+                               .fix(Types.SQL_TIME)
+                               .logicalType(new TimeType(9))
+                               .expect(DataTypes.TIME(9).bridgedTo(Time.class)),
+
+                       TestSpecs
+                               .fix(Types.SQL_DATE)
+                               .logicalType(new DateType())
+                               .expect(DataTypes.DATE().bridgedTo(Date.class)),
+
+                       TestSpecs
+                               .fix(Types.LOCAL_DATE_TIME)
+                               .logicalType(new TimestampType(9))
+                               .expect(DataTypes.TIMESTAMP(9)),
+
+                       TestSpecs
+                               .fix(Types.LOCAL_TIME)
+                               .logicalType(new TimeType(9))
+                               .expect(DataTypes.TIME(9)),
+
+                       TestSpecs
+                               .fix(Types.LOCAL_DATE)
+                               .logicalType(new DateType())
+                               .expect(DataTypes.DATE()),
+
+                       TestSpecs
+                               .fix(Types.INSTANT)
+                               .logicalType(new LocalZonedTimestampType(2))
+                               .expect(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(2)),
+
+                       TestSpecs
+                               .fix(Types.STRING)
+                               .logicalType(new VarCharType(VarCharType.MAX_LENGTH))
+                               .expect(DataTypes.STRING()),
+
+                       // nested
+                       TestSpecs
+                               .fix(Types.ROW_NAMED(
+                                       new String[] {"field1", "field2"},
+                                       Types.MAP(Types.BIG_DEC, Types.SQL_TIMESTAMP),
+                                       Types.OBJECT_ARRAY(Types.SQL_TIME)))
+                               .logicalType(new RowType(
+                                       Arrays.asList(
+                                               new RowType.RowField("field1", new MapType(
+                                                       new DecimalType(20, 2),
+                                                       new TimestampType(0))),
+                                               new RowType.RowField("field2", new ArrayType(new TimeType(8)))
+                                       )
+                               ))
+                               .expect(
+                                       DataTypes.ROW(
+                                               FIELD("field1", DataTypes.MAP(
+                                                       DataTypes.DECIMAL(20, 2),
+                                                       DataTypes.TIMESTAMP(0).bridgedTo(Timestamp.class))),
+                                               FIELD("field2", DataTypes.ARRAY(
+                                                       DataTypes.TIME(8).bridgedTo(Time.class)))))
+
+                       );
+       }
+
+       @Parameterized.Parameter
+       public TestSpec testSpec;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void testPrecisionFixing() {
+               DataType dataType = fromLegacyInfoToDataType(testSpec.typeInfo);
+               DataType newDataType = dataType.accept(new DataTypePrecisionFixer(testSpec.logicalType));
+               assertEquals(testSpec.expectedType, newDataType);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       private static class TestSpec {
+               private final TypeInformation<?> typeInfo;
+               private final LogicalType logicalType;
+               private final DataType expectedType;
+
+               private TestSpec(
+                               TypeInformation<?> typeInfo,
+                               LogicalType logicalType,
+                               DataType expectedType) {
+                       this.typeInfo = checkNotNull(typeInfo);
+                       this.logicalType = checkNotNull(logicalType);
+                       this.expectedType = checkNotNull(expectedType);
+               }
+       }
+
+       private static class TestSpecs {
+               private TypeInformation<?> typeInfo;
+               private LogicalType logicalType;
+
+               static TestSpecs fix(TypeInformation<?> typeInfo) {
+                       TestSpecs testSpecs = new TestSpecs();
+                       testSpecs.typeInfo = typeInfo;
+                       return testSpecs;
+               }
+
+               TestSpecs logicalType(LogicalType logicalType) {
+                       this.logicalType = logicalType;
+                       return this;
+               }
+
+               TestSpec expect(DataType expectedType) {
+                       return new TestSpec(typeInfo, logicalType, expectedType);
+               }
+       }
+
+}
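
The nested row case above also exercises the MAP branch of the fixer; the same can be done for a top-level map through the KeyValueDataType visit shown earlier. A hedged sketch (not part of this patch; class name and printed output are illustrative assumptions):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.runtime.types.DataTypePrecisionFixer;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.DecimalType;
    import org.apache.flink.table.types.logical.MapType;
    import org.apache.flink.table.types.logical.TimestampType;

    import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;

    public class MapPrecisionSketch {
        public static void main(String[] args) {
            // Legacy map type: BigDecimal keys and java.sql.Timestamp values, DDL precisions lost.
            DataType legacyMap = fromLegacyInfoToDataType(Types.MAP(Types.BIG_DEC, Types.SQL_TIMESTAMP));
            // Push DECIMAL(20, 2) and TIMESTAMP(0) from the logical type back into key and value.
            DataType fixedMap = legacyMap.accept(new DataTypePrecisionFixer(
                new MapType(new DecimalType(20, 2), new TimestampType(0))));
            // Expected: MAP of DECIMAL(20, 2) to TIMESTAMP(0), bridged to the legacy conversion classes.
            System.out.println(fixedMap);
        }
    }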
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java
index 67209f0..cdc6c4c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/types/LogicalTypeAssignableTest.java
@@ -104,7 +104,7 @@ public class LogicalTypeAssignableTest {
 
                                {new TimeType(), new TimeType(9), false},
 
-                               {new TimestampType(9), new TimestampType(3), false},
+                               {new TimestampType(9), new TimestampType(3), true},

                                {new ZonedTimestampType(9), new ZonedTimestampType(3), false},
 
