[ https://issues.apache.org/jira/browse/FLINK-23702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397001#comment-17397001 ]
Jingsong Lee commented on FLINK-23702:
--------------------------------------
Most of our functions return null when an error occurs, which avoids
abruptly interrupting the streaming job.
Although the SQL standard does not seem to specify this behavior, Postgres,
as a representative ANSI-SQL implementation, throws errors directly in most
functions.
CAST is a particularly troublesome function. We return null when an error
occurs, but we cannot change Calcite's type inference, which still declares
the result NOT NULL; that mismatch causes a lot of bugs. (Maybe we should
introduce an option in Calcite to force a nullable return type.)
I am +1 for introducing a config option. (Maybe the default should be to
throw errors; that is an incompatible behavior change and may require
further discussion.)
> SQL casting to decimal which cannot hold its original value throws 'Column 'EXPR$0' is NOT NULL, however, a null value is being written into it'
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-23702
> URL: https://issues.apache.org/jira/browse/FLINK-23702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.2
> Reporter: Yao Zhang
> Priority: Critical
>
> If we run the following SQL:
> {code:sql}
> select CAST( 1234567.123456789 as DECIMAL(10, 3))
> {code}
> Flink returns the correct result because 1234567.123 fits the target type
> (precision=10, scale=3): seven integer digits plus three fractional digits
> are ten digits in total, so only the fractional part loses precision.
> However, if we execute:
> {code:sql}
> select CAST( 1234567.123456789 as DECIMAL(9, 3))
> {code}
> it results in an exception, because the original value cannot be stored
> under this definition: DECIMAL(9, 3) leaves only six digits for the integer
> part, while 1234567 needs seven.
> However, the exception is hard to relate to this root cause:
> Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
> at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
> at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
> at com.paultech.sql.SqlStreamEnvDemo$.main(SqlStreamEnvDemo.scala:70)
> at com.paultech.sql.SqlStreamEnvDemo.main(SqlStreamEnvDemo.scala)
> Caused by: java.io.IOException: Failed to fetch job execution result
> at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
> at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
> at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
> at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
> ... 8 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
> at akka.actor.Actor$class.aroundReceive(Actor.scala)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
> at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
> at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at StreamExecCalc$16.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>