[
https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-4741:
-----------------------
Description: (was: # Summary of Events
1. TM heartbeat not sent to JM (Can be triggered by killing a container), JM
kills the TM/container
2. JM restarts the container, but the restart logic is not handled properly,
causing a deadlock
3. Deadlock causes instantToWrite() to loop for 10 minutes (default Flink
checkpoint timeout), causing an instant initialization timeout error
4. JM is restarted
5. JM restores state from the previously successful checkpoint
6. Ckp metadata path is tainted with multiple INFLIGHTs
7. Synchronisation issue occurs if TM executes lastPendingInstant() before JM
executes startInstant()
8. Single commit multi instant issue occurs
9. When a tainted TM has obtained the wrong INFLIGHT instant, it will start a
new write cycle with the correct INFLIGHT instant while a checkpoint is being
performed.
10. reconcileAgainstMarkers() will delete the files that tainted TMs are writing
with the correct INFLIGHT instant in the next cycle, causing
FileNotFoundException, COLUMN state errors and parquet corruption errors.
# Code for reproducing
## Flink SQL Code
```sql
CREATE TABLE input_table (
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
) WITH (
'connector' = 'datagen',
'fields.val.length' = '99999',
'rows-per-second' = '15000'
);
CREATE TABLE test_hudi
(
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
,`dt` STRING
,`hh` STRING
) PARTITIONED BY (dt, hh)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://jm_tm_sync_error/',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'insert',
'hoodie.parquet.small.file.limit' = '104857600',
'hoodie.parquet.max.file.size' = '268435456',
'hoodie.datasource.write.recordkey.field' = 'partition,offset',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
'write.bulk_insert.sort_input' = 'false',
'index.bootstrap.enabled' = 'false',
'index.state.ttl' = '60',
'index.type' = 'FLINK_STATE',
'hoodie.datasource.write.keygenerator.class' =
'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'write.tasks' = '8',
'hive_sync.enable' = 'false'
);
insert into test_hudi
select `val`
,`event_time`
,`partition`
,`offset`
,DATE_FORMAT(event_time, 'yyyy-MM-dd')
,DATE_FORMAT(event_time, 'HH')
from input_table;
```
## Advanced Properties
```properties
execution.checkpointing.interval=60000ms
```
## Job Profile Properties
```properties
flink.version=1.13.14
default.parallelism=8
restart.from.savepoint=true
sql.job.mode=normal
running.mode=streaming
slots.per.tm=2
cpu.per.tm=2vcore
memory.per.tm=6G
jvm.heap.ratio=70%
```
# Issues
There are 2 main issues here:
1. TM failing + starting a TM in a new container causing deadlock
2. Single commit multi-instant causing various basefile parquet errors
- java.io.FileNotFoundException: File does not exist:
20220712165157207.parquet (inode 1234567890) Holder
DFSClient_NONMAPREDUCE_1111111111_22 does not have any open files.
- java.io.IOException: The file being written is in an invalid state.
Probably caused by an error thrown previously. Current state: COLUMN
- java.lang.RuntimeException: 20220805210423582.parquet is not a
Parquet file. expected magic number at tail [80, 65, 82, 49] but found [1, 0,
-38, 2]
## TM failing + starting a TM in a new container causing deadlock
1. When a TM fails and a new TM is started and restored in a new container, a
deadlock situation is created:
- The TM is waiting for the JM to create a new INFLIGHT instant, while
- The JM is waiting for the TM to send a success WriteMetadataEvent
2. The deadlock above will cause either of the errors below:
- org.apache.hudi.exception.HoodieException: Timeout(601000ms) while
waiting for instant initialize
- org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
expired before completing.
3. This will trigger org.apache.flink.runtime.jobmaster.JobMaster [] - Trying
to recover from a global failure.
4. JM will try to restore itself from the last successful checkpoint
5. This will cause the next issue (illustrated below)
### Root cause
When restoring the TM, `AbstractStreamWriteFunction#initializeState()` will
attempt to restore the state of the TM. At this stage, `this.currentInstant`
will be initialized by invoking `lastPendingInstant()`, in which the ckp
metadata path is loaded and an INFLIGHT instant is returned.
When invoking `instantToWrite()`, `instant.equals(this.currentInstant)` will
always be true, as the local `instant` is equal to `this.currentInstant`. Hence,
the current implementation will be stuck in an infinite loop, as
`lastPendingInstant()`, which governs both `instant` and `this.currentInstant`,
will always return the same value.
This is because the JM is waiting for the TM to finish writing the batch for the
INFLIGHT instant, while the TM is waiting for the JM to create a new INFLIGHT
instant; hence the deadlock.
The fix is to add additional logic to handle such a case, ensuring that a TM
can obtain a correct INFLIGHT instant when recovering.
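A minimal sketch of the loop shape described above (simplified and renamed; the
class name, timeout value and poll interval are illustrative assumptions, not
the exact Hudi source):
```java
import org.apache.hudi.exception.HoodieException;

// Minimal sketch of the blocking wait described above; simplified, not the
// exact Hudi source. Timeout and poll interval are illustrative.
abstract class WriteFunctionSketch {
  // Restored in initializeState() via lastPendingInstant().
  protected String currentInstant;

  // Reads the ckp metadata path and returns the latest INFLIGHT instant.
  protected abstract String lastPendingInstant();

  protected String instantToWrite() throws InterruptedException {
    String instant = lastPendingInstant();
    long waitingTime = 0L;
    final long ckpTimeout = 601_000L; // matches the timeout in the error above
    final long interval = 1_000L;
    // After a restore, instant and this.currentInstant came from the same
    // source, so this condition never turns false: the JM will not start a
    // new instant until the TM reports success, and the TM cannot report
    // success until the JM starts a new instant.
    while (instant == null || instant.equals(this.currentInstant)) {
      if (waitingTime > ckpTimeout) {
        throw new HoodieException(
            "Timeout(" + ckpTimeout + "ms) while waiting for instant initialize");
      }
      Thread.sleep(interval);
      waitingTime += interval;
      instant = lastPendingInstant(); // keeps returning the same INFLIGHT instant
    }
    return instant;
  }
}
```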
## Single commit multi-instant
A single commit is shown to have written files to a Hudi partition with
multiple instants. This can be shown by inspecting the commit files on Hudi's
timeline:
```json
"fileIdAndRelativePaths" : {
"d51b5af6-f398-47a3-a72e-ba3eedae8966-4" :
"dt=2022-08-27/hh=03/d51b5af6-f398-47a3-a72e-ba3eedae8966-4_19-32-2_20220827112141869.parquet",
"e632d9f7-35ba-4c97-92e6-4af8cdf2636b-4" :
"dt=2022-08-27/hh=03/e632d9f7-35ba-4c97-92e6-4af8cdf2636b-4_27-32-2_20220827112141869.parquet",
"70b490c2-f73a-4fa9-94f7-a08222ed88aa-1" :
"dt=2022-08-27/hh=03/70b490c2-f73a-4fa9-94f7-a08222ed88aa-1_26-32-2_20220827110921416.parquet",
"70b490c2-f73a-4fa9-94f7-a08222ed88aa-2" :
"dt=2022-08-27/hh=03/70b490c2-f73a-4fa9-94f7-a08222ed88aa-2_26-32-2_20220827110921416.parquet",
"0d388f0c-1e4b-4837-97be-bf41de53adf8-9" :
"dt=2022-08-27/hh=03/0d388f0c-1e4b-4837-97be-bf41de53adf8-9_3-32-2_20220827112141869.parquet",
"2036e1d3-3326-4d30-aaeb-1a40abb1fe3b-16" :
"dt=2022-08-27/hh=03/2036e1d3-3326-4d30-aaeb-1a40abb1fe3b-16_15-32-2_20220827112141869.parquet",
"2036e1d3-3326-4d30-aaeb-1a40abb1fe3b-17" : "dt=2022-08-27/hh=03/
}
```
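As an illustration, the symptom can be detected mechanically by extracting the
instant suffix from each base file name in `fileIdAndRelativePaths` (base files
are named `<fileId>_<writeToken>_<instant>.parquet`); the helper below is
hypothetical, not part of Hudi:
```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical checker (not part of Hudi): a healthy commit should yield a
// single instant; more than one indicates the single-commit-multi-instant bug.
final class MultiInstantChecker {
  static Set<String> instantsIn(Collection<String> relativePaths) {
    Set<String> instants = new HashSet<>();
    for (String path : relativePaths) {
      int end = path.lastIndexOf(".parquet");
      int start = path.lastIndexOf('_');
      if (start >= 0 && end > start) {
        instants.add(path.substring(start + 1, end)); // the instant time suffix
      }
    }
    return instants;
  }
}
```
Applied to the commit above, this returns both `20220827112141869` and
`20220827110921416`.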
The sequence of events is as follows:
1. JM restores state from the previously successful checkpoint
2. The ckp metadata path is tainted with multiple INFLIGHTs
3. A synchronisation issue occurs if the TM executes lastPendingInstant() before
the JM executes startInstant()
4. The single commit multi-instant issue occurs
5. When a tainted TM has obtained the wrong INFLIGHT instant, it will start a
new write cycle with the correct INFLIGHT instant while a checkpoint is being
performed.
6. reconcileAgainstMarkers() will delete the files that tainted TMs are writing
with the correct INFLIGHT instant in the next cycle, causing
FileNotFoundException, COLUMN state errors and parquet corruption errors.
The synchronisation error was confirmed to be due to the Task Manager (TM)
running `ckpMetaData#lastPendingInstant()` before the Job Manager (JM) executes
`StreamWriteOperatorCoordinator#startInstant()`, causing the TM to fetch an old
instantTime.
Since `ABORTED` statuses are not written to the metadata path, the TM will fetch
a previously aborted instant, thinking that it is still inflight.
Hence, the new commit will have files of 2 instants whenever there is a restart
AND the timeline contains an INCOMPLETE instant (ABORTED/INFLIGHT).
Please refer to the examples below for how a RESTART + job restore produces a
tainted ckp metadata path. A tainted ckp metadata path is one that contains
more than one INFLIGHT file.
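The selection behaviour can be sketched as follows (a simplification with
assumed names; entries are taken to be ordered by instant time, and ABORTED
entries never appear because they are not written):
```java
import java.util.List;

// Simplified sketch (assumed names, not the exact Hudi source) of how the
// last pending instant is picked from the ckp metadata path. Because ABORTED
// statuses are never written, a previously aborted instant still shows up
// here as INFLIGHT.
final class CkpMetadataSketch {
  record Ckp(String instant, String state) {}

  // Entries are assumed ordered by instant time; the scan returns the most
  // recent INFLIGHT (i.e. incomplete) instant, or null if none exists.
  static String lastPendingInstant(List<Ckp> messages) {
    for (int i = messages.size() - 1; i >= 0; i--) {
      if ("INFLIGHT".equals(messages.get(i).state())) {
        return messages.get(i).instant();
      }
    }
    return null;
  }
}
```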
### Correct Example
After the job restarts, commit `20220828165937711` is created and completes
successfully.
If the TM invokes `ckpMetaData#lastPendingInstant()` **AFTER** the JM runs
`StreamWriteOperatorCoordinator#startInstant()`, it will read the **NEW**
instant on the .aux hdfs path.
`StreamWriteOperatorCoordinator#startInstant()` will create the instant
`20220828170053510`, causing there to be 2 INFLIGHT entries. Since the most
recent `INCOMPLETE` instant is fetched, the correct instant,
`20220828170053510`, will be returned.
The .aux hdfs path will look like this when `ckpMetaData#lastPendingInstant()`
is invoked.
```markdown
[
Ckp{instant='20220828164426755', state='INFLIGHT'},
Ckp{instant='20220828165937711', state='COMPLETED'},
Ckp{instant='20220828170053510', state='INFLIGHT'}
]
```
### Incorrect Example
If the TM invokes `ckpMetaData#lastPendingInstant()` **BEFORE** the JM runs
`StreamWriteOperatorCoordinator#startInstant()`, it will read the **OLD**
instant on the .aux hdfs path.
The .aux hdfs path will look like this when `ckpMetaData#lastPendingInstant()`
is invoked.
```markdown
[
Ckp{instant='20220828164123688', state='COMPLETED'},
Ckp{instant='20220828164426755', state='INFLIGHT'},
Ckp{instant='20220828165937711', state='COMPLETED'}
]
```
Since the new instant file has not been created yet, the most recent
`INCOMPLETE` instant is fetched, which is `20220828164426755`.
In such a case, different TMs can obtain different instant times from
`ckpMetaData#lastPendingInstant()`, and a single commit might contain multiple
instants.
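Replaying the two orderings with the `CkpMetadataSketch` above makes the race
concrete:
```java
import java.util.ArrayList;
import java.util.List;

// Replaying both orderings with the CkpMetadataSketch sketched earlier.
class RaceReplay {
  public static void main(String[] args) {
    List<CkpMetadataSketch.Ckp> path = new ArrayList<>(List.of(
        new CkpMetadataSketch.Ckp("20220828164123688", "COMPLETED"),
        new CkpMetadataSketch.Ckp("20220828164426755", "INFLIGHT"),
        new CkpMetadataSketch.Ckp("20220828165937711", "COMPLETED")));

    // TM reads BEFORE the JM's startInstant(): stale 20220828164426755.
    String stale = CkpMetadataSketch.lastPendingInstant(path);

    // JM's startInstant() appends the new entry; a TM reading AFTER gets
    // the correct 20220828170053510.
    path.add(new CkpMetadataSketch.Ckp("20220828170053510", "INFLIGHT"));
    String fresh = CkpMetadataSketch.lastPendingInstant(path);

    System.out.println(stale + " vs " + fresh); // different instants, one commit
  }
}
```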
### FileNotFoundException
Building upon the example in the `Single commit multi-instant` section, when
`AppendWriteFunction#flushData()` is invoked, the current `writerHelper` will
be cleaned up by closing all existing file handles. The `writerHelper` will
then be set to NULL.
At this point, Flink is performing a checkpoint and is about to write to Hudi's
timeline by creating a `*.commit` file with the commit's metadata.
Suppose that a TM (let's call this the **tainted TM**) is using an old
instant from the tainted ckp metadata path as such
(`20220828164426755` will be used):
```markdown
[
Ckp{instant='20220828164123688', state='COMPLETED'},
Ckp{instant='20220828164426755', state='INFLIGHT'},
Ckp{instant='20220828165937711', state='COMPLETED'},
Ckp{instant='20220828170053510', state='INFLIGHT'}
]
```
Once `AppendWriteFunction#flushData()` has completed, the next invocation of
`AppendWriteFunction#processElement()` will call
`AppendWriteFunction#initWriterHelper()`, creating a new writerHelper with the
instant `20220828170053510`, and writing to files of this instant will begin.
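The flush/re-init cycle just described can be sketched like this (simplified
and renamed; not the exact `AppendWriteFunction` source):
```java
import org.apache.flink.table.data.RowData;

// Simplified, renamed sketch of the flush/re-init cycle; not the exact
// AppendWriteFunction source.
abstract class AppendWriteSketch<H> {
  private H writerHelper;

  // Resolves its instant via lastPendingInstant()/instantToWrite().
  protected abstract H initWriterHelper();
  protected abstract void closeHandlesAndReport(H helper); // flush + WriteMetadataEvent
  protected abstract void write(H helper, RowData record);

  void flushData() {
    closeHandlesAndReport(writerHelper); // close all open file handles
    writerHelper = null;                 // cleared until the next record
  }

  void processElement(RowData record) {
    if (writerHelper == null) {
      // Lazily re-created: a tainted TM that was flushing under the stale
      // instant now latches onto the new instant (20220828170053510) and
      // starts writing its files while the checkpoint/commit is in flight.
      writerHelper = initWriterHelper();
    }
    write(writerHelper, record);
  }
}
```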
While all this is happening, Flink is performing a checkpoint and is writing to
Hudi's timeline by creating a `*.commit` file with the commit's metadata.
Before it can create and write to the `*.commit`, `HoodieTable#finalizeWrite()`
will be invoked.
Subsequently, `HoodieTable#reconcileAgainstMarkers()` is invoked. This method
is (supposedly) meant to clean up data files partially written by tasks that
failed (but succeeded on retry).
Since the **tainted TM** is using the instant that is being committed to the
Hudi timeline, the files it is currently writing will be cleaned up, as their
marker files can be found but the data files are not present in the
`HoodieWriteStat`.
Hence, while the JM is performing a checkpoint + commit, the **tainted TM**
might try to close a file handle once the file reaches
`parquet.max.filesize`. By then, the file might have already been deleted by
`HoodieTable#reconcileAgainstMarkers()`, causing the
`java.io.FileNotFoundException`.
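The deletion condition can be sketched as a set difference (illustrative names;
a simplification of the marker-based reconciliation):
```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of marker reconciliation: a data file that has a marker
// but is absent from the commit's write stats is presumed to be a leftover of
// a failed task and is deleted. Files that a tainted TM is still writing for
// the committing instant satisfy exactly this condition.
final class ReconcileSketch {
  static Set<String> filesToDelete(Set<String> markerBackedPaths,
                                   Set<String> pathsInWriteStats) {
    Set<String> invalid = new HashSet<>(markerBackedPaths);
    invalid.removeAll(pathsInWriteStats); // marker exists, data file not committed
    return invalid; // deleting these under an active writer -> FileNotFoundException
  }
}
```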
)
> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
> Key: HUDI-4741
> URL: https://issues.apache.org/jira/browse/HUDI-4741
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Reporter: voon
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)