[
https://issues.apache.org/jira/browse/FLINK-23702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396705#comment-17396705
]
Timo Walther commented on FLINK-23702:
--------------------------------------
We had similar issues before. IIRC, the cast operator tries to be lenient and
return NULL in case of errors. [~jark] we should make the casting behavior
configurable and fail by default. Returning null in such cases is confusing.
> SQL casting to decimal which cannot hold its original value throws 'Column
> 'EXPR$0' is NOT NULL, however, a null value is being written into it'
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-23702
> URL: https://issues.apache.org/jira/browse/FLINK-23702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.2
> Reporter: Yao Zhang
> Priority: Critical
>
> If we run the following SQL:
> {code:sql}
> select CAST( 1234567.123456789 as DECIMAL(10, 3))
> {code}
> Flink can get the correct result because the result 1234567.123 meets its
> definition (precision=10 and scale=3). It only loses accuracy in the fraction
> section.
> However, if we execute:
> {code:sql}
> select CAST( 1234567.123456789 as DECIMAL(9, 3))
> {code}
> it will result in an exception as the original value cannot stored according
> to this definition.
> But the exception seems to be irrelevant to its root cause:
> Exception in thread "main" java.lang.RuntimeException: Failed to fetch next
> result
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> at
> org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
> at
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
> at com.paultech.sql.SqlStreamEnvDemo$.main(SqlStreamEnvDemo.scala:70)
> at com.paultech.sql.SqlStreamEnvDemo.main(SqlStreamEnvDemo.scala)
> Caused by: java.io.IOException: Failed to fetch job execution result
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> at
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
> ... 8 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
> at akka.actor.Actor$class.aroundReceive(Actor.scala)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT
> NULL, however, a null value is being written into it. You can set job
> configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this
> exception and drop such records silently.
> at
> org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
> at
> org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at StreamExecCalc$16.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
> at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)