nonggia.liang created HUDI-4733:
-----------------------------------
Summary: Flag emitDelete is inconsistent in HoodieTableSource and MergeOnReadInputFormat
Key: HUDI-4733
URL: https://issues.apache.org/jira/browse/HUDI-4733
Project: Apache Hudi
Issue Type: Bug
Components: flink, flink-sql
Reporter: nonggia.liang
Attachments: image 1.png
When reading a MOR table in Flink, we encountered an exception from the Flink
runtime (shown in image 1) complaining that the table source should not
emit a retract record.
I think this is the cause, in HoodieTableSource:
{code:java}
@Override
public ChangelogMode getChangelogMode() {
  // when read as streaming and changelog mode is enabled, emit as FULL mode;
  // when all the changes are compacted or read as batch, emit as INSERT mode.
  return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
}
{code}
{code:java}
private InputFormat<RowData, ?> getStreamInputFormat() {
  ...
  if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
    final HoodieTableType tableType =
        HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
    return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType,
        Collections.emptyList(), emitDelete);
  }
  ...
}
{code}
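To make the mismatch concrete, here is a simplified, self-contained model of the two decisions above. It is not Hudi code: the class ChangelogModeModel, its methods, and the boolean parameters standing in for OptionsResolver.emitChangelog(conf) and the table-type check are hypothetical, RowKind only mirrors Flink's row kinds, and FULL is modeled as all four kinds. Only the snapshot branch shown above is modeled; the elided parts of getStreamInputFormat() are not.

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogModeModel {

    // Mirrors the row kinds a Flink source can produce.
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Stand-in for HoodieTableSource#getChangelogMode(). The emitChangelog argument
    // replaces OptionsResolver.emitChangelog(conf); FULL is modeled as all four kinds.
    public static Set<RowKind> declaredKinds(boolean emitChangelog) {
        return emitChangelog ? EnumSet.allOf(RowKind.class) : EnumSet.of(RowKind.INSERT);
    }

    // Stand-in for the snapshot branch of getStreamInputFormat(): emitDelete depends
    // only on the table type, never on the changelog setting.
    public static boolean emitDeleteForSnapshot(boolean mergeOnRead) {
        return mergeOnRead;
    }

    // True when the input format may emit DELETE rows that the source never declared.
    public static boolean inconsistent(boolean emitChangelog, boolean mergeOnRead) {
        return emitDeleteForSnapshot(mergeOnRead)
            && !declaredKinds(emitChangelog).contains(RowKind.DELETE);
    }

    public static void main(String[] args) {
        // 'table.type' = 'MERGE_ON_READ', 'read.streaming.enabled' = 'true',
        // snapshot query, changelog emission off (so the source is insert-only).
        boolean emitChangelog = false;
        boolean mergeOnRead = true;
        System.out.println(declaredKinds(emitChangelog));             // [INSERT]
        System.out.println(emitDeleteForSnapshot(mergeOnRead));       // true
        System.out.println(inconsistent(emitChangelog, mergeOnRead)); // true
    }
}
```

The point of the model is that the two flags are computed from different inputs, so nothing forces them to agree.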
With these options:
{{'table.type' = 'MERGE_ON_READ'}}
{{'read.streaming.enabled' = 'true'}}
HoodieTableSource announces an INSERT-only changelog mode, but MergeOnReadInputFormat still emits DELETE records.
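The failure comes from the consumer side trusting the declared changelog mode. The sketch below is a hypothetical validator, not Flink's actual check (the class DeclaredModeCheck, its Row type, and the error message are invented for illustration). It shows the same contract: once a source has declared only INSERT, a single DELETE row violates it. The rows are illustrative, assuming a MOR snapshot read with emitDelete = true surfaces a deleted record as a DELETE row.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class DeclaredModeCheck {

    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical row: a key plus the kind of change it carries.
    public static final class Row {
        final RowKind kind;
        final String key;

        public Row(RowKind kind, String key) {
            this.kind = kind;
            this.key = key;
        }
    }

    // Hypothetical consumer-side check: fail on any row whose kind was not declared.
    public static void validate(Set<RowKind> declared, List<Row> rows) {
        for (Row row : rows) {
            if (!declared.contains(row.kind)) {
                throw new IllegalStateException("Source declared " + declared
                    + " but emitted " + row.kind + " for key " + row.key);
            }
        }
    }

    public static void main(String[] args) {
        Set<RowKind> insertOnly = EnumSet.of(RowKind.INSERT);
        // Illustrative rows (assumed behavior): with emitDelete = true, a deleted
        // record is surfaced as a DELETE row instead of being dropped.
        List<Row> rows = List.of(new Row(RowKind.INSERT, "id1"), new Row(RowKind.DELETE, "id2"));
        try {
            validate(insertOnly, rows);
        } catch (IllegalStateException e) {
            // Prints: Source declared [INSERT] but emitted DELETE for key id2
            System.out.println(e.getMessage());
        }
    }
}
```

Declaring the full set of row kinds makes the same rows pass, which is why the two flags need to agree.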
--
This message was sent by Atlassian Jira
(v8.20.10#820010)