[
https://issues.apache.org/jira/browse/HUDI-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jiulong updated HUDI-2163:
--------------------------
Description:
first, I create a table in flink with connector=‘hudi’ and
table.type=‘MERGE_ON_READ’ and 'read.streaming.enabled' = ‘true’.
second, I write records with RowKind(INSERT or DELETE) using flink table
streaming sql.
third, in flink sql-client, the result is all right, insert event produces new
record or updates old record, deletion event deletes old record.
{color:#fb0106}but I can't detect the deletion event in flink sql-client changelog
mode.{color}
{color:#172b4d}fourth, I tried to read the hudi table using flink sql “select *
from xxx” and transform the flink Table object to a
RetractStream(StreamTableEnvironment.toRetractStream(Table, Row.class)),
{color:#fb0106}and it can't detect deletion events either.{color}{color}
{color:#172b4d}{color:#fb0106}{color:#000000}In the Hudi source code,
{color}{color:#fb0106}I found that only in the case of “MergeOnReadInputSplit has no
basePath property” or “MergeOnReadInputSplit.getMergeType is
{color}{color:#fb0106}skip_merge” will the Iterator of MergeOnReadInputSplit{color}
emit a deletion event. And none of the previous conditions is met.
{color}{color}
Did I do something wrong? How do I detect deletion events in flink hudi? Please
offer some advice, thanks!
{color:#000000}```{color}
{color:#000000}[{color}org.apache.hudi.table.format.mor.MergeOnReadInputFormat.java]
@Override
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
this.hadoopConf = StreamerUtil.getHadoopConf();
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() >
0)) {
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
// base file only with commit time filtering
this.iterator = new BaseFileOnlyFilteringIterator(
split.getInstantRange(),
this.tableState.getRequiredRowType(),
getReader(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
} else {
// base file only
this.iterator = new
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
}
} else if (!split.getBasePath().isPresent()) {
// log files only
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
this.iterator = new SkipMergeIterator(
getRequiredSchemaReader(split.getBasePath().get()),
getLogFileIterator(split));
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE))
{
this.iterator = new MergeIterator(
hadoopConf,
split,
this.tableState.getRowType(),
this.tableState.getRequiredRowType(),
new Schema.Parser().parse(this.tableState.getAvroSchema()),
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
getFullSchemaReader(split.getBasePath().get()));
} else {
throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR
File Split for "
+ "file path: " + split.getBasePath()
+ "log paths: " + split.getLogPaths()
+ "hoodie table path: " + split.getTablePath()
+ "spark partition Index: " + split.getSplitNumber()
+ "merge type: " + split.getMergeType());
}
private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new
Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new
Schema.Parser().parse(tableState.getRequiredAvroSchema());
final GenericRecordBuilder recordBuilder = new
GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
// projections. For e.g, if the pk fields are [a, b] but user only select a,
// then the pk semantics is lost.
final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset ->
offset == -1);
final LogicalType[] pkTypes = pkSemanticLost ? null :
tableState.getPkTypes(pkOffset);
final StringToRowDataConverter converter = pkSemanticLost ? null : new
StringToRowDataConverter(pkTypes);
return new Iterator<RowData>() {
private RowData currentRecord;
@Override
public boolean hasNext() {
while (logRecordsKeyIterator.hasNext()) {
String curAvroKey = logRecordsKeyIterator.next();
Option<IndexedRecord> curAvroRecord = null;
final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
try {
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
} catch (IOException e) {
throw new HoodieException("Get avro insert value error for key: " +
curAvroKey, e);
}
if (!curAvroRecord.isPresent()) {
// delete record found
if (emitDelete && !pkSemanticLost) {
GenericRowData delete = new
GenericRowData(tableState.getRequiredRowType().getFieldCount());
final String recordKey = hoodieRecord.getRecordKey();
final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
final Object[] converted = converter.convert(pkFields);
for (int i = 0; i < pkOffset.length; i++) {
delete.setField(pkOffset[i], converted[i]);
}
delete.setRowKind(RowKind.DELETE);
this.currentRecord = delete;
return true;
}
// skipping if the condition is unsatisfied
// continue;
} else {
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
curAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
return true;
}
}
return false;
}
@Override
public RowData next() {
return currentRecord;
}
};
}
{color:#000000}```{color}
was:
first, i create a table in flink with connector=‘hudi’ and
table.type=‘MERGE_ON_READ’ and 'read.streaming.enabled' = ‘true’.
> reading hudi mor table in flink sql does not send deletion events to down
> stream
> ---------------------------------------------------------------------------------
>
> Key: HUDI-2163
> URL: https://issues.apache.org/jira/browse/HUDI-2163
> Project: Apache Hudi
> Issue Type: Bug
> Environment: hudi-flink-bundle-0.9.0-SNAPSHOT
> Reporter: jiulong
> Priority: Major
> Fix For: 0.9.0
>
>
> first, I create a table in flink with connector=‘hudi’ and
> table.type=‘MERGE_ON_READ’ and 'read.streaming.enabled' = ‘true’.
> second, I write records with RowKind(INSERT or DELETE) using flink table
> streaming sql.
> third, in flink sql-client, the result is all right, insert event produces
> new record or updates old record, deletion event deletes old record.
> {color:#fb0106}but I can't detect the deletion event in flink sql-client
> changelog mode.{color}
> {color:#172b4d}fourth, I tried to read the hudi table using flink sql “select *
> from xxx” and transform the flink Table object to a
> RetractStream(StreamTableEnvironment.toRetractStream(Table, Row.class)),
> {color:#fb0106}and it can't detect deletion events either.{color}{color}
>
> {color:#172b4d}{color:#fb0106}{color:#000000}In the Hudi source code,
> {color}{color:#fb0106}I found that only in the case of “MergeOnReadInputSplit has no
> basePath property” or “MergeOnReadInputSplit.getMergeType is
> {color}{color:#fb0106}skip_merge” will the Iterator of MergeOnReadInputSplit{color}
> emit a deletion event. And none of the previous conditions is met.
> {color}{color}
> Did I do something wrong? How do I detect deletion events in flink hudi?
> Please offer some advice, thanks!
>
> {color:#000000}```{color}
> {color:#000000}[{color}org.apache.hudi.table.format.mor.MergeOnReadInputFormat.java]
> @Override
> public void open(MergeOnReadInputSplit split) throws IOException {
> this.currentReadCount = 0L;
> this.hadoopConf = StreamerUtil.getHadoopConf();
> if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() >
> 0)) {
> if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
> // base file only with commit time filtering
> this.iterator = new BaseFileOnlyFilteringIterator(
> split.getInstantRange(),
> this.tableState.getRequiredRowType(),
> getReader(split.getBasePath().get(),
> getRequiredPosWithCommitTime(this.requiredPos)));
> } else {
> // base file only
> this.iterator = new
> BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
> }
> } else if (!split.getBasePath().isPresent()) {
> // log files only
> this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
> } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
> this.iterator = new SkipMergeIterator(
> getRequiredSchemaReader(split.getBasePath().get()),
> getLogFileIterator(split));
> } else if
> (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
> this.iterator = new MergeIterator(
> hadoopConf,
> split,
> this.tableState.getRowType(),
> this.tableState.getRequiredRowType(),
> new Schema.Parser().parse(this.tableState.getAvroSchema()),
> new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
> this.requiredPos,
> getFullSchemaReader(split.getBasePath().get()));
> } else {
> throw new HoodieException("Unable to select an Iterator to read the Hoodie
> MOR File Split for "
> + "file path: " + split.getBasePath()
> + "log paths: " + split.getLogPaths()
> + "hoodie table path: " + split.getTablePath()
> + "spark partition Index: " + split.getSplitNumber()
> + "merge type: " + split.getMergeType());
> }
>
>
> private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
> final Schema tableSchema = new
> Schema.Parser().parse(tableState.getAvroSchema());
> final Schema requiredSchema = new
> Schema.Parser().parse(tableState.getRequiredAvroSchema());
> final GenericRecordBuilder recordBuilder = new
> GenericRecordBuilder(requiredSchema);
> final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
> AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
> final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
> FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
> final Iterator<String> logRecordsKeyIterator =
> logRecords.keySet().iterator();
> final int[] pkOffset = tableState.getPkOffsetsInRequired();
> // flag saying whether the pk semantics has been dropped by user specified
> // projections. For e.g, if the pk fields are [a, b] but user only select a,
> // then the pk semantics is lost.
> final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset ->
> offset == -1);
> final LogicalType[] pkTypes = pkSemanticLost ? null :
> tableState.getPkTypes(pkOffset);
> final StringToRowDataConverter converter = pkSemanticLost ? null : new
> StringToRowDataConverter(pkTypes);
> return new Iterator<RowData>() {
> private RowData currentRecord;
> @Override
> public boolean hasNext() {
> while (logRecordsKeyIterator.hasNext()) {
> String curAvroKey = logRecordsKeyIterator.next();
> Option<IndexedRecord> curAvroRecord = null;
> final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
> try {
> curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
> } catch (IOException e) {
> throw new HoodieException("Get avro insert value error for key: " +
> curAvroKey, e);
> }
> if (!curAvroRecord.isPresent()) {
> // delete record found
> if (emitDelete && !pkSemanticLost) {
> GenericRowData delete = new
> GenericRowData(tableState.getRequiredRowType().getFieldCount());
> final String recordKey = hoodieRecord.getRecordKey();
> final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
> final Object[] converted = converter.convert(pkFields);
> for (int i = 0; i < pkOffset.length; i++) {
> delete.setField(pkOffset[i], converted[i]);
> }
> delete.setRowKind(RowKind.DELETE);
> this.currentRecord = delete;
> return true;
> }
> // skipping if the condition is unsatisfied
> // continue;
> } else {
> GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
> curAvroRecord.get(),
> requiredSchema,
> requiredPos,
> recordBuilder);
> currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
> return true;
> }
> }
> return false;
> }
> @Override
> public RowData next() {
> return currentRecord;
> }
> };
> }
> {color:#000000}```{color}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)