Thanks for the awesome analysis, you are right: after patch [2] the endInput event and the write metadata events may lose their execution order, which causes the problem here. Feel free to file a JIRA ticket to fix it :)
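To make the race concrete, here is a minimal, self-contained Java sketch
(the class and method names are made up for illustration; this is not the
actual coordinator code). Write events are queued to a single-threaded
executor while the endInput path runs directly on the calling thread, so
the finalize step can overtake a queued write event:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventOrderingDemo {

  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  void handleWriteMetaEvent(String event) {
    System.out.println("ExecutorThread handles " + event);
  }

  void handleEndInputEvent() {
    // Stands in for HoodieTable.finalizeWrite(), which removes data files
    // that no received write metadata has claimed yet.
    System.out.println("OperatorThread finalizes the write");
  }

  void handleEventFromOperator(String event, boolean isEndInput) {
    if (isEndInput) {
      handleEndInputEvent();                               // caller thread
    } else {
      executor.execute(() -> handleWriteMetaEvent(event)); // executor thread
    }
  }

  public static void main(String[] args) throws InterruptedException {
    EventOrderingDemo demo = new EventOrderingDemo();
    demo.handleEventFromOperator("WriteEvent(id1, Julian)", false);
    demo.handleEventFromOperator("EndInputEvent", true);
    // A likely interleaving (it is a race, so ordering is not guaranteed):
    //   OperatorThread finalizes the write
    //   ExecutorThread handles WriteEvent(id1, Julian)
    // i.e. finalize runs before the write metadata has been applied.
    demo.executor.shutdown();
    demo.executor.awaitTermination(1, TimeUnit.SECONDS);
  }
}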
Best,
Danny

On Tue, Jun 14, 2022 at 17:43, Александр Трушев <trushev.a...@gmail.com> wrote:
>
> Hello everyone, I found a strange behavior of a Flink job and would like
> to share it with you.
>
> *How to reproduce:*
> 1) Assume the following Flink SQL:
>
> create table t1 (uuid string, name string) with (
>   'connector' = 'hudi',
>   'write.batch.size' = '0.0000000001' -- trigger flush after each tuple
> );
> insert into t1
> select cast(uuid as string), cast(name as string)
> from (values ('id1', 'Julian'));
>
> 2) Then try to read:
>
> select * from t1;
>
> result: empty
>
> What do you think, is this the expected result?
> Even though checkpoints are disabled, I expected to see the written tuple
> (id1, Julian).
>
> *Details:*
>
> While the insertion is running, there is a data file with the tuple
> (id1, Julian). However, when it completes, the data file is removed and
> the log says:
> org.apache.hudi.table.HoodieTable - Removing duplicate data files created
> due to spark retries before committing...
>
> The root of the problem is in
> org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator.
> Simplified listing:
>
> void handleEventFromOperator(event) {
>   if (event.isEndInput()) {
>     handleEndInputEvent(event);
>   } else {
>     executor.execute(() -> handleWriteMetaEvent(event));
>   }
> }
>
> The following sequence of operations is possible:
> 1) OperatorThread: executor.execute(() ->
>    handleWriteMetaEvent(WriteEvent(id1, Julian)))
> 2) OperatorThread: handleEndInputEvent(EndInputEvent) // WriteEvent(id1,
>    Julian) has not been processed by the ExecutorThread yet
> 3) OperatorThread: HoodieTable.finalizeWrite() // removes the data file
> 4) ExecutorThread: handleWriteMetaEvent(WriteEvent(id1, Julian))
>
> Here is an example [1] where HoodieTable.finalizeWrite() happened before
> handleWriteMetaEvent(WriteEvent(id1, Julian)).
>
> I believe this is a bug introduced by the changes [2] in
> org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator,
> which made handleEndInputEvent(event) run on the OperatorThread instead
> of the ExecutorThread.
>
> *Possible solution:*
>
> If I'm right that this behavior is unexpected, I'd like to open a ticket
> and a PR with the fix: add something like this
>
> void handleEventFromOperator(event) {
>   if (event.isEndInput()) {
>     executor.waitAllTasksCompleted(); // <-------------------
>     handleEndInputEvent(event);
>   } else {
>     executor.execute(() -> handleWriteMetaEvent(event));
>   }
> }
>
> [1]
> https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=9252&view=logs&j=3b6e910d-b98f-5de6-b9cb-1e5ff571f5de&t=30b5aae4-0ea0-5566-42d0-febf71a7061a&l=18884
> [2]
> https://github.com/apache/hudi/pull/4561/files#diff-b04b4d3e697f040d71e4fbe2b2492e188f74b8b6804b1aea3f6de897cbebec17R256
>
> Aleksandr Trushev
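By the way, waitAllTasksCompleted() in the proposed fix does not exist on
the executor today, so it would need to be added. One straightforward way
to implement such a barrier on top of a plain single-threaded
java.util.concurrent.ExecutorService is sketched below (a hypothetical
helper, not existing Hudi API):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

// Hypothetical helper, not Hudi API.
public final class ExecutorBarrier {

  private ExecutorBarrier() {
  }

  // Blocks until every task submitted to the executor *before* this call
  // has finished. This is correct for a single-threaded executor, which
  // runs tasks strictly in submission order: a no-op "barrier" task can
  // only complete after all earlier tasks have completed.
  public static void waitAllTasksCompleted(ExecutorService executor) {
    try {
      executor.submit(() -> { }).get(); // no-op barrier task
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while draining the executor", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Barrier task failed unexpectedly", e);
    }
  }
}

An alternative with the same ordering guarantee is to route the endInput
event through the same executor, i.e. executor.execute(() ->
handleEndInputEvent(event)); a single-threaded executor then processes
all events strictly in arrival order, at the cost of handling endInput
asynchronously.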