[ 
https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

voon updated HUDI-4741:
-----------------------
    Reviewers: Teng Huo  (was: Teng Huo, 冯健)

> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
>                 Key: HUDI-4741
>                 URL: https://issues.apache.org/jira/browse/HUDI-4741
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>             Fix For: 0.12.1
>
>
> h1. Summary of Events
>  # TM heartbeat is not sent to the JM (this can be triggered by killing a 
> container), so the JM kills the TM/container
>  # JM restarts the container, but the restart is not handled properly, 
> causing a deadlock
>  # The deadlock causes *`instantToWrite()`* to loop for 10 minutes (the default 
> Flink checkpoint timeout), resulting in an instant initialization timeout error
>  # JM is restarted
>  # JM restores state from the previously successful checkpoint
>  # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
>  # A synchronisation issue occurs if a TM executes *`lastPendingInstant()`* 
> before the JM executes *`startInstant()`*
>  # Single commit multi-instant issue occurs
>  # When a tainted TM obtains the wrong _INFLIGHT_ instant, it will start 
> a new write cycle with the correct _INFLIGHT_ instant while a checkpoint is 
> being performed.
>  # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are 
> writing to with the correct _INFLIGHT_ instant in the next cycle, causing 
> FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.
>  
> h1. Code for reproducing
> h2. Flink SQL Code
> {code:sql}
> CREATE TABLE input_table (
>     `val`               STRING
>     ,`event_time`       TIMESTAMP(3)
>     ,`partition`        BIGINT
>     ,`offset`           BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.val.length' = '99999',
>     'rows-per-second' = '15000'
> );
>
> CREATE TABLE test_hudi
> (
>     `val`               STRING
>     ,`event_time`       TIMESTAMP(3)
>     ,`partition`        BIGINT
>     ,`offset`           BIGINT
>     ,`dt`               STRING
>     ,`hh`               STRING
> ) PARTITIONED BY (dt, hh)
> WITH (
>     'connector' = 'hudi',
>     'path' = 'hdfs://jm_tm_sync_error/',
>     'table.type' = 'COPY_ON_WRITE',
>     'write.operation' = 'insert',
>     'hoodie.parquet.small.file.limit' = '104857600',
>     'hoodie.parquet.max.file.size' = '268435456',
>     'hoodie.datasource.write.recordkey.field' = 'partition,offset',
>     'hoodie.datasource.write.hive_style_partitioning' = 'true',
>     'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
>     'write.bulk_insert.sort_input' = 'false',
>     'index.bootstrap.enabled' = 'false',
>     'index.state.ttl' = '60',
>     'index.type' = 'FLINK_STATE',
>     'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
>     'write.tasks' = '8',
>     'hive_sync.enable' = 'false'
> );
>
> insert into test_hudi
> select  `val`
>         ,`event_time`
>         ,`partition`
>         ,`offset`
>         ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
>         ,DATE_FORMAT(event_time, 'HH')
> from input_table; {code}
>  
> h2. Advanced Properties
> {code:java}
> execution.checkpointing.interval=60000ms {code}
>  
> h2. Job Profile Properties
> {code:java}
> flink.version=1.13.14
> default.parallelism=8
> restart.from.savepoint=true
> sql.job.mode=normal
> running.mode=streaming
> slots.per.tm=2
> cpu.per.tm=2vcore
> memory.per.tm=6G
> jvm.heap.ratio=70% {code}
>  
>  
> h1. Issues
> There are 2 main issues here:
>  # *TM failing + starting a TM in a new container causing deadlock*
>  # *Single commit multi-instant causing various base file parquet errors*
>  ** java.io.FileNotFoundException: File does not exist: 
> 20220712165157207.parquet (inode 1234567890) Holder 
> DFSClient_NONMAPREDUCE_1111111111_22 does not have any open files.
>  ** java.io.IOException: The file being written is in an invalid state. 
> Probably caused by an error thrown previously. Current state: COLUMN
>  ** java.lang.RuntimeException: 20220805210423582.parquet is not a Parquet 
> file. expected magic number at tail [80, 65, 82, 49] but found [1, 0, -38, 2] 
>  
> h1. TM failing + starting a TM in a new container causing deadlock 
>  # When a TM fails, starting and restoring a TM in a new container creates a 
> deadlock:
>  ** the TM is waiting for the JM to create a new _INFLIGHT_ instant, while
>  ** the JM is waiting for the TM to send a successful WriteMetadataEvent
>  # The deadlock above will cause either of the errors below:
>  ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while 
> waiting for instant initialize
>  ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> expired before completing.
>  # This triggers a global failover, logged as: 
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
>  # JM will try to restore itself from the last successful checkpoint
>  # This will cause the next issue (illustrated below)
> h2. Root cause
> When restoring the TM, *`AbstractStreamWriteFunction#initializeState()`* will 
> attempt to restore the state of the TM. At this stage, 
> *`this.currentInstant`* will be initialized by invoking 
> {*}`lastPendingInstant()`{*}, which loads the ckp metadata path and returns 
> an _INFLIGHT_ instant.
>  
> When invoking {*}`instantToWrite()`{*}, 
> *`instant.equals(this.currentInstant)`* will always be true, as the local 
> *`instant`* is equal to {*}`this.currentInstant`{*}. Hence, the current 
> implementation is stuck in its wait loop until it times out: 
> {*}`lastPendingInstant()`{*}, which governs both *`instant`* and 
> *`this.currentInstant`*, always returns the same value because the state of the 
> ckp metadata path never changes. 
>  
> This happens because the JM is waiting for the TM to finish writing the batch for 
> the _INFLIGHT_ instant, while the TM is waiting for the JM to create a new 
> _INFLIGHT_ instant; hence the deadlock. 
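>  
> The wait described above can be illustrated with a simplified, self-contained Java model. 
> This is not Hudi's source: *`InstantWaitModel`*, *`Result`*, *`maxPolls`* (a stand-in for the 
> 601000ms instant-initialize timeout) and the *`Supplier`* standing in for *`lastPendingInstant()`* 
> are hypothetical, and the real *`instantToWrite()`* has further wait conditions. The model only 
> shows that when *`this.currentInstant`* is initialised from the same lookup that the loop polls, 
> and the JM never starts a new instant, the loop can only end by timing out.

```java
import java.util.function.Supplier;

// Simplified model of the wait loop in instantToWrite() as described in this issue.
// NOT Hudi's source: class names, maxPolls and the Supplier are hypothetical stand-ins.
class InstantWaitModel {

  // Outcome of one modelled instantToWrite() call.
  static final class Result {
    final String instant; // instant to write, or null if the wait timed out
    final int polls;      // how many times the pending instant was re-read

    Result(String instant, int polls) {
      this.instant = instant;
      this.polls = polls;
    }
  }

  // Re-reads the pending instant while it is missing or equal to currentInstant.
  // maxPolls stands in for the instant-initialize timeout.
  static Result instantToWrite(Supplier<String> lastPendingInstant, String currentInstant, int maxPolls) {
    String instant = lastPendingInstant.get();
    int polls = 0;
    while (instant == null || instant.equals(currentInstant)) {
      if (polls == maxPolls) {
        return new Result(null, polls); // models "Timeout(601000ms) while waiting for instant initialize"
      }
      polls++;
      instant = lastPendingInstant.get();
    }
    return new Result(instant, polls);
  }

  public static void main(String[] args) {
    String pending = "20220828164426755";
    // On restore, currentInstant comes from the same lookup that instantToWrite() polls.
    String currentInstant = pending;

    // The JM never starts a new instant, so the ckp metadata path never changes.
    Result stuck = instantToWrite(() -> pending, currentInstant, 100);
    System.out.println("restored TM: instant=" + stuck.instant + ", polls=" + stuck.polls);

    // For comparison: the JM starts a new instant after the third read.
    int[] reads = {0};
    Supplier<String> jmStartsNewInstant = () -> ++reads[0] <= 3 ? pending : "20220828170053510";
    Result ok = instantToWrite(jmStartsNewInstant, currentInstant, 100);
    System.out.println("after startInstant(): instant=" + ok.instant + ", polls=" + ok.polls);
  }
}
```

> In the model, the restored TM prints *`instant=null, polls=100`* (the timeout path), whereas the 
> comparison run exits after 3 polls with *`20220828170053510`* once the JM has started a new instant.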
>  
> The fix is to add logic that handles this case, ensuring that a TM 
> can obtain the correct _INFLIGHT_ instant when it is being recovered.
>  
> h1. Single commit multi-instant
>  # JM restores state from the previously successful checkpoint
>  # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
>  # A synchronisation issue occurs if a TM executes *`lastPendingInstant()`* 
> before the JM executes *`startInstant()`*
>  # Single commit multi-instant issue occurs
>  # When a tainted TM obtains the wrong _INFLIGHT_ instant, it will start 
> a new write cycle with the correct _INFLIGHT_ instant while a checkpoint is 
> being performed.
>  # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are 
> writing to with the correct _INFLIGHT_ instant in the next cycle, causing 
> FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.
>  
> This is caused by a synchronisation issue: the Task Manager (TM) runs 
> *`ckpMetaData#lastPendingInstant()`* before the Job Manager (JM) executes 
> {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, causing the TM to 
> fetch an old instant time. 
>  
> Since _ABORTED_ statuses are not written to the metadata path, the TM will fetch 
> a previously aborted instant, thinking that it is still {_}INFLIGHT{_}.
>  
> Hence, the new commit will contain files from 2 instants should there be a restart 
> AND the timeline contains an _INCOMPLETE_ instant 
> ({_}ABORTED{_}/{_}INFLIGHT{_}).
>  
> The examples below show what happens when a RESTART + job restore is triggered, 
> producing a tainted ckp metadata path. A tainted ckp metadata 
> path is a path that contains more than 1 _INFLIGHT_ file.
>  
> h2. CORRECT Instant fetched after JM restores from checkpoint
> After the job restarts, commit *`20220828165937711`* is created and 
> successfully completes.
>  
> If TM invokes *`ckpMetaData#lastPendingInstant()`* AFTER JM runs 
> {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the NEW 
> instant on the .aux hdfs path.
>  
> *`StreamWriteOperatorCoordinator#startInstant()`* will create the instant 
> {*}`20220828170053510`{*}, resulting in 2 _INFLIGHT_ instants. Since 
> the most recent _INCOMPLETE_ instant is fetched, the correct instant, 
> *`20220828170053510`*, will be returned.
>  
> The .aux hdfs path will look like this when 
> *`ckpMetaData#lastPendingInstant()`* is invoked.
>  
> {code:java}
> [
>   Ckp{instant='20220828164426755', state='INFLIGHT'}, 
>   Ckp{instant='20220828165937711', state='COMPLETED'}, 
>   Ckp{instant='20220828170053510', state='INFLIGHT'}
> ] {code}
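>  
> The lookup described above can be sketched as follows, assuming (as this issue describes) that 
> *`lastPendingInstant()`* returns the newest entry that is not _COMPLETED_. *`CkpLookupModel`*, 
> *`Ckp`* and *`isTainted()`* are hypothetical illustrations, not Hudi's *`CkpMetadata`* API. 
> Applied to the listing above, the sketch returns *`20220828170053510`* and flags the path as 
> tainted, since it holds two _INFLIGHT_ entries.

```java
import java.util.List;

// Hypothetical model of the ckp metadata lookup described in this issue;
// not Hudi's CkpMetadata implementation.
class CkpLookupModel {

  static final class Ckp {
    final String instant;
    final String state; // "INFLIGHT" or "COMPLETED"; ABORTED is never written to the path

    Ckp(String instant, String state) {
      this.instant = instant;
      this.state = state;
    }
  }

  // Returns the most recent instant that is not COMPLETED, or null if there is none.
  static String lastPendingInstant(List<Ckp> path) {
    for (int i = path.size() - 1; i >= 0; i--) {
      if (!"COMPLETED".equals(path.get(i).state)) {
        return path.get(i).instant;
      }
    }
    return null;
  }

  // A path is "tainted" when it holds more than one INFLIGHT entry.
  static boolean isTainted(List<Ckp> path) {
    return path.stream().filter(c -> "INFLIGHT".equals(c.state)).count() > 1;
  }

  public static void main(String[] args) {
    // The .aux listing from the CORRECT case (read after startInstant()).
    List<Ckp> afterStart = List.of(
        new Ckp("20220828164426755", "INFLIGHT"),
        new Ckp("20220828165937711", "COMPLETED"),
        new Ckp("20220828170053510", "INFLIGHT"));
    System.out.println(lastPendingInstant(afterStart)); // 20220828170053510
    System.out.println(isTainted(afterStart));          // true
  }
}
```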
>  
>  
> h2. INCORRECT Instant fetched after JM restores from checkpoint
> If the TM invokes *`ckpMetaData#lastPendingInstant()`* BEFORE JM runs 
> {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the OLD 
> instant on the .aux hdfs path.
> The .aux hdfs path will look like this when 
> *`ckpMetaData#lastPendingInstant()`* is invoked.
>  
> {code:java}
> [
>   Ckp{instant='20220828164123688', state='COMPLETED'}, 
>   Ckp{instant='20220828164426755', state='INFLIGHT'}, 
>   Ckp{instant='20220828165937711', state='COMPLETED'}
> ] {code}
>  
> Since the new instant file has not been created yet, the most recent _INCOMPLETE_ 
> instant is fetched, which is {*}`20220828164426755`{*}.
> Because different TMs can therefore obtain different instant times from 
> {*}`ckpMetaData#lastPendingInstant()`{*} depending on when they run, a 
> single commit might contain multiple instants.
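>  
> The race can be illustrated with a small, hypothetical simulation: 8 TMs (matching 
> *`write.tasks = 8`* in the job above) each read the ckp metadata path once on restore, and the 
> JM's *`startInstant()`* appends the new _INFLIGHT_ entry at a chosen point. Entries are encoded as 
> *`instant:STATE`* strings for brevity; *`RestoreRaceModel`* and its methods are illustrative only, 
> and the lookup uses the newest-not-completed rule described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of TMs reading the ckp metadata path before/after the JM
// starts a new instant. Not Hudi code; entries are "instant:STATE" strings.
class RestoreRaceModel {

  // Newest entry that is not COMPLETED, or null.
  static String lastPendingInstant(List<String> path) {
    for (int i = path.size() - 1; i >= 0; i--) {
      String[] parts = path.get(i).split(":");
      if (!"COMPLETED".equals(parts[1])) {
        return parts[0];
      }
    }
    return null;
  }

  // Restores `tasks` TMs in order; the JM's startInstant() runs just before the task
  // with index startInstantBeforeTask. Returns the distinct instants the TMs picked.
  static Set<String> instantsPickedByTasks(int tasks, int startInstantBeforeTask) {
    List<String> path = new ArrayList<>(List.of(
        "20220828164123688:COMPLETED",
        "20220828164426755:INFLIGHT", // aborted earlier, but ABORTED is not recorded
        "20220828165937711:COMPLETED"));
    Set<String> picked = new LinkedHashSet<>();
    for (int task = 0; task < tasks; task++) {
      if (task == startInstantBeforeTask) {
        path.add("20220828170053510:INFLIGHT"); // models StreamWriteOperatorCoordinator#startInstant()
      }
      picked.add(lastPendingInstant(path));
    }
    return picked;
  }

  public static void main(String[] args) {
    // JM starts the instant before any TM reads: every task agrees.
    System.out.println(instantsPickedByTasks(8, 0));
    // JM starts the instant after 3 of 8 TMs have already read the path.
    System.out.println(instantsPickedByTasks(8, 3));
  }
}
```

> The first call prints *`[20220828170053510]`*; the second prints 
> *`[20220828164426755, 20220828170053510]`*, i.e. one checkpoint cycle whose tasks disagree on the instant.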
>  
> h2. FileNotFoundException and various parquet corruption errors
> Building upon the example in {_}Single commit multi-instant{_}, when 
> *`AppendWriteFunction#flushData()`* is invoked, the current *`writerHelper`* 
> will be cleaned up by closing all existing file handles. The *`writerHelper`* 
> will then be set to {_}NULL{_}. 
> At this point, Hudi is performing a checkpoint and is about to write to 
> Hudi's timeline by creating the instant's *`.commit`* file with the commit's metadata.
> Suppose that a TM (let's call it the {*}tainted TM{*}) is using an old 
> instant from the tainted ckp metadata path below ({*}`20220828164426755`{*} 
> will be used):
> {code:java}
> [
>   Ckp{instant='20220828164123688', state='COMPLETED'}, 
>   Ckp{instant='20220828164426755', state='INFLIGHT'}, 
>   Ckp{instant='20220828165937711', state='COMPLETED'},
>   Ckp{instant='20220828170053510', state='INFLIGHT'}
> ] {code}
>  
> Once *`AppendWriteFunction#flushData()`* has completed, on the next 
> invocation of {*}`AppendWriteFunction#processElement()`{*}, 
> *`AppendWriteFunction#initWriterHelper()`* will be invoked, causing a new 
> writerHelper with the instant *`20220828170053510`* to be created, and the TM 
> will begin writing to files of this instant.
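>  
> The writer lifecycle described above can be sketched with a hypothetical model in which the 
> writer helper is created lazily on the next element, with whatever instant is pending at that 
> moment, and discarded on flush. Only the call order is modelled; *`AppendWriterModel`* and its 
> fields are illustrative and do not reflect *`AppendWriteFunction`*'s actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the writer lifecycle described above. Only the call order is
// modelled; this is not AppendWriteFunction's code.
class AppendWriterModel {
  private final Supplier<String> pendingInstant; // stand-in for the instant lookup
  private String writerInstant;                  // null means "no writerHelper"
  final List<String> writes = new ArrayList<>(); // one "record@instant" entry per element

  AppendWriterModel(Supplier<String> pendingInstant) {
    this.pendingInstant = pendingInstant;
  }

  // Models processElement(): lazily creates the writer (initWriterHelper()) with the
  // instant that is pending at that moment, then writes the record.
  void processElement(String record) {
    if (writerInstant == null) {
      writerInstant = pendingInstant.get();
    }
    writes.add(record + "@" + writerInstant);
  }

  // Models flushData(): closes the handles and drops the writer.
  void flushData() {
    writerInstant = null;
  }

  public static void main(String[] args) {
    String[] pending = {"20220828164426755"}; // old instant read by the tainted TM
    AppendWriterModel tm = new AppendWriterModel(() -> pending[0]);
    tm.processElement("r1");
    pending[0] = "20220828170053510"; // the JM has since started the new instant
    tm.flushData();                   // checkpoint-triggered flush
    tm.processElement("r2");          // new writer for the instant the JM is committing
    System.out.println(tm.writes);    // [r1@20220828164426755, r2@20220828170053510]
  }
}
```

> The same TM thus writes under two instants across one flush, and its second batch targets the 
> instant that is concurrently being committed.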
>  
> While all this is happening, Flink is performing a checkpoint and Hudi is writing 
> to its timeline by creating the instant's *`.commit`* file with the commit's 
> metadata.
> Before it can create and write to the *`.commit`* file, 
> *`HoodieTable#finalizeWrite()`* will be invoked. 
>  
> Subsequently, *`HoodieTable#reconcileAgainstMarkers()`* is invoked. This method 
> is (supposedly) meant to clean up partially written data files left behind by 
> failed (but succeeded on retry) tasks.
> Since the *tainted TM* is using the instant that is being committed to the 
> Hudi timeline, the files it is currently writing to will be cleaned up: 
> their marker files can be found, but the data files are not listed in 
> the {*}`HoodieWriteStat`{*}s. 
>  
> In essence, the *tainted TM* starts a write cycle, which it should not be 
> doing, with an instant that is being committed to the Hudi timeline, 
> causing files to be deleted, corrupted, or left in the wrong state.
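>  
> The cleanup behaviour described above reduces to a set difference. The sketch below assumes, as 
> this issue describes, that files with a marker for the committing instant but absent from the 
> commit's *`HoodieWriteStat`*s are deleted; *`MarkerReconcileModel`* and the file names are made 
> up, and Hudi's real marker handling involves more steps.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of marker reconciliation as described in this issue; not Hudi code.
// File names are illustrative placeholders, not Hudi's real naming scheme.
class MarkerReconcileModel {

  // Files that have a marker but are not accounted for by any write stat.
  static Set<String> filesToDelete(Set<String> markedFiles, Set<String> writeStatFiles) {
    Set<String> invalid = new LinkedHashSet<>(markedFiles);
    invalid.removeAll(writeStatFiles);
    return invalid;
  }

  public static void main(String[] args) {
    String instant = "20220828170053510";
    // Files reported to the JM by the healthy TMs for this instant.
    Set<String> writeStats = new LinkedHashSet<>(List.of(
        "fileA_" + instant + ".parquet",
        "fileB_" + instant + ".parquet"));
    // Markers on storage also include the file the tainted TM has just opened,
    // which is not part of this commit's metadata.
    Set<String> markers = new LinkedHashSet<>(writeStats);
    markers.add("fileC_" + instant + ".parquet");
    System.out.println(filesToDelete(markers, writeStats)); // [fileC_20220828170053510.parquet]
  }
}
```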
>  
> Hence, while the JM is performing a checkpoint + commit, the *tainted TM* 
> might try to close a file handle once the file being written has reached 
> {*}`parquet.max.filesize`{*}. When the TM tries to close the file handle, the file 
> might already have been deleted by 
> {*}`HoodieTable#reconcileAgainstMarkers()`{*}, causing the 
> {*}`java.io.FileNotFoundException`{*}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
