wuchong commented on a change in pull request #10667: [FLINK-15313][table] Fix
can't insert decimal data into sink using TypeInformation
URL: https://github.com/apache/flink/pull/10667#discussion_r361147091
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
##########
@@ -98,5 +130,148 @@ object TableSinkUtils {
}
}
}
+
+ sink match {
+ case overwritableTableSink: OverwritableTableSink =>
+ overwritableTableSink.setOverwrite(sinkOperation.isOverwrite)
+ case _ =>
+ assert(!sinkOperation.isOverwrite, "INSERT OVERWRITE requires " +
+ s"${classOf[OverwritableTableSink].getSimpleName} but actually got " +
+ sink.getClass.getName)
+ }
+ }
+
+ /**
+ * Inferences the physical schema of [[TableSink]], the physical schema ignores change flag
+ * field and normalizes physical types (can be generic type or POJO type) into [[TableSchema]].
+ * @param queryLogicalType the logical type of query, will be used to full-fill sink physical
+ * schema if the sink physical type is not specified.
+ * @param sink the instance of [[TableSink]]
+ */
+ def inferSinkPhysicalSchema(
+ queryLogicalType: RowType,
+ sink: TableSink[_]): TableSchema = {
+ val withChangeFlag = sink match {
+ case _: RetractStreamTableSink[_] | _: UpsertStreamTableSink[_] => true
+ case _: StreamTableSink[_] => false
+ case dsts: DataStreamTableSink[_] => dsts.withChangeFlag
+ }
+ inferSinkPhysicalSchema(sink.getConsumedDataType, queryLogicalType, withChangeFlag)
+ }
+
+ /**
+ * Inferences the physical schema of [[TableSink]], the physical schema ignores change flag
+ * field and normalizes physical types (can be generic type or POJO type) into [[TableSchema]].
+ *
+ * @param consumedDataType the consumed data type of sink
+ * @param queryLogicalType the logical type of query, will be used to full-fill sink physical
+ * schema if the sink physical type is not specified.
+ * @param withChangeFlag true if the emitted records contains change flags.
+ */
+ def inferSinkPhysicalSchema(
+ consumedDataType: DataType,
+ queryLogicalType: RowType,
+ withChangeFlag: Boolean): TableSchema = {
+ // the requested output physical type which ignores the flag field
+ val requestedOutputType = inferSinkPhysicalDataType(
+ consumedDataType,
+ queryLogicalType,
+ withChangeFlag)
+ if (LogicalTypeChecks.isCompositeType(requestedOutputType.getLogicalType)) {
+ DataTypeUtils.expandCompositeTypeToSchema(requestedOutputType)
+ } else {
+ // atomic type
+ TableSchema.builder().field("f0", requestedOutputType).build()
+ }
+ }
+
+ /**
+ * Inferences the physical data type of [[TableSink]], the physical data type ignores
+ * the change flag field.
+ *
+ * @param consumedDataType the consumed data type of sink
+ * @param queryLogicalType the logical type of query, will be used to full-fill sink physical
+ * schema if the sink physical type is not specified.
+ * @param withChangeFlag true if the emitted records contains change flags.
+ */
+ def inferSinkPhysicalDataType(
Review comment:
I think that even if we merge them into one method, we won't reduce the amount
of code; it would just mix the two pieces of logic in one method. Also, the
Tuple2 type information is only needed in code generation, so I think it's
better to keep them separate.
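
To illustrate the separation, here is a minimal, self-contained sketch of the two-step structure visible in the diff above: one method strips the change flag to get the physical record type, and a second method turns that type into a schema. It is not code from the PR. `SimpleType`, `RowT`, `ChangeFlagTuple`, and `SinkSchemaSketch` are hypothetical stand-ins for Flink's `DataType`, row types, `Tuple2<Boolean, T>`, and `TableSinkUtils`; the real methods also take `queryLogicalType`, which this sketch leaves out.

```scala
// Hypothetical, simplified stand-ins for Flink's type system; NOT the real API.
sealed trait SimpleType
case object IntT extends SimpleType
case object StringT extends SimpleType
// A composite (row-like) type with named fields.
final case class RowT(fields: List[(String, SimpleType)]) extends SimpleType
// Models Tuple2<Boolean, T>: a change flag paired with the actual record type.
final case class ChangeFlagTuple(record: SimpleType) extends SimpleType

object SinkSchemaSketch {

  // Step 1: the physical data type ignores the change flag field.
  def inferPhysicalDataType(consumed: SimpleType, withChangeFlag: Boolean): SimpleType =
    consumed match {
      case ChangeFlagTuple(record) if withChangeFlag => record
      case other => other
    }

  // Step 2: expand a composite type into its fields; an atomic type
  // becomes a single field named "f0", as in the diff above.
  def inferPhysicalSchema(
      consumed: SimpleType,
      withChangeFlag: Boolean): List[(String, SimpleType)] =
    inferPhysicalDataType(consumed, withChangeFlag) match {
      case RowT(fields) => fields
      case atomic => List("f0" -> atomic)
    }
}
```

With this split, a caller that only needs the record type (for example, to wrap it in a tuple during code generation) can call the first method alone, while schema validation uses the second.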