wuchong commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361116539
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
 ##########
 @@ -18,67 +18,99 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
 
 import scala.collection.JavaConversions._
 
 object TableSinkUtils {
 
   /**
-    * Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-    * the given [[TableSink]]. It checks if the names & the field types match. If the table
-    * sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+    * Checks if the given query can be written into the given sink. It checks that the field
+    * types are compatible (types must be equal, including precision). If the types are not
+    * compatible but can be implicitly cast, a cast projection will be applied. Otherwise, an
+    * exception will be thrown.
+    *
+    * @param query the query to be checked
+    * @param sinkSchema the schema of the sink to be checked
+    * @param typeFactory type factory
+    * @return the query RelNode, possibly with an implicit cast projection applied.
+    */
+  def validateSchemaAndApplyImplicitCast(
+      query: RelNode,
+      sinkSchema: TableSchema,
+      typeFactory: FlinkTypeFactory,
+      sinkIdentifier: Option[String] = None): RelNode = {
+
+    val queryLogicalType = DataTypeUtils
+      // convert the type to nullable, because we ignore nullability when writing a query into a sink
 
 Review comment:
   It is equivalent whether we apply `toNullable` or not, because if the nullability is not the same, we will add an implicit cast anyway. The purpose of ignoring nullability at this earlier stage is to avoid cast projections as much as possible; otherwise we would have to update a lot of XML plan files.
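
   For illustration, here is a minimal, self-contained Scala sketch of that reasoning; the `SimpleType`, `supportsImplicitCast`, and `planSinkConversion` names are hypothetical helpers, not Flink APIs. It shows how normalizing both sides to nullable before comparing means a nullability-only mismatch adds no cast projection, while genuinely different but castable types still get an implicit cast, and incompatible types fail validation.

```scala
// A toy model of a field type: a name plus a nullability flag.
final case class SimpleType(name: String, nullable: Boolean) {
  def toNullable: SimpleType = copy(nullable = true)
}

sealed trait Plan
case object NoCastNeeded extends Plan
final case class ImplicitCast(from: SimpleType, to: SimpleType) extends Plan

object SinkValidationSketch {

  // Hypothetical stand-in for "different types that can still be implicitly cast".
  private def supportsImplicitCast(from: SimpleType, to: SimpleType): Boolean =
    from.name == to.name || (from.name == "INT" && to.name == "BIGINT")

  // Compare the query field type with the sink field type, normalizing both to
  // nullable first, so that a nullability-only difference does not force a cast.
  def planSinkConversion(queryType: SimpleType, sinkType: SimpleType): Plan = {
    val q = queryType.toNullable
    val s = sinkType.toNullable
    if (q == s) NoCastNeeded
    else if (supportsImplicitCast(q, s)) ImplicitCast(queryType, sinkType)
    else throw new IllegalArgumentException(
      s"Query type $queryType cannot be written to sink type $sinkType")
  }

  def main(args: Array[String]): Unit = {
    // Nullability-only difference: no extra cast projection is added.
    println(planSinkConversion(SimpleType("BIGINT", nullable = false), SimpleType("BIGINT", nullable = true)))
    // Different but castable types: an implicit cast is planned.
    println(planSinkConversion(SimpleType("INT", nullable = true), SimpleType("BIGINT", nullable = true)))
  }
}
```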
