YuvalItzchakov edited a comment on issue #9780: [FLINK-14042] [flink-table-planner] Fix TemporalTable row schema always inferred as nullable
URL: https://github.com/apache/flink/pull/9780#issuecomment-544151621
 
 
   @twalthr OK, once I wrote the test I figured out my fix here won't solve the 
problem. After doing some more debugging, I've come to the following findings:
   
   Given the following test:
   
   ```scala
   val util = scalaStreamTestUtil()

   val sourceTable = util.addTableSource[(Timestamp, Long, String)]("MyTable", 'f1, 'f2, 'f3)
   val temporal =
     util.tableEnv.sqlQuery("SELECT f1, f2, COLLECT(DISTINCT f3) f3s FROM MyTable GROUP BY f1, f2")
   val temporalFunc = temporal.createTemporalTableFunction('f1, 'f2)

   util.tableEnv.registerFunction("f", temporalFunc)
   val queryTable =
     util.tableEnv.sqlQuery("SELECT f1, f2, b FROM MyTable, LATERAL TABLE(f(f1)) AS T(a, b, cs)")

   util.verifyPlan(queryTable)
   ```
   If we first look at the table schema generated for the underlying table of the temporal table's SQL query, we see:
   
   ```
   root
    |-- f1: TIMESTAMP(3)
    |-- f2: BIGINT
    |-- f3s: MULTISET<STRING> NOT NULL
   ```
   
   When `FlinkPlanner` validates the SQL query, it reaches the point where it needs to validate the `TemporalTableFunction` I've defined, called `f`. It calls [FunctionCatalogOperatorTable.lookupOperatorOverloads](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java#L57) to look the function up, and once found, the function needs to be converted to a standard `SqlFunction`. To do that, the `TypeInformation[T]` of the `TemporalTableFunction`'s result type has to be converted to the new `DataType` API.
   
   The problem is that `TypeInformation[T]` carries no nullability information for the field, so when the conversion to `Multiset[T]` happens, [it ends up calling the default constructor, which sets `nullable = true`](https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/MultisetType.java#L60), and this blows up at runtime because the `TableSchema` expects a NOT NULL field.
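   To make the information loss concrete, here is a minimal self-contained sketch (the class names are illustrative stand-ins, not Flink's actual types): a `TypeInformation`-style descriptor carries no nullability flag, so the conversion has to pick a default, and defaulting to `nullable = true` diverges from the schema's NOT NULL:

   ```scala
   // Minimal model of the nullability loss (names are illustrative, not Flink's).
   // A TypeInformation-style descriptor names the element type but has no
   // nullability flag at all.
   case class TypeInfo(elementType: String)

   // The new type system does track nullability, as MultisetType does.
   case class MultisetLogical(elementType: String, nullable: Boolean)

   // With no nullability to read from TypeInfo, the conversion must pick a
   // default -- mirroring MultisetType's default constructor (nullable = true).
   def fromTypeInfo(ti: TypeInfo): MultisetLogical =
     MultisetLogical(ti.elementType, nullable = true)

   // The schema side declared MULTISET<STRING> NOT NULL:
   val fromSchema   = MultisetLogical("STRING", nullable = false)
   val fromFunction = fromTypeInfo(TypeInfo("STRING"))

   // The two sides disagree on nullability, which is what blows up at runtime.
   assert(fromSchema != fromFunction)
   ```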
   
   I'm not entirely sure how we can get around this issue.
   
   **EDIT**:
   
   OK, it seems the `TableFunctionDefinition` for the temporal table already carries the `DataType` information, which is accessible via `((TemporalTableFunctionImpl) functionDefinition.tableFunction).getUnderlyingHistoryTable().getTableSchema().getFieldDataTypes()`. We could use that to avoid the lossy conversion from `TypeInformation[T]`. WDYT?
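   As a rough sketch of that idea (all names here are hypothetical, not the actual Flink API): prefer the `DataType`s already carried by the underlying history table's schema, and only fall back to the `TypeInformation[T]`-derived types when no schema is available:

   ```scala
   // Hypothetical sketch: resolve field types from the schema when we have it,
   // falling back to the lossy TypeInformation-derived types otherwise.
   sealed trait FieldType { def nullable: Boolean }
   // Modeled on a DataType: nullability is preserved.
   case class DataTypeField(name: String, nullable: Boolean) extends FieldType
   // Modeled on a TypeInformation-derived type: nullability defaults to true.
   case class TypeInfoField(name: String) extends FieldType { val nullable = true }

   def resolveFieldTypes(
       schemaTypes: Option[Seq[FieldType]], // e.g. from getTableSchema().getFieldDataTypes()
       typeInfoTypes: Seq[FieldType]): Seq[FieldType] =
     schemaTypes.getOrElse(typeInfoTypes)

   val fromSchema = Some(Seq[FieldType](DataTypeField("f3s", nullable = false)))
   val fallback   = Seq[FieldType](TypeInfoField("f3s"))

   val resolved = resolveFieldTypes(fromSchema, fallback)
   assert(!resolved.head.nullable)                         // the NOT NULL constraint survives
   assert(resolveFieldTypes(None, fallback).head.nullable) // the fallback loses it
   ```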
   
   **EDIT 2:**
   
   Turns out this is not enough, since `DeferredTypeFlinkTableFunction.getExternalResultType()` will attempt to call `tableFunction.getResult`, which only has the `TypeInformation[T]` and will thus produce a nullable type as well.
   
   **EDIT 3:**
   
   `SqlTypeFactoryImpl.createMultisetType` defaults to creating a non-nullable multiset (the `false` passed to `MultisetSqlType` is the `isNullable` flag):
   
   ```java
   public RelDataType createMultisetType(
       RelDataType type,
       long maxCardinality) {
     assert maxCardinality == -1;
     RelDataType newType = new MultisetSqlType(type, false);
     return canonize(newType);
   }
   ```
   
   This is where the two diverge, and why the `TableSchema` ends up with a NOT NULL multiset type. It seems this will happen for any complex data type?
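   The divergence boils down to two hard-coded defaults pointing in opposite directions (again a toy model, not the real Calcite/Flink classes): Calcite's factory builds the multiset as non-nullable, while the `TypeInformation[T]` conversion builds it as nullable, so any collection-typed field validated through both paths will mismatch:

   ```scala
   // Toy model of the two code paths (not the real Calcite/Flink classes).
   case class ToyMultisetType(nullable: Boolean)

   // SqlTypeFactoryImpl.createMultisetType: new MultisetSqlType(type, false)
   def viaCalciteFactory: ToyMultisetType = ToyMultisetType(nullable = false)

   // MultisetType's default constructor, hit when converting TypeInformation[T]
   def viaTypeInformation: ToyMultisetType = ToyMultisetType(nullable = true)

   // Validation compares the two and fails on the nullability mismatch.
   assert(viaCalciteFactory != viaTypeInformation)
   ```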
