wuchong commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361129746
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##########
 @@ -18,67 +18,99 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
 
 import scala.collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
-    * Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-    * the given [[TableSink]]. It checks if the names & the field types match. If the table
-    * sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+    * Checks if the given query can be written into the given sink. It checks that the
+    * field types are compatible (types must be equal, including precision). If the types
+    * are not compatible but can be implicitly cast, a cast projection will be applied.
+    * Otherwise, an exception will be thrown.
+    *
+    * @param query the query to be checked
+    * @param sinkSchema the schema of the sink to be checked
+    * @param typeFactory the type factory
+    * @return the query RelNode, possibly with an implicit cast projection applied.
+    */
+  def validateSchemaAndApplyImplicitCast(
+      query: RelNode,
+      sinkSchema: TableSchema,
+      typeFactory: FlinkTypeFactory,
+      sinkIdentifier: Option[String] = None): RelNode = {
+
+    val queryLogicalType = DataTypeUtils
+      // convert type to nullable, because we ignore nullability when writing query into sink
+      .transform(FlinkTypeFactory.toTableSchema(query.getRowType).toRowDataType, toNullable)
+      .getLogicalType
+      .asInstanceOf[RowType]
+    val sinkLogicalType = DataTypeUtils
+      // convert legacy decimal to default decimal and type to nullable, because we ignore nullability when writing query into sink
+      .transform(sinkSchema.toRowDataType, legacyDecimalToDefaultDecimal, toNullable)
+      .getLogicalType
+      .asInstanceOf[RowType]
+    if (LogicalTypeChecks.areTypesCompatible(queryLogicalType, sinkLogicalType)) {
+      // types are compatible, do nothing
+      query
+    } else if (LogicalTypeCasts.supportsImplicitCast(queryLogicalType, sinkLogicalType)) {
+      // types can be implicitly cast, add a cast projection
+      val castedDataType = typeFactory.buildRelNodeRowType(
+        sinkLogicalType.getFieldNames,
+        sinkLogicalType.getFields.map(_.getType))
+      RelOptUtils.createCastRel(query, castedDataType)
+    } else {
+      // format query and sink schema strings
+      val srcSchema = queryLogicalType.getFields
+        .map(f => s"${f.getName}: ${f.getType}")
+        .mkString("[", ", ", "]")
+      val sinkSchema = sinkLogicalType.getFields
+        .map(f => s"${f.getName}: ${f.getType}")
+        .mkString("[", ", ", "]")
+
+      val sinkDesc: String = sinkIdentifier.getOrElse("")
+
+      throw new ValidationException(
+        s"Field types of query result and registered TableSink $sinkDesc do 
not match.\n" +
+          s"Query schema: $srcSchema\n" +
+          s"Sink schema: $sinkSchema")
+    }
+  }
+
+  /**
+    * It checks whether the [[TableSink]] is compatible with the INSERT INTO clause, e.g.
+    * whether the sink is a [[PartitionableTableSink]] and the partitions are valid.
     *
    * @param sinkOperation The sink operation with the query that is supposed to be written.
    * @param sinkIdentifier The path of the sink. It is needed just for logging. It does not
    *                      participate in the validation.
     * @param sink     The sink that we want to write to.
     * @param partitionKeys The partition keys of this table.
     */
-  def validateSink(
+  def validateTableSink(
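
A quick, self-contained illustration of the compatibility check introduced above, for the decimal case this PR is about. The concrete precisions are only an example, and `DecimalCompatibilityCheck` is not part of the PR:

```scala
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts

object DecimalCompatibilityCheck {
  def main(args: Array[String]): Unit = {
    // A query column of DECIMAL(10, 2) against a sink column whose legacy decimal
    // was normalized to the default DECIMAL(38, 18) by legacyDecimalToDefaultDecimal.
    val queryFieldType = DataTypes.DECIMAL(10, 2).getLogicalType
    val sinkFieldType = DataTypes.DECIMAL(38, 18).getLogicalType

    // The types are not equal, so the result cannot be written as-is ...
    println(queryFieldType == sinkFieldType)
    // ... but an implicit cast is possible, so a cast projection can be added
    // on top of the query instead of failing validation.
    println(LogicalTypeCasts.supportsImplicitCast(queryFieldType, sinkFieldType))
  }
}
```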
 
 Review comment:
   I think it would be better to separate them, because they do different things and `validateTableSink` is only invoked in "INSERT" statements.
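
   For context, a minimal sketch of how a call site could combine the two methods after the split; the surrounding method name, parameter list, and the exact `validateTableSink` parameter types are assumptions for illustration, not code from this PR:

```scala
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.operations.CatalogSinkModifyOperation
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.sinks.TableSinkUtils
import org.apache.flink.table.sinks.TableSink

// Hypothetical call site; only illustrates the separation discussed above.
def writeToSink(
    sinkOperation: CatalogSinkModifyOperation,
    input: RelNode,
    sink: TableSink[_],
    sinkSchema: TableSchema,
    partitionKeys: Seq[String],
    typeFactory: FlinkTypeFactory): RelNode = {
  val identifier = sinkOperation.getTableIdentifier

  // Runs for every write: validates field types and, if needed, adds an implicit-cast projection.
  val query = TableSinkUtils.validateSchemaAndApplyImplicitCast(
    input, sinkSchema, typeFactory, Some(identifier.asSummaryString()))

  // INSERT-specific validation (e.g. PartitionableTableSink partitions) stays in validateTableSink.
  TableSinkUtils.validateTableSink(sinkOperation, identifier, sink, partitionKeys)

  query
}
```

   That way the schema check is reusable for every write, while the partition handling remains tied to INSERT statements.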
