[
https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-4741:
-----------------------
Reviewers: Teng Huo (was: Teng Huo, 冯健)
> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
> Key: HUDI-4741
> URL: https://issues.apache.org/jira/browse/HUDI-4741
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Reporter: voon
> Assignee: voon
> Priority: Major
> Fix For: 0.12.1
>
>
> h1. Summary of Events
> # TM heartbeat not sent to JM (Can be triggered by killing a container), JM
> kills the TM/container
> # JM restarts the container, but the restart logic is not handled
> properly, causing a deadlock
> # Deadlock causes instantToWrite() to loop for 10 minutes (default Flink
> checkpoint timeout), causing an instant initialization timeout error
> # JM is restarted
> # JM restores state from the previously successful checkpoint
> # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
> # Synchronisation issue occurs if TM executes *`lastPendingInstant()`*
> before JM executes *`startInstant()`*
> # Single commit multi-instant issue occurs
> # When a tainted TM obtains the wrong _INFLIGHT_ instant, it will start
> a new write cycle with the correct _INFLIGHT_ instant while a checkpoint is
> being performed.
> # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are
> writing to with the correct _INFLIGHT_ instant in the next cycle, causing
> FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.
>
> h1. Code for reproducing
> h2. Flink SQL Code
> {code:sql}
> CREATE TABLE input_table (
>     `val` STRING
>     ,`event_time` TIMESTAMP(3)
>     ,`partition` BIGINT
>     ,`offset` BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.val.length' = '99999',
>     'rows-per-second' = '15000'
> );
>
> CREATE TABLE test_hudi (
>     `val` STRING
>     ,`event_time` TIMESTAMP(3)
>     ,`partition` BIGINT
>     ,`offset` BIGINT
>     ,`dt` STRING
>     ,`hh` STRING
> ) PARTITIONED BY (dt, hh)
> WITH (
>     'connector' = 'hudi',
>     'path' = 'hdfs://jm_tm_sync_error/',
>     'table.type' = 'COPY_ON_WRITE',
>     'write.operation' = 'insert',
>     'hoodie.parquet.small.file.limit' = '104857600',
>     'hoodie.parquet.max.file.size' = '268435456',
>     'hoodie.datasource.write.recordkey.field' = 'partition,offset',
>     'hoodie.datasource.write.hive_style_partitioning' = 'true',
>     'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
>     'write.bulk_insert.sort_input' = 'false',
>     'index.bootstrap.enabled' = 'false',
>     'index.state.ttl' = '60',
>     'index.type' = 'FLINK_STATE',
>     'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
>     'write.tasks' = '8',
>     'hive_sync.enable' = 'false'
> );
>
> INSERT INTO test_hudi
> SELECT `val`
>     ,`event_time`
>     ,`partition`
>     ,`offset`
>     ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
>     ,DATE_FORMAT(event_time, 'HH')
> FROM input_table;
> {code}
>
> h2. Advanced Properties
> {code:java}
> execution.checkpointing.interval=60000ms {code}
>
> h2. Job Profile Properties
> {code:java}
> flink.version=1.13.14
> default.parallelism=8
> restart.from.savepoint=true
> sql.job.mode=normal
> running.mode=streaming
> slots.per.tm=2
> cpu.per.tm=2vcore
> memory.per.tm=6G
> jvm.heap.ratio=70% {code}
>
>
> h1. Issues
> There are 2 main issues here:
> # *TM failing + starting a TM in a new container causing deadlock*
> # *Single commit multi-instant causing various base file parquet errors*
> ** java.io.FileNotFoundException: File does not exist:
> 20220712165157207.parquet (inode 1234567890) Holder
> DFSClient_NONMAPREDUCE_1111111111_22 does not have any open files.
> ** java.io.IOException: The file being written is in an invalid state.
> Probably caused by an error thrown previously. Current state: COLUMN
> ** java.lang.RuntimeException: 20220805210423582.parquet is not a Parquet
> file. expected magic number at tail [80, 65, 82, 49] but found [1, 0, -38, 2]
>
> h1. TM failing + starting a TM in a new container causing deadlock
> # A TM failing, then starting and restoring a TM in a new container, creates
> a deadlock situation:
> ** The TM is waiting for the JM to create a new _INFLIGHT_ instant, and
> ** the JM is waiting for the TM to send a success WriteMetadataEvent
> # The deadlock above will cause either of the errors below:
> ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while
> waiting for instant initialize
> ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> expired before completing.
> # This will trigger org.apache.flink.runtime.jobmaster.JobMaster [] - Trying
> to recover from a global failure.
> # JM will try to restore itself from the last successful checkpoint
> # This will cause the next issue (illustrated below)
> h2. Root cause
> When restoring the TM, *`AbstractStreamWriteFunction#initializeState()`* will
> attempt to restore the state of the TM. At this stage,
> *`this.currentInstant`* will be initialized by invoking
> {*}`lastPendingInstant()`{*}, in which the ckp metadata path will be loaded
> and an _INFLIGHT_ instant is returned.
>
> When invoking {*}`instantToWrite()`{*},
> *`instant.equals(this.currentInstant)`* will always be true, as the local
> *`instant`* is equal to {*}`this.currentInstant`{*}. Hence, the current
> implementation will be stuck in an infinite loop:
> {*}`lastPendingInstant()`{*}, which governs both *`instant`* and
> {*}`this.currentInstant`{*}, will always return the same value, as the state
> of the ckp metadata path never changes.
>
> This is because the JM is waiting for the TM to finish writing the batch for
> the _INFLIGHT_ instant, while at the same time the TM is waiting for the JM
> to create a new _INFLIGHT_ instant, hence the deadlock.
>
> The fix is to add additional logic to handle this case, ensuring that a TM
> can obtain a correct _INFLIGHT_ instant when recovering.
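> A minimal, self-contained simulation of the loop (hypothetical names, not Hudi's actual code) shows why it can never exit on its own: both *`instant`* and *`this.currentInstant`* come from the same ckp metadata, which the blocked JM never updates:

```java
// Minimal simulation of the deadlocked loop in instantToWrite().
// Hypothetical, simplified names -- not Hudi's actual implementation.
public class InstantToWriteSketch {

    // The ckp metadata never changes: the JM is itself blocked, waiting for
    // this TM's WriteMetadataEvent before it starts a new instant.
    static final String PENDING_INSTANT = "20220828164426755";

    // Restored in initializeState() via lastPendingInstant() -- the same value.
    static String currentInstant = PENDING_INSTANT;

    static String instantToWrite(long timeoutMs) {
        long start = System.currentTimeMillis();
        String instant = PENDING_INSTANT; // lastPendingInstant()
        // instant.equals(currentInstant) stays true forever, so the loop only
        // ends when the (checkpoint) timeout fires.
        while (instant.equals(currentInstant)) {
            if (System.currentTimeMillis() - start > timeoutMs) {
                throw new RuntimeException(
                    "Timeout(" + timeoutMs + "ms) while waiting for instant initialize");
            }
            instant = PENDING_INSTANT; // re-read: same value every time
        }
        return instant;
    }

    public static void main(String[] args) {
        try {
            instantToWrite(100); // stands in for the 10-minute checkpoint timeout
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```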
>
> h1. Single commit multi-instant
> # JM restores state from the previously successful checkpoint
> # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
> # Synchronisation issue occurs if TM executes *`lastPendingInstant()`*
> before JM executes *`startInstant()`*
> # Single commit multi-instant issue occurs
> # When a tainted TM obtains the wrong _INFLIGHT_ instant, it will start
> a new write cycle with the correct _INFLIGHT_ instant while a checkpoint is
> being performed.
> # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are
> writing to with the correct _INFLIGHT_ instant in the next cycle, causing
> FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.
>
> This is caused by the synchronisation issue due to Task Manager (TM) running
> *`ckpMetaData#lastPendingInstant()`* before the Job Manager (JM) executes
> {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, causing the TM to
> fetch an old instantTime.
>
> Since _ABORTED_ statuses are not written to the metadata path, TM will fetch
> a previously aborted instant, thinking that it is still {_}INFLIGHT{_}.
>
> Hence, the new commit will have files of 2 instants should there be a restart
> AND the timeline contains an _INCOMPLETE_ instant
> ({_}ABORTED{_}/{_}INFLIGHT{_}).
>
> The example below shows how a RESTART + job restore can produce a tainted
> ckp metadata path, i.e. a path in which there is more than one _INFLIGHT_
> file.
>
> h2. CORRECT Instant fetched after JM restores from checkpoint
> After the job restarts, commit *`20220828165937711`* is created and
> completes successfully.
>
> If TM invokes *`ckpMetaData#lastPendingInstant()`* AFTER JM runs
> {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the NEW
> instant on the .aux hdfs path.
>
> *`StreamWriteOperatorCoordinator#startInstant()`* will create the instant
> {*}`20220828170053510`{*}, causing there to be 2 _INFLIGHT_ commits. Since
> the most recent _INCOMPLETE_ instant is fetched, the correct instant,
> {*}`20220828170053510`{*}, will be returned.
>
> The .aux hdfs path will look like this when
> *`ckpMetaData#lastPendingInstant()`* is invoked.
>
> {code:java}
> [
> Ckp{instant='20220828164426755', state='INFLIGHT'},
> Ckp{instant='20220828165937711', state='COMPLETED'},
> Ckp{instant='20220828170053510', state='INFLIGHT'}
> ] {code}
>
>
> h2. INCORRECT Instant fetched after JM restores from checkpoint
> If the TM invokes *`ckpMetaData#lastPendingInstant()`* BEFORE JM runs
> {*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the OLD
> instant on the .aux hdfs path.
> The .aux hdfs path will look like this when
> *`ckpMetaData#lastPendingInstant()`* is invoked.
>
> {code:java}
> [
> Ckp{instant='20220828164123688', state='COMPLETED'},
> Ckp{instant='20220828164426755', state='INFLIGHT'},
> Ckp{instant='20220828165937711', state='COMPLETED'}
> ] {code}
>
> Since the new instant file has not been created yet, the most recent
> _INCOMPLETE_ instant is fetched, which is {*}`20220828164426755`{*}.
> In such a case, different TMs can obtain different instant times from
> {*}`ckpMetaData#lastPendingInstant()`{*}, so a single commit might contain
> files from multiple instants.
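> The timing dependence above can be sketched with a minimal, self-contained model (hypothetical names, not Hudi's actual `CkpMetadata` implementation): entries are scanned in instant order and the last _INFLIGHT_ one wins, so the result flips depending on whether *`startInstant()`* has already appended the new entry:

```java
import java.util.List;

// Minimal model of lastPendingInstant() over the .aux ckp metadata listings
// shown above. Hypothetical names -- not Hudi's actual implementation.
public class CkpMetadataSketch {

    // An entry is {instantTime, state}; entries are in instant-time order.
    public static String lastPendingInstant(List<String[]> entries) {
        String pending = null;
        for (String[] e : entries) {
            if ("INFLIGHT".equals(e[1])) {
                pending = e[0]; // keep the most recent INFLIGHT entry
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        // TM reads BEFORE the JM runs startInstant(): only the old INFLIGHT exists.
        List<String[]> before = List.of(
            new String[]{"20220828164123688", "COMPLETED"},
            new String[]{"20220828164426755", "INFLIGHT"},
            new String[]{"20220828165937711", "COMPLETED"});
        System.out.println(lastPendingInstant(before)); // prints 20220828164426755

        // TM reads AFTER startInstant(): the new INFLIGHT shadows the old one.
        List<String[]> after = List.of(
            new String[]{"20220828164123688", "COMPLETED"},
            new String[]{"20220828164426755", "INFLIGHT"},
            new String[]{"20220828165937711", "COMPLETED"},
            new String[]{"20220828170053510", "INFLIGHT"});
        System.out.println(lastPendingInstant(after)); // prints 20220828170053510
    }
}
```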
>
> h2. FileNotFoundException and various parquet corruption errors
> Building upon the example in {_}Single commit multiple instant error{_}, when
> *`AppendWriteFunction#flushData()`* is invoked, the current *`writerHelper`*
> will be cleaned up by closing all existing file handles. The *`writerHelper`*
> will then be set to {_}NULL{_}.
> At this point, Hudi is performing a checkpoint and is about to write to
> Hudi's timeline by creating a *`.commit`* file with the commit's metadata.
> Suppose that a TM (let's call this the {*}tainted TM{*}) is using an old
> instant in the tainted ckp metadata path as such ({*}`20220828164426755`{*}
> will be used):
> {code:java}
> [
> Ckp{instant='20220828164123688', state='COMPLETED'},
> Ckp{instant='20220828164426755', state='INFLIGHT'},
> Ckp{instant='20220828165937711', state='COMPLETED'},
> Ckp{instant='20220828170053510', state='INFLIGHT'}
> ] {code}
>
> Once *`AppendWriteFunction#flushData()`* has completed, on the next
> invocation of {*}`AppendWriteFunction#processElement()`{*},
> *`AppendWriteFunction#initWriterHelper()`* will be invoked, causing a new
> writerHelper with the instant *`20220828170053510`* to be created, and
> writing will begin to files of this instant.
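> The flush/re-init cycle can be sketched with a minimal model (hypothetical names, heavily simplified from `AppendWriteFunction`): *`flushData()`* nulls the helper, and the next *`processElement()`* lazily re-creates it with whatever pending instant is visible at that moment:

```java
// Minimal model of the writerHelper lifecycle across a flush.
// Hypothetical, simplified names -- not Hudi's actual implementation.
public class AppendWriteSketch {

    // Stands in for this.writerHelper: null after flushData(), otherwise
    // holding the instant it writes under.
    static String writerInstant = null;

    // flushData(): close all file handles and drop the helper.
    static void flushData() {
        writerInstant = null;
    }

    // processElement(): lazily re-create the helper (initWriterHelper()) with
    // whatever lastPendingInstant() returns at this moment.
    static String processElement(String pendingInstant) {
        if (writerInstant == null) {
            writerInstant = pendingInstant;
        }
        return writerInstant;
    }

    public static void main(String[] args) {
        processElement("20220828164426755"); // tainted TM writes with the old instant
        flushData();                         // checkpoint flush: helper is nulled
        // Next element arrives while the old instant is still being committed:
        System.out.println(processElement("20220828170053510")); // prints 20220828170053510
    }
}
```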
>
> While all this is happening, Flink is performing a checkpoint and is writing
> to Hudi's timeline by creating a *`.commit`* file with the commit's
> metadata.
> Before it can create and write to the {*}`.commit`{*} file,
> *`HoodieTable#finalizeWrite()`* will be invoked.
>
> Subsequently, *`HoodieTable#reconcileAgainstMarkers()`* is invoked. This
> method is supposed to clean up partially written data files left behind by
> failed (but succeeded on retry) tasks.
> Since the *tainted TM* is using the instant that is being committed to the
> Hudi timeline, the files it is currently writing to will be cleaned up, as
> their marker files can be found but the data files are not listed in the
> {*}`HoodieWriteStat`{*}.
>
> In essence, the *tainted TM* is starting a write cycle, when it should not
> be doing so, with an instant that is being committed to the Hudi timeline,
> hence causing files to be deleted/corrupted/left in the wrong state.
>
> Hence, when the JM is performing a checkpoint + commit, the *tainted TM*
> might try to close the file handle if it has reached the
> {*}`parquet.max.filesize`{*}. When trying to close the file handle, this file
> might have already been deleted by
> {*}`HoodieTable#reconcileAgainstMarkers()`{*}, causing the
> {*}`java.io.FileNotFoundException`{*}.
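> The reconciliation step can be sketched as set subtraction (a minimal model, not Hudi's actual `reconcileAgainstMarkers()` code): any file with a marker but no matching entry in the commit's write stats is treated as invalid and deleted, which is exactly what removes the tainted TM's in-flight files:

```java
import java.util.HashSet;
import java.util.Set;

// Minimal model of marker reconciliation. Hypothetical names -- not Hudi's
// actual implementation.
public class ReconcileSketch {

    // Marker paths without a matching entry in the commit's write stats are
    // treated as leftovers from failed attempts and scheduled for deletion.
    public static Set<String> filesToDelete(Set<String> markerFiles,
                                            Set<String> committedFiles) {
        Set<String> invalid = new HashSet<>(markerFiles);
        invalid.removeAll(committedFiles);
        return invalid;
    }

    public static void main(String[] args) {
        // The tainted TM is still writing file_B under the instant being
        // committed, so file_B has a marker but no HoodieWriteStat entry.
        Set<String> markers = Set.of("file_A.parquet", "file_B.parquet");
        Set<String> committed = Set.of("file_A.parquet");
        System.out.println(filesToDelete(markers, committed)); // prints [file_B.parquet]
    }
}
```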
--
This message was sent by Atlassian Jira
(v8.20.10#820010)