[
https://issues.apache.org/jira/browse/FLINK-19797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348313#comment-17348313
]
lincoln lee commented on FLINK-19797:
-------------------------------------
[~FrankZou], I've tested a similar case in 1.13 and master branch, the decimal
array works. You can try the latest release 1.13.1.
Here's the test:
{code}
@Test
def testUdafWithDecimalArrayInput(): Unit = {
util.addFunction("testAgg", new TestAgg)
val sql =
"""
|SELECT a, c, testAgg(b, array[0.10, 0.50, 0.99]) FROM T GROUP BY a, c
""".stripMargin
util.verifyExecPlan(sql)
}
class TestAgg extends AggregateFunction[JBigDecimal, JBigDecimal] {
def accumulate(acc: JBigDecimal, p1: Long, p2: Array[JBigDecimal]): Unit = {
}
override def getValue(accumulator: JBigDecimal): JBigDecimal = {
accumulator
}
override def createAccumulator(): JBigDecimal = {
new JBigDecimal(0)
}
}
{code}
> NPE when we use decimal array as input of UDAF
> ----------------------------------------------
>
> Key: FLINK-19797
> URL: https://issues.apache.org/jira/browse/FLINK-19797
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Zou
> Priority: Minor
> Labels: auto-deprioritized-major
>
> Flink version: 1.11
>
> Flink throws NPE when we use decimal array as input of UDAF.
> Here is a simple example:
> {code:java}
> select percentile(num, array[0.10, 0.50, 0.99]) from source
> {code}
> And the exception is:
> {code:java}
> Caused by: java.lang.NullPointerException: null
> at
> org.apache.flink.table.data.util.DataFormatConverters$BigDecimalConverter.toExternalImpl(DataFormatConverters.java:680)
> ~[classes/:na]
> at
> org.apache.flink.table.data.util.DataFormatConverters$BigDecimalConverter.toExternalImpl(DataFormatConverters.java:661)
> ~[classes/:na]
> at
> org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1195)
> ~[classes/:na]
> at
> org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
> ~[classes/:na]
> at
> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1148)
> ~[classes/:na]
> at
> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1095)
> ~[classes/:na]
> at
> org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:383)
> ~[classes/:na]
> at NoGroupingAggregateWithoutKeys$25.processElement(Unknown Source)
> ~[na:na]
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:183)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:565)
> ~[classes/:na]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
> ~[classes/:na]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> ~[classes/:na]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> ~[classes/:na]
> at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_251]
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)