[ 
https://issues.apache.org/jira/browse/HUDI-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiulong updated HUDI-2163:
--------------------------
    Description: 
First, I create a table in Flink with connector = 'hudi', table.type = 'MERGE_ON_READ', and 'read.streaming.enabled' = 'true'.

Second, I write records with RowKind (INSERT or DELETE) into the table using Flink streaming table SQL.
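
For reference, a minimal sketch of steps one and two with the Flink Java Table API. The table name, schema, path, and the CDC source are made up for illustration; the Hudi options are the ones listed above.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WriteHudiChangelog {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // MOR Hudi table with streaming read enabled, as described above.
    // Table name, columns and path are hypothetical.
    tEnv.executeSql(
        "CREATE TABLE hudi_tbl (\n"
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
            + "  name VARCHAR(10),\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///tmp/hudi_tbl',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'read.streaming.enabled' = 'true'\n"
            + ")");

    // A hypothetical changelog source whose rows carry INSERT/UPDATE/DELETE RowKinds,
    // e.g. Kafka with debezium-json; any changelog-producing source would do.
    tEnv.executeSql(
        "CREATE TABLE cdc_src (\n"
            + "  uuid VARCHAR(20),\n"
            + "  name VARCHAR(10),\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'demo',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'format' = 'debezium-json'\n"
            + ")");

    // Streaming insert: deletes from the source arrive at the Hudi sink as RowKind.DELETE.
    tEnv.executeSql("INSERT INTO hudi_tbl SELECT * FROM cdc_src");
  }
}
```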

Third, in the Flink sql-client the results look correct: an insert event produces a new record or updates an old one, and a deletion event removes the old record. {color:#fb0106}But I cannot observe the deletion events in sql-client changelog mode.{color}

Fourth, I tried to read the Hudi table with the Flink SQL "select * from xxx" and convert the resulting Table to a retract stream (StreamTableEnvironment.toRetractStream(Table, Row.class)), {color:#fb0106}and it does not receive the deletion events either.{color}

 

In the Hudi source code, I found that the iterator built for a MergeOnReadInputSplit only emits deletion events when the split has no basePath (the log-files-only case) or when MergeOnReadInputSplit.getMergeType() is skip_merge. Neither of those conditions holds in my case.

Did I do something wrong? How can I detect deletion events with Flink on Hudi? Please offer some advice, thanks!

 

```
// org.apache.hudi.table.format.mor.MergeOnReadInputFormat.java
@Override
public void open(MergeOnReadInputSplit split) throws IOException {
  this.currentReadCount = 0L;
  this.hadoopConf = StreamerUtil.getHadoopConf();

  if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
      // base file only with commit time filtering
      this.iterator = new BaseFileOnlyFilteringIterator(
          split.getInstantRange(),
          this.tableState.getRequiredRowType(),
          getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
    } else {
      // base file only
      this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
    }
  } else if (!split.getBasePath().isPresent()) {
    // log files only
    this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
  } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
    this.iterator = new SkipMergeIterator(
        getRequiredSchemaReader(split.getBasePath().get()),
        getLogFileIterator(split));
  } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
    this.iterator = new MergeIterator(
        hadoopConf,
        split,
        this.tableState.getRowType(),
        this.tableState.getRequiredRowType(),
        new Schema.Parser().parse(this.tableState.getAvroSchema()),
        new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
        this.requiredPos,
        getFullSchemaReader(split.getBasePath().get()));
  } else {
    throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
        + "file path: " + split.getBasePath()
        + "log paths: " + split.getLogPaths()
        + "hoodie table path: " + split.getTablePath()
        + "spark partition Index: " + split.getSplitNumber()
        + "merge type: " + split.getMergeType());
  }
  // ... (rest of open() omitted in this excerpt)
}


private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
  final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
  final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
  final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
  final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
      AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
  final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
      FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
  final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
  final int[] pkOffset = tableState.getPkOffsetsInRequired();
  // flag saying whether the pk semantics has been dropped by user specified
  // projections. For e.g, if the pk fields are [a, b] but user only select a,
  // then the pk semantics is lost.
  final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
  final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
  final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);

  return new Iterator<RowData>() {
    private RowData currentRecord;

    @Override
    public boolean hasNext() {
      while (logRecordsKeyIterator.hasNext()) {
        String curAvroKey = logRecordsKeyIterator.next();
        Option<IndexedRecord> curAvroRecord = null;
        final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
        try {
          curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
        } catch (IOException e) {
          throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
        }
        if (!curAvroRecord.isPresent()) {
          // delete record found
          if (emitDelete && !pkSemanticLost) {
            GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());

            final String recordKey = hoodieRecord.getRecordKey();
            final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
            final Object[] converted = converter.convert(pkFields);
            for (int i = 0; i < pkOffset.length; i++) {
              delete.setField(pkOffset[i], converted[i]);
            }
            delete.setRowKind(RowKind.DELETE);

            this.currentRecord = delete;
            return true;
          }
          // skipping if the condition is unsatisfied
          // continue;
        } else {
          GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
              curAvroRecord.get(),
              requiredSchema,
              requiredPos,
              recordBuilder);
          currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
          return true;
        }
      }
      return false;
    }

    @Override
    public RowData next() {
      return currentRecord;
    }
  };
}

```

 

  was:
First, I create a table in Flink with connector = 'hudi', table.type = 'MERGE_ON_READ', and 'read.streaming.enabled' = 'true'.

 


> reading hudi mor table in flink sql does not send deletion events downstream
> ---------------------------------------------------------------------------------
>
>                 Key: HUDI-2163
>                 URL: https://issues.apache.org/jira/browse/HUDI-2163
>             Project: Apache Hudi
>          Issue Type: Bug
>         Environment: hudi-flink-bundle-0.9.0-SNAPSHOT
>            Reporter: jiulong
>            Priority: Major
>             Fix For: 0.9.0
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
