Caizhi Weng created FLINK-22449:
-----------------------------------
Summary: Casting an invalid constant string to int throws exception from SinkNotNullEnforcer
Key: FLINK-22449
URL: https://issues.apache.org/jira/browse/FLINK-22449
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: Caizhi Weng
Add the following test case to {{CalcITCase}} to reproduce this bug:
{code:scala}
@Test
def myTest(): Unit = {
  checkResult("SELECT CAST('haha' AS INT)", Seq(row(null)))
}
{code}
The exception stack is:
{code}
Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
	at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
	at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at BatchExecCalc$33.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
{code}
This is because the result type of CAST is inferred as NOT NULL (see {{SqlCastFunction#inferReturnType}} and {{StandardConvertletTable#convertCast}}; the nullability is the same as that of the input argument). However, casting an invalid string to int produces a null value at runtime.
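The mismatch can be illustrated with a minimal standalone Scala sketch (a hypothetical helper, not Flink code): the runtime behavior of a string-to-int cast inevitably has to account for unparseable input, even though the inferred type says the result can never be null.

{code:scala}
// Hypothetical sketch: a string-to-int cast must produce null (or fail)
// for unparseable input, so a NOT NULL result type cannot be guaranteed.
def castStringToInt(s: String): Integer =
  try Integer.valueOf(s.trim)
  catch { case _: NumberFormatException => null } // 'haha' ends up here
{code}

Here {{castStringToInt("haha")}} returns null while {{castStringToInt("42")}} returns 42, which is exactly the null that the NOT NULL enforcer later rejects.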
One way I can think of is to change the result type of CAST to always be nullable (at least for some specific kinds of casting, for example from string to int). However, as CAST is a very low-level function, this might have a big impact (for example, if a rule adds a cast, the resulting row type might no longer be equal to the original row type due to a mismatch in nullability).
So it seems that, at the current stage, we should set all columns in a select sink to be nullable. However, this implies that one cannot truly rely on the nullability of any result type.
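For reference, the enforcer's two behaviors (the default of throwing, and the {{'drop'}} mode mentioned in the exception message) can be sketched roughly as follows. This is a simplified assumption of what {{SinkNotNullEnforcer}} does, not the actual Flink class:

{code:scala}
// Simplified sketch (assumption, not Flink's implementation) of the
// not-null enforcer: ERROR throws on a null, DROP filters the record out.
object NotNullMode extends Enumeration { val Error, Drop = Value }

def enforceNotNull(row: Seq[Any], mode: NotNullMode.Value): Boolean =
  if (!row.contains(null)) true              // record passes through
  else if (mode == NotNullMode.Drop) false   // silently drop the record
  else throw new RuntimeException(
    "Column is NOT NULL, however, a null value is being written into it.")
{code}

With either mode the record carrying the null is lost or fails the job, so neither setting fixes the underlying nullability inference of CAST.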
--
This message was sent by Atlassian Jira
(v8.3.4#803005)