[
https://issues.apache.org/jira/browse/FLINK-23702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398432#comment-17398432
]
Yao Zhang commented on FLINK-23702:
-----------------------------------
Hi [~lzljs3620320],
I debugged the codegen process and the generated code was:
{code:java}
out.setRowKind(in1.getRowKind());
isNull$36 = false;
result$37 = null;
if (!isNull$36) {
    result$37 = org.apache.flink.table.data.DecimalDataUtils.castToDecimal(
            ((org.apache.flink.table.data.DecimalData) decimal$35), 7, 3);
    isNull$36 = (result$37 == null);
}
isNull$38 = isNull$36 || false;
result$39 = null;
if (!isNull$38) {
    result$39 = org.apache.flink.table.runtime.functions.SqlFunctionUtils.struncate(result$37, ((int) 0));
    isNull$38 = (result$39 == null);
}
if (isNull$38) {
    out.setNullAt(0);
} else {
    out.setNonPrimitiveValue(0, result$39);
}
output.collect(outElement.replace(out));
{code}
"cast(xxx as DECIMAL)" will be translated to
"org.apache.flink.table.data.DecimalDataUtils.castToDecimal" method call.
{code:java}
public static DecimalData castToDecimal(DecimalData dec, int precision, int scale) {
    return fromBigDecimal(dec.toBigDecimal(), precision, scale);
}

public static @Nullable DecimalData fromBigDecimal(BigDecimal bd, int precision, int scale) {
    bd = bd.setScale(scale, RoundingMode.HALF_UP);
    if (bd.precision() > precision) {
        return null;
    }
    long longVal = -1;
    if (precision <= MAX_COMPACT_PRECISION) {
        longVal = bd.movePointRight(scale).longValueExact();
    }
    return new DecimalData(precision, scale, longVal, bd);
}
{code}
As the code above shows, if the precision of the rescaled BigDecimal is larger
than the target precision, fromBigDecimal returns null. This is what leads to
this issue: the generated code then writes NULL into the output column
(out.setNullAt(0)), and the sink's NOT NULL enforcer rejects the record with
the exception reported below.
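For reference, here is a minimal standalone check of that precision arithmetic
with plain java.math.BigDecimal (no Flink classes involved), using the literal
from this issue cast to DECIMAL(9, 3):
{code:java}
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalPrecisionCheck {
    public static void main(String[] args) {
        // The literal from the report, cast to DECIMAL(9, 3).
        BigDecimal bd = new BigDecimal("1234567.123456789");

        // fromBigDecimal first rescales with HALF_UP rounding ...
        BigDecimal rescaled = bd.setScale(3, RoundingMode.HALF_UP);
        System.out.println(rescaled);                   // 1234567.123

        // ... and then compares the resulting precision with the target precision.
        System.out.println(rescaled.precision());       // 10
        System.out.println(rescaled.precision() > 9);   // true  -> fromBigDecimal returns null
        System.out.println(rescaled.precision() > 10);  // false -> DECIMAL(10, 3) works
    }
}
{code}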
Personally I think we should throw an error instead of simply returning null
when a cast cannot represent the original value in the target type.
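As a rough patch sketch against the fromBigDecimal method quoted above (the
name fromBigDecimalStrict and the use of ArithmeticException are only for
illustration; the real fix would need to go through the planner/codegen and
agree on the desired semantics):
{code:java}
// Hypothetical strict variant: fail loudly instead of returning null when the
// target DECIMAL(precision, scale) cannot hold the rescaled value.
public static DecimalData fromBigDecimalStrict(BigDecimal bd, int precision, int scale) {
    bd = bd.setScale(scale, RoundingMode.HALF_UP);
    if (bd.precision() > precision) {
        throw new ArithmeticException(
                "Value " + bd.toPlainString()
                        + " cannot be represented as DECIMAL(" + precision + ", " + scale + ")");
    }
    long longVal = -1;
    if (precision <= MAX_COMPACT_PRECISION) {
        longVal = bd.movePointRight(scale).longValueExact();
    }
    return new DecimalData(precision, scale, longVal, bd);
}
{code}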
> SQL casting to decimal which cannot hold its original value throws 'Column
> 'EXPR$0' is NOT NULL, however, a null value is being written into it'
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-23702
> URL: https://issues.apache.org/jira/browse/FLINK-23702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.2
> Reporter: Yao Zhang
> Priority: Critical
>
> If we run the following SQL:
> {code:sql}
> select CAST( 1234567.123456789 as DECIMAL(10, 3))
> {code}
> Flink gets the correct result, because the result 1234567.123 fits the target
> definition (precision=10 and scale=3); only accuracy in the fractional part is
> lost.
> However, if we execute:
> {code:sql}
> select CAST( 1234567.123456789 as DECIMAL(9, 3))
> {code}
> it results in an exception, because the original value cannot be stored under
> this definition.
> But the exception seems unrelated to its root cause:
> Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>     at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
>     at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
>     at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
>     at com.paultech.sql.SqlStreamEnvDemo$.main(SqlStreamEnvDemo.scala:70)
>     at com.paultech.sql.SqlStreamEnvDemo.main(SqlStreamEnvDemo.scala)
> Caused by: java.io.IOException: Failed to fetch job execution result
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
>     ... 8 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
>     ... 8 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
>     at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
>     at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at StreamExecCalc$16.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>