nonggia.liang created HUDI-4733:
-----------------------------------

             Summary: Flag emitDelete is inconsistent in HoodieTableSource and MergeOnReadInputFormat
                 Key: HUDI-4733
                 URL: https://issues.apache.org/jira/browse/HUDI-4733
             Project: Apache Hudi
          Issue Type: Bug
          Components: flink, flink-sql
            Reporter: nonggia.liang
         Attachments: image 1.png

When reading a MOR table in Flink, we encountered an exception from the Flink runtime (as shown in image 1) complaining that the table source should not emit a retract record.
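
To make the complaint concrete: a source that announces an INSERT-only changelog may only produce INSERT rows, so a DELETE (retract) row breaks that contract. The sketch below is a hypothetical model of the contract, not Flink's actual validation code. RowKind is a local stand-in that mirrors the constant names of org.apache.flink.types.RowKind, and InsertOnlyContract and checkRecord are made-up names.

{code:java}
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the insert-only contract; NOT Flink's actual validation code.
// RowKind is a local stand-in mirroring the constant names of org.apache.flink.types.RowKind.
public class InsertOnlyContract {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Row kinds a source may produce when it announces an insert-only changelog.
    static final Set<RowKind> INSERT_ONLY = EnumSet.of(RowKind.INSERT);

    // Throws if a record's kind is not covered by the announced changelog mode.
    static void checkRecord(Set<RowKind> announced, RowKind kind) {
        if (!announced.contains(kind)) {
            throw new IllegalStateException(
                "Source announced " + announced + " but emitted a " + kind + " record");
        }
    }

    public static void main(String[] args) {
        checkRecord(INSERT_ONLY, RowKind.INSERT); // accepted
        try {
            checkRecord(INSERT_ONLY, RowKind.DELETE); // a retract record is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

Running main prints "Source announced [INSERT] but emitted a DELETE record".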

I think this is the cause, in HoodieTableSource:
{code:java}
@Override
public ChangelogMode getChangelogMode() {
  // when read as streaming and changelog mode is enabled, emit as FULL mode;
  // when all the changes are compacted or read as batch, emit as INSERT mode.
  return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
}
{code}
{code:java}
private InputFormat<RowData, ?> getStreamInputFormat() {
  ...
  if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
    final HoodieTableType tableType =
        HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
    // emitDelete depends only on the table type: it is true for every MERGE_ON_READ table,
    // independent of the changelog mode announced by getChangelogMode()
    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
    return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType,
        Collections.emptyList(), emitDelete);
  }
  ...
}
{code}
With these options:

{{'table.type' = 'MERGE_ON_READ'}}

{{'read.streaming.enabled' = 'true'}}

Since changelog mode is not enabled by these options, HoodieTableSource announces an INSERT-only changelog, but MergeOnReadInputFormat still emits deletes, because emitDelete is true for every MERGE_ON_READ table.
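
The mismatch can be modelled in a few lines of plain Java. This is a hypothetical, simplified sketch, not Hudi's code: it reads options from a Map instead of a Flink Configuration, reduces OptionsResolver.emitChangelog to the rule stated in the comment in getChangelogMode(), and assumes 'changelog.enabled' is the key that turns changelog mode on. EmitDeleteMismatch, ChangelogKind, announcedMode, emitDelete and consistent are made-up names for illustration.

{code:java}
import java.util.Map;

// Hypothetical model of the two decisions described in this issue; NOT Hudi's code.
// "changelog.enabled" is ASSUMED to be the key of the changelog-mode switch.
public class EmitDeleteMismatch {
    enum ChangelogKind { INSERT_ONLY, FULL }

    // Simplified from the comment in getChangelogMode(): FULL only for a streaming read
    // with changelog mode enabled, otherwise INSERT only.
    static ChangelogKind announcedMode(Map<String, String> conf) {
        boolean streaming = Boolean.parseBoolean(conf.getOrDefault("read.streaming.enabled", "false"));
        boolean changelog = Boolean.parseBoolean(conf.getOrDefault("changelog.enabled", "false"));
        return streaming && changelog ? ChangelogKind.FULL : ChangelogKind.INSERT_ONLY;
    }

    // As in getStreamInputFormat() for a snapshot query: depends only on the table type.
    static boolean emitDelete(Map<String, String> conf) {
        return "MERGE_ON_READ".equals(conf.get("table.type"));
    }

    // Consistent only if deletes are emitted solely when the announced mode can carry them.
    static boolean consistent(Map<String, String> conf) {
        return !emitDelete(conf) || announcedMode(conf) == ChangelogKind.FULL;
    }

    public static void main(String[] args) {
        // The options from this report.
        Map<String, String> conf = Map.of(
            "table.type", "MERGE_ON_READ",
            "read.streaming.enabled", "true");
        System.out.println(announcedMode(conf) + ", emitDelete=" + emitDelete(conf)
            + ", consistent=" + consistent(conf));
    }
}
{code}

With the reported options it prints "INSERT_ONLY, emitDelete=true, consistent=false". Deriving emitDelete from the same decision that getChangelogMode() uses would be one way to keep the two in sync, though this sketch does not claim that is how it should be fixed in Hudi.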



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
