JingsongLi commented on a change in pull request #10667: [FLINK-15313][table]
Fix can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361082465
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
##########
@@ -18,67 +18,99 @@
package org.apache.flink.table.planner.sinks
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{TableException, TableSchema, Types,
ValidationException}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
+import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.operations.CatalogSinkModifyOperation
-import
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.PlannerTypeUtils
-import org.apache.flink.table.sinks.{PartitionableTableSink, TableSink}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.utils.RelOptUtils
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.types.DataType
+import
org.apache.flink.table.types.inference.TypeTransformations.{legacyDecimalToDefaultDecimal,
toNullable}
+import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts,
LogicalTypeChecks}
+import org.apache.flink.table.types.logical.{LegacyTypeInformationType,
RowType}
+import org.apache.flink.table.types.utils.DataTypeUtils
+import
org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataType,
fromLogicalToDataType}
+import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.RelNode
import scala.collection.JavaConversions._
object TableSinkUtils {
/**
- * Checks if the given [[CatalogSinkModifyOperation]]'s query can be
written to
- * the given [[TableSink]]. It checks if the names & the field types match.
If the table
- * sink is a [[PartitionableTableSink]], also check that the partitions are
valid.
+ * Checks if the given query can be written into the given sink. It checks
the field types
+ * should be compatible (types should equal including precisions). If types
are not compatible,
+ * but can be implicitly casted, a cast projection will be applied.
Otherwise, an exception will
+ * be thrown.
+ *
+ * @param query the query to be checked
+ * @param sinkSchema the schema of sink to be checked
+ * @param typeFactory type factory
+ * @return the query RelNode which may be applied the implicitly cast
projection.
+ */
+ def validateSchemaAndApplyImplicitCast(
+ query: RelNode,
+ sinkSchema: TableSchema,
+ typeFactory: FlinkTypeFactory,
+ sinkIdentifier: Option[String] = None): RelNode = {
+
+ val queryLogicalType = DataTypeUtils
+ // convert type to nullable, because we ignore nullability when writing
query into sink
Review comment:
Why do we need to convert the type to nullable here?
Does it not work without this conversion?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services