wuchong commented on a change in pull request #10667: [FLINK-15313][table] Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361129746
##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
##########
@@ -18,67 +18,99 @@
package org.apache.flink.table.planner.sinks
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType, RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType, fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
import scala.collection.JavaConversions._
object TableSinkUtils {
   /**
-    * Checks if the given [[CatalogSinkModifyOperation]]'s query can be written to
-    * the given [[TableSink]]. It checks if the names & the field types match. If the table
-    * sink is a [[PartitionableTableSink]], also check that the partitions are valid.
+    * Checks if the given query can be written into the given sink. The field types must be
+    * compatible (equal, including precision). If the types are not compatible but can be
+    * implicitly cast, a cast projection is applied. Otherwise, an exception is thrown.
+    *
+    * @param query the query to be checked
+    * @param sinkSchema the schema of the sink to be checked
+    * @param typeFactory the type factory
+    * @return the query RelNode, possibly with an implicit cast projection applied.
+    */
+  def validateSchemaAndApplyImplicitCast(
+      query: RelNode,
+      sinkSchema: TableSchema,
+      typeFactory: FlinkTypeFactory,
+      sinkIdentifier: Option[String] = None): RelNode = {
+
+    val queryLogicalType = DataTypeUtils
+      // convert type to nullable, because we ignore nullability when writing query into sink
+      .transform(FlinkTypeFactory.toTableSchema(query.getRowType).toRowDataType, toNullable)
+      .getLogicalType
+      .asInstanceOf[RowType]
+    val sinkLogicalType = DataTypeUtils
+      // convert type to nullable, because we ignore nullability when writing query into sink
+      .transform(sinkSchema.toRowDataType, legacyDecimalToDefaultDecimal, toNullable)
+      .getLogicalType
+      .asInstanceOf[RowType]
+    if (LogicalTypeChecks.areTypesCompatible(queryLogicalType, sinkLogicalType)) {
+      // types are compatible, do nothing
+      query
+    } else if (LogicalTypeCasts.supportsImplicitCast(queryLogicalType, sinkLogicalType)) {
+      // types can be implicit casted, add a cast project
+      val castedDataType = typeFactory.buildRelNodeRowType(
+        sinkLogicalType.getFieldNames,
+        sinkLogicalType.getFields.map(_.getType))
+      RelOptUtils.createCastRel(query, castedDataType)
+    } else {
+      // format query and sink schema strings
+      val srcSchema = queryLogicalType.getFields
+        .map(f => s"${f.getName}: ${f.getType}")
+        .mkString("[", ", ", "]")
+      val sinkSchema = sinkLogicalType.getFields
+        .map(f => s"${f.getName}: ${f.getType}")
+        .mkString("[", ", ", "]")
+
+      val sinkDesc: String = sinkIdentifier.getOrElse("")
+
+      throw new ValidationException(
+        s"Field types of query result and registered TableSink $sinkDesc do not match.\n" +
+          s"Query schema: $srcSchema\n" +
+          s"Sink schema: $sinkSchema")
+    }
+  }
+
+  /**
+    * Checks whether the [[TableSink]] is compatible with the INSERT INTO clause, e.g.
+    * whether the sink is a [[PartitionableTableSink]] and the partitions are valid.
     *
     * @param sinkOperation The sink operation with the query that is supposed to be written.
     * @param sinkIdentifier The path of the sink. It is needed just for logging. It does not
     *                       participate in the validation.
     * @param sink The sink that we want to write to.
     * @param partitionKeys The partition keys of this table.
     */
-  def validateSink(
+  def validateTableSink(
Review comment:
I think it would be better to separate them, because they do different things and `validateTableSink` is only invoked in "INSERT" statements.
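
For readers following the type discussion, here is a minimal sketch (not part of this PR) of the sink-side normalization that `validateSchemaAndApplyImplicitCast` relies on. The schema and field names below are made up for illustration, and the exact default decimal type produced by `legacyDecimalToDefaultDecimal` is an assumption about the blink planner's legacy-decimal mapping:

```scala
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal, toNullable}
import org.apache.flink.table.types.utils.DataTypeUtils

// A sink schema declared through TypeInformation: the DECIMAL field carries no
// precision/scale and surfaces as a legacy type rather than a DECIMAL(p, s).
val legacySinkSchema = TableSchema.builder()
  .field("id", Types.LONG())
  .field("amount", Types.DECIMAL())
  .build()

// The same transformation the new method applies to the sink side: nullability
// is ignored and the legacy decimal is replaced by the planner's default
// decimal (assumed here to be DECIMAL(38, 18)), so the sink row type can be
// compared with the query row type and, if needed, bridged by a cast projection.
val normalizedSinkType = DataTypeUtils
  .transform(legacySinkSchema.toRowDataType, legacyDecimalToDefaultDecimal, toNullable)
  .getLogicalType
```

Once that comparison (plus the optional cast) is handled in `validateSchemaAndApplyImplicitCast`, the INSERT-only concerns such as partition keys stay in `validateTableSink`, which is another reason to keep the two methods separate.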
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services