[
https://issues.apache.org/jira/browse/FLINK-18862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu updated FLINK-18862:
----------------------------
Description:
1. Env: Flink SQL, version 1.11.1, per-job mode
2. Error: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast
to org.apache.flink.table.data.StringData
3. Job:
(1) create a Kafka table
{code:java}
CREATE TABLE kafka(
x String,
y String
)with(
'connector' = 'kafka',
......
)
{code}
(2) create a view:
{code:java}
CREATE VIEW view1 AS
SELECT
x,
y,
CAST(COUNT(1) AS VARCHAR) AS ct
FROM kafka
GROUP BY
x, y
{code}
(3) aggregate on the view:
{code:java}
select
x,
LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
FROM view1
GROUP BY x
{code}
The job then fails with:
org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to
org.apache.flink.table.data.StringData
The problem is that the query contains no RawValueData at all: the result type
of count(1) should be bigint, not RawValueData.
(4) Without the second aggregation, the job runs successfully.
{code:java}
select
x,
CONCAT_WS('=', y, ct)
from view1
{code}
The detailed exception:
{code:java}
java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
    at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) ~[flink-table-blink_2.11-1.11.1.jar:?]
    at org.apache.flink.table.data.RowData.get(RowData.java:273) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [ad_features_auto-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [ad_features_auto-1.0-SNAPSHOT.jar:?]
{code}
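The trace shows the cast failing inside GenericRowData.getString while RowDataSerializer copies the output row of GroupAggFunction. The following is only a rough, self-contained sketch of that failure mode and not Flink code: GenericRow, StringValue, RawValue and failsAsString are hypothetical stand-ins invented for illustration. It assumes that the row stores its fields as untyped objects and that the getter is chosen from the declared field type, so a mismatch between the declared type and the stored value only surfaces at the cast.

```java
public class RawCastSketch {
    /** Hypothetical stand-in for a string field value. */
    static final class StringValue {
        final String text;
        StringValue(String text) { this.text = text; }
    }

    /** Hypothetical stand-in for an opaque (raw) field value. */
    static final class RawValue {
        final Object payload;
        RawValue(Object payload) { this.payload = payload; }
    }

    /** Row that stores fields as untyped objects and casts on access. */
    static final class GenericRow {
        private final Object[] fields;
        GenericRow(Object... fields) { this.fields = fields; }

        StringValue getString(int pos) {
            // Trusts the caller: the declared type is assumed to match the stored value.
            return (StringValue) fields[pos];
        }
    }

    /** Returns true if reading field pos as a string fails with ClassCastException. */
    static boolean failsAsString(GenericRow row, int pos) {
        try {
            row.getString(pos);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Field 1 is declared as a string in both rows, but only the first row stores one.
        GenericRow ok = new GenericRow(new StringValue("x1"), new StringValue("3"));
        GenericRow bad = new GenericRow(new StringValue("x1"), new RawValue(3L));
        System.out.println(failsAsString(ok, 1));   // prints "false"
        System.out.println(failsAsString(bad, 1));  // prints "true"
    }
}
```

In this model nothing is wrong with the reader; the error only appears because the value placed in the row does not match the type the downstream code was told to expect, which is consistent with the report that no RawValueData appears in the query itself.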
was:
1. Environment: Flink SQL, version 1.11.1, per-job mode
2. Error: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to
org.apache.flink.table.data.StringData
3. Background:
(1) Create a table from Kafka:
{code:java}
CREATE TABLE kafka(
x String,
y String
)with(
'connector' = 'kafka',
......
)
{code}
(2) Create a view:
{code:java}
CREATE VIEW view1 AS
SELECT
x,
y,
CAST(COUNT(1) AS VARCHAR) AS ct
FROM kafka
GROUP BY
x, y
{code}
(3) Then run another aggregation on top of this view:
{code:java}
select
x,
LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
FROM view1
GROUP BY x
{code}
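The job then fails with: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast
to org.apache.flink.table.data.StringData
(4) However, when I tried it without the aggregation, no error was raised, which shows that the result of count(1) can be converted to String.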
{code:java}
select
x,
CONCAT_WS('=', y, ct)
from view1
{code}
*But why does it fail after one more aggregation?*
4. Question: the comparison above shows that count(1) can be cast to string, so why does it stop working after one more aggregation?
Is there a good way to solve this problem?
5. Slightly more detailed error: the stack trace was identical to the one in the updated description above.
> Fix BinaryRawValueData cannot be cast to StringData in runtime
> --------------------------------------------------------------
>
> Key: FLINK-18862
> URL: https://issues.apache.org/jira/browse/FLINK-18862
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: YUJIANBO
> Assignee: Jark Wu
> Priority: Major
> Fix For: 1.12.0, 1.11.2
>
> Attachments: sql 中agg算子的 count(1) 的结果强转成String类型再经过一次agg的操作就不能成功.txt
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)