Caizhi Weng created FLINK-22449:
-----------------------------------

             Summary: Casting an invalid constant string to int throws 
exception from SinkNotNullEnforcer
                 Key: FLINK-22449
                 URL: https://issues.apache.org/jira/browse/FLINK-22449
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.0
            Reporter: Caizhi Weng


Add the following test case to {{CalcITCase}} to reproduce this bug:

{code:scala}
@Test
def myTest(): Unit = {
  checkResult("SELECT CAST('haha' AS INT)", Seq(row(null)))
}
{code}

The exception stack is
{code}
Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT 
NULL, however, a null value is being written into it. You can set job 
configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this 
exception and drop such records silently.
        at 
org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
        at 
org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
        at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at BatchExecCalc$33.processElement(Unknown Source)
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
        at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
{code}
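As the exception message itself suggests, there is a job-level mitigation: configure the not-null enforcer to drop offending records instead of failing. A minimal sketch, assuming a {{TableEnvironment}} instance named {{tEnv}}:

{code:scala}
// Workaround sketch (not a fix): drop rows that violate NOT NULL
// instead of throwing. 'tEnv' is an assumed TableEnvironment.
tEnv.getConfig.getConfiguration
  .setString("table.exec.sink.not-null-enforcer", "drop")
{code}

Note that this silently discards the offending rows, so it only hides the underlying nullability inference problem.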

This is because the result type of CAST is inferred as NOT NULL (see 
{{SqlCastFunction#inferReturnType}} and 
{{StandardConvertletTable#convertCast}}; the nullability is the same as that of 
the input argument), while parsing an invalid string to int actually produces a 
null value at runtime.
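To illustrate the mismatch with a self-contained sketch (this is a hypothetical helper, not Flink code): a string-to-int cast that yields null for unparsable input produces a runtime value that contradicts the inferred NOT NULL type:

{code:scala}
// Hypothetical helper mimicking the observed CAST('haha' AS INT) runtime
// behavior: an unparsable string yields null rather than an exception.
def castStringToInt(s: String): Integer =
  try Integer.valueOf(s.trim)
  catch { case _: NumberFormatException => null }

castStringToInt("42")   // 42
castStringToInt("haha") // null, even though the planner infers INT NOT NULL
{code}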

One fix I can think of is to make the result type of CAST always nullable (at 
least for some specific kinds of casts, for example casting from string to 
int). However, as CAST is a very low-level function, this might have a big 
impact (for example, if a rule adds a cast, the resulting row type might no 
longer equal the original row type due to a mismatch in nullability).

So it seems that, at the current stage, we should make all columns in a select 
sink nullable. This, however, means that one cannot truly rely on the 
nullability of any result type.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)