[ 
https://issues.apache.org/jira/browse/HUDI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nonggia.liang updated HUDI-4733:
--------------------------------
    Description: 
When reading a MOR table in Flink, we encountered an exception from the Flink
runtime (as shown in image 1), which complained that the table source should not
emit a retract record.

I think the cause is here, in HoodieTableSource:
{code:java}
@Override
public ChangelogMode getChangelogMode() {
  // when read as streaming and changelog mode is enabled, emit as FULL mode;
  // when all the changes are compacted or read as batch, emit as INSERT mode.
  return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
}
{code}
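To make the declaration concrete, here is a minimal, Flink-free sketch of the decision made in {{getChangelogMode()}}. It is hypothetical: {{ChangelogModeSketch}} and its local {{RowKind}} enum are stand-ins (the enum mirrors Flink's {{org.apache.flink.types.RowKind}}), {{OptionsResolver.emitChangelog(conf)}} is approximated as "streaming read and changelog mode are both enabled" based on the code comment above, the {{changelog.enabled}} option key is an assumption, and {{ChangelogModes.FULL}} is assumed to contain all four row kinds.

{code:java}
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of getChangelogMode(); no Flink or Hudi classes are used.
public class ChangelogModeSketch {

  // Local stand-in for org.apache.flink.types.RowKind.
  enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Assumption: approximates OptionsResolver.emitChangelog(conf) as
  // "streaming read enabled AND changelog mode enabled".
  static boolean emitChangelog(Map<String, String> conf) {
    return Boolean.parseBoolean(conf.getOrDefault("read.streaming.enabled", "false"))
        && Boolean.parseBoolean(conf.getOrDefault("changelog.enabled", "false"));
  }

  // The set of row kinds the source announces to the planner.
  static Set<RowKind> declaredKinds(Map<String, String> conf) {
    return emitChangelog(conf)
        ? EnumSet.allOf(RowKind.class)   // modelled on ChangelogModes.FULL
        : EnumSet.of(RowKind.INSERT);    // modelled on ChangelogMode.insertOnly()
  }

  public static void main(String[] args) {
    // The options from this report; changelog mode is not enabled.
    Map<String, String> conf = Map.of(
        "table.type", "MERGE_ON_READ",
        "read.streaming.enabled", "true");
    Set<RowKind> kinds = declaredKinds(conf);
    System.out.println("declared: " + kinds);                              // declared: [INSERT]
    System.out.println("DELETE allowed: " + kinds.contains(RowKind.DELETE)); // DELETE allowed: false
  }
}
{code}

With the reported options the source announces INSERT only, so any DELETE row it later produces falls outside what the planner was told to expect.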
{code:java}
private InputFormat<RowData, ?> getStreamInputFormat() {
  ...
  if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
    final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
    // emitDelete depends only on the table type, not on the changelog mode declared above
    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
    return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
        rowDataType, Collections.emptyList(), emitDelete);
  }
  ...
}
{code}
With these options:

{{'table.type' = 'MERGE_ON_READ'}}

{{'read.streaming.enabled' = 'true'}}

changelog mode is not enabled, so {{HoodieTableSource}} announces an INSERT-only changelog, 

but {{MergeOnReadInputFormat}} is created with {{emitDelete = true}} and will emit DELETE records.
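The mismatch, and one possible way to remove it, can be modelled without Flink. This is a hypothetical sketch, not Hudi's code and not necessarily the change that resolved this issue: the class and method names are made up, {{OptionsResolver.emitChangelog(conf)}} is approximated as "streaming read and {{changelog.enabled}} both true" (the option key is an assumption), and the "consistent" variant gates {{emitDelete}} on that same predicate.

{code:java}
import java.util.Map;

// Hypothetical, Flink-free model of the two decisions described in this issue.
public class EmitDeleteConsistency {

  // Assumption: approximates OptionsResolver.emitChangelog(conf), the predicate
  // that selects FULL vs. insert-only in getChangelogMode().
  static boolean emitChangelog(Map<String, String> conf) {
    return flag(conf, "read.streaming.enabled") && flag(conf, "changelog.enabled");
  }

  // Mirrors getStreamInputFormat(): every MERGE_ON_READ snapshot read emits deletes.
  static boolean emitDeleteCurrent(Map<String, String> conf) {
    return "MERGE_ON_READ".equals(conf.get("table.type"));
  }

  // One possible fix: only emit deletes when the declared changelog can carry them.
  static boolean emitDeleteConsistent(Map<String, String> conf) {
    return emitDeleteCurrent(conf) && emitChangelog(conf);
  }

  static boolean flag(Map<String, String> conf, String key) {
    return Boolean.parseBoolean(conf.getOrDefault(key, "false"));
  }

  public static void main(String[] args) {
    Map<String, String> conf = Map.of(
        "table.type", "MERGE_ON_READ",
        "read.streaming.enabled", "true");
    System.out.println("declares deletes: " + emitChangelog(conf));                  // false
    System.out.println("emits deletes (current): " + emitDeleteCurrent(conf));       // true
    System.out.println("emits deletes (consistent): " + emitDeleteConsistent(conf)); // false
  }
}
{code}

Deriving both flags from one predicate keeps them from drifting apart. This assumes that with {{emitDelete = false}} the input format drops delete records instead of emitting them; the alternative would be to declare FULL whenever deletes can be emitted.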

> Flag emitDelete is inconsistent in HoodieTableSource and 
> MergeOnReadInputFormat
> -------------------------------------------------------------------------------
>
>                 Key: HUDI-4733
>                 URL: https://issues.apache.org/jira/browse/HUDI-4733
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink, flink-sql
>            Reporter: nonggia.liang
>            Priority: Minor
>         Attachments: image 1.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
