Hello everyone, I found strange behavior in a Flink job and would like to
share it with you.
*How to reproduce:*
1) Given the following Flink SQL:
create table t1 (uuid string, name string) with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/t1', -- placeholder location; the Hudi connector requires a path
  'write.batch.size' = '0.0000000001' -- trigger a flush after each tuple
);
insert into t1
select cast(uuid as string), cast(name as string)
from (values ('id1', 'Julian')) as v(uuid, name);
2) Then try to read it:
select * from t1;
Result: empty.
What do you think, is this the expected result?
Even though checkpoints are disabled, I expected to see the written tuple
(id1, Julian).
*Details:*
While the insert is running, there is a data file containing the tuple
(id1, Julian). However, once the job completes, the data file is removed
and the log says:
org.apache.hudi.table.HoodieTable - Removing duplicate data files created due to spark retries before committing...
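To make the deletion concrete, here is a toy model of what I assume the cleanup in HoodieTable (finalizeWrite, see the sequence further down) does: it treats a data file as a "duplicate" and removes it when a writer created it but the write metadata the coordinator has received so far does not list it. This is a simplified assumption, not Hudi's actual code; ReconcileSketch, filesToDelete and the file name f1.parquet are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model (assumption, not Hudi code): a file counts as a "duplicate" and is
// removed if a writer created it but no write metadata received so far lists it.
public class ReconcileSketch {

    static Set<String> filesToDelete(Set<String> writtenFiles, Set<String> reportedFiles) {
        Set<String> invalid = new LinkedHashSet<>(writtenFiles);
        invalid.removeAll(reportedFiles); // keep only files nobody reported
        return invalid;
    }

    public static void main(String[] args) {
        // "f1.parquet" is a hypothetical name for the file holding (id1, Julian).
        Set<String> written = Set.of("f1.parquet");
        // The WriteEvent for f1.parquet has not been handled yet, so nothing is reported.
        Set<String> reported = Set.of();
        System.out.println(filesToDelete(written, reported)); // prints [f1.parquet]
    }
}
```

In other words, a perfectly valid file gets deleted whenever its write metadata arrives after this check runs.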
The root of the problem is in
org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator.
Simplified listing:
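void handleEventFromOperator(event) {
    if (event.isEndInput()) {
        // handled synchronously on the calling (operator) thread
        handleEndInputEvent(event);
    } else {
        // handled asynchronously on the executor thread
        executor.execute(() -> handleWriteMetaEvent(event));
    }
}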
The following sequence of operations is possible:
1) OperatorThread: executor.execute(() -> handleWriteMetaEvent(WriteEvent(id1, Julian)))
2) OperatorThread: handleEndInputEvent(EndInputEvent) // WriteEvent(id1, Julian) has not been handled by ExecutorThread yet
3) OperatorThread: HoodieTable.finalizeWrite() // removes the data file
4) ExecutorThread: handleWriteMetaEvent(WriteEvent(id1, Julian))
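The interleaving above can be forced deterministically in a small toy model. It assumes a single-thread executor and models finalizeWrite as "delete every written file that no WriteEvent has reported yet"; RaceSketch, reproduce and all other names are hypothetical, not Hudi APIs. A latch keeps the executor busy so the WriteEvent is still queued when the end-input event is handled on the calling thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the coordinator; all names are hypothetical, not Hudi APIs.
public class RaceSketch {
    final Set<String> writtenFiles = ConcurrentHashMap.newKeySet();  // files the task wrote
    final Set<String> reportedFiles = ConcurrentHashMap.newKeySet(); // files known via WriteEvents
    final Set<String> deletedFiles = ConcurrentHashMap.newKeySet();  // files "finalizeWrite" removed
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Same shape as the simplified listing: end input is synchronous, the rest is async.
    void handleEventFromOperator(String file, boolean endInput) {
        if (endInput) {
            handleEndInputEvent();
        } else {
            executor.execute(() -> reportedFiles.add(file));
        }
    }

    // Models finalizeWrite: delete every written file that no WriteEvent has reported.
    void handleEndInputEvent() {
        for (String f : writtenFiles) {
            if (!reportedFiles.contains(f)) {
                deletedFiles.add(f);
            }
        }
    }

    // Forces steps 1-4 of the sequence in order and returns the deleted files.
    static Set<String> reproduce() {
        RaceSketch c = new RaceSketch();
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the executor so the WriteEvent is queued but not yet processed.
        c.executor.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        c.writtenFiles.add("f1.parquet");               // the task wrote its data file
        c.handleEventFromOperator("f1.parquet", false); // step 1: WriteEvent queued
        c.handleEventFromOperator(null, true);          // steps 2-3: finalize runs now
        release.countDown();                            // step 4: WriteEvent handled too late
        c.executor.shutdown();
        try {
            c.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return c.deletedFiles;
    }

    public static void main(String[] args) {
        System.out.println("deleted: " + reproduce()); // prints deleted: [f1.parquet]
    }
}
```

In the real job the delay comes from thread scheduling rather than a latch, which is why the failure is intermittent.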
Here is an example [1] where HoodieTable.finalizeWrite() ran before
handleWriteMetaEvent(WriteEvent(id1, Julian)).
I believe this is a bug introduced by the changes [2] to
org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator,
which moved handleEndInputEvent(event) from the ExecutorThread to the
OperatorThread.
*Possible solution:*
If I'm right that this behavior is unexpected, I'd like to open a ticket
and a PR with a fix along these lines:
void handleEventFromOperator(event) {
    if (event.isEndInput()) {
        // proposed: wait until all queued write metadata events are handled
        executor.waitAllTasksCompleted(); // <-------------------
        handleEndInputEvent(event);
    } else {
        executor.execute(() -> handleWriteMetaEvent(event));
    }
}
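One way the proposed waitAllTasksCompleted() could work, assuming the coordinator's executor runs tasks one at a time in submission order (like Executors.newSingleThreadExecutor()), is to submit a no-op task and block until it finishes: by then every previously queued WriteEvent has been handled. The sketch below models this as a coordinator method rather than an executor method; FixSketch, runWithFix and sleepQuietly are hypothetical names, not Hudi's executor API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of the proposed fix; all names are hypothetical, not Hudi APIs.
public class FixSketch {
    final Set<String> writtenFiles = ConcurrentHashMap.newKeySet();
    final Set<String> reportedFiles = ConcurrentHashMap.newKeySet();
    final Set<String> deletedFiles = ConcurrentHashMap.newKeySet();
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // A single-thread executor runs tasks in FIFO order, so once this no-op has run,
    // every task submitted before it (e.g. queued WriteEvents) has finished as well.
    void waitAllTasksCompleted() throws InterruptedException, ExecutionException {
        executor.submit(() -> { }).get();
    }

    void handleEventFromOperator(String file, boolean endInput)
            throws InterruptedException, ExecutionException {
        if (endInput) {
            waitAllTasksCompleted(); // the proposed extra step
            handleEndInputEvent();
        } else {
            executor.execute(() -> reportedFiles.add(file));
        }
    }

    // Models finalizeWrite: delete every written file that no WriteEvent has reported.
    void handleEndInputEvent() {
        for (String f : writtenFiles) {
            if (!reportedFiles.contains(f)) {
                deletedFiles.add(f);
            }
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same event order as in the bug report, but with a slow executor.
    static Set<String> runWithFix() {
        FixSketch c = new FixSketch();
        try {
            c.executor.execute(() -> sleepQuietly(100));    // executor is busy for a while
            c.writtenFiles.add("f1.parquet");
            c.handleEventFromOperator("f1.parquet", false); // WriteEvent queued
            c.handleEventFromOperator(null, true);          // waits, then finalizes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            c.executor.shutdown();
        }
        return c.deletedFiles;
    }

    public static void main(String[] args) {
        System.out.println("deleted: " + runWithFix()); // prints deleted: []
    }
}
```

An alternative with the same ordering guarantee would be to submit handleEndInputEvent(event) to the same executor again, which is how the code behaved before the changes in [2].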
[1]
https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=9252&view=logs&j=3b6e910d-b98f-5de6-b9cb-1e5ff571f5de&t=30b5aae4-0ea0-5566-42d0-febf71a7061a&l=18884
[2]
https://github.com/apache/hudi/pull/4561/files#diff-b04b4d3e697f040d71e4fbe2b2492e188f74b8b6804b1aea3f6de897cbebec17R256
Aleksandr Trushev