godfreyhe commented on a change in pull request #12456:
URL: https://github.com/apache/flink/pull/12456#discussion_r438510311



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
##########
@@ -180,7 +182,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
     * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
     */
   def buildLogicalRowType(tableSchema: TableSchema): RelDataType = {
-    buildLogicalRowType(tableSchema.getFieldNames, tableSchema.getFieldTypes)
+    buildLogicalRowType(tableSchema.getFieldNames, 
tableSchema.getFieldDataTypes)
+  }
+
+  /**
+   * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory
+   *
+   * @param fieldNames field names
+   * @param fieldTypes field types, every element is Flink's [[DataType]]
+   * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
+   */
+  def buildLogicalRowType(
+    fieldNames: Array[String],
+    fieldTypes: Array[DataType])
+  : RelDataType = {
+    val logicalRowTypeBuilder = builder
+
+    val fields = fieldNames.zip(fieldTypes)
+    fields.foreach(f => {
+      // time indicators are not nullable
+      val logicalType = f._2.getLogicalType
+      val nullable  = if (FlinkTypeFactory.isTimeIndicatorType(logicalType)) {
+        false

Review comment:
       This fix is only needed for the old planner, and only type info has this problem,
because type info does not have a nullable attribute.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to