[ 
https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

voon updated HUDI-4741:
-----------------------
    Description: 
h1. Summary of Events
 # TM heartbeat not sent to JM (Can be triggered by killing a container), JM 
kills the TM/container
 # JM restarts the container, but the restart is not handled properly, 
causing a deadlock
 # Deadlock causes instantToWrite() to loop for 10 minutes (default Flink 
checkpoint timeout), causing an instant initialization timeout error
 # JM is restarted
 # JM restores state from the previously successful checkpoint

 
h1. Code for reproducing
h2. Flink SQL Code
{code:sql}
CREATE TABLE input_table (
    `val`               STRING
    ,`event_time`       TIMESTAMP(3)
    ,`partition`        BIGINT
    ,`offset`           BIGINT
) WITH (
    'connector' = 'datagen',
    'fields.val.length' = '99999',
    'rows-per-second' = '15000'
);

CREATE TABLE test_hudi
(
    `val`                 STRING
    ,`event_time`       TIMESTAMP(3)
    ,`partition`        BIGINT
    ,`offset`           BIGINT
    ,`dt`               STRING
    ,`hh`               STRING
) PARTITIONED BY (dt, hh)
WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://jm_tm_sync_error/',
    'table.type' = 'COPY_ON_WRITE',
    'write.operation' = 'insert',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.parquet.max.file.size' = '268435456',
    'hoodie.datasource.write.recordkey.field' = 'partition,offset',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
    'write.bulk_insert.sort_input' = 'false',
    'index.bootstrap.enabled' = 'false',
    'index.state.ttl' = '60',
    'index.type' = 'FLINK_STATE',
    'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
    'write.tasks' = '8',
    'hive_sync.enable' = 'false'
);

insert into test_hudi
select  `val`
        ,`event_time`
        ,`partition`
        ,`offset`
        ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
        ,DATE_FORMAT(event_time, 'HH')
 from input_table; {code}
 
h2. Advanced Properties
{code:java}
execution.checkpointing.interval=60000ms {code}
 
h2. Job Profile Properties
{code:java}
flink.version=1.13.14
default.parallelism=8
restart.from.savepoint=true
sql.job.mode=normal
running.mode=streaming
slots.per.tm=2
cpu.per.tm=2vcore
memory.per.tm=6G
jvm.heap.ratio=70% {code}
 

 
h1. Issues

There are 2 main issues here:
 # *TM failing + starting a TM in a new container causing deadlock*
 # TODO: INSERT HUDI ISSUE

 
h1. TM failing + starting a TM in a new container causing deadlock 
 # When a TM fails, starting and restoring a TM in a new container creates a 
deadlock:
 ** TM is waiting for JM to create a new _INFLIGHT_ instant, and the
 ** JM is waiting for TM to send a success WriteMetadataEvent
 # The deadlock above will cause either of the errors below:
 ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting 
for instant initialize
 ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
before completing.
 # This will trigger org.apache.flink.runtime.jobmaster.JobMaster [] - Trying 
to recover from a global failure.
 # JM will try to restore itself from the last successful checkpoint
 # This will cause HUDI-4907

h2. Root cause

When restoring the TM, *`AbstractStreamWriteFunction#initializeState()`* 
attempts to restore the state of the TM. At this stage, *`this.currentInstant`* 
is initialized by invoking {*}`lastPendingInstant()`{*}, which loads the ckp 
metadata path and returns an _INFLIGHT_ instant.

 

When {*}`instantToWrite()`{*} is then invoked, 
*`instant.equals(this.currentInstant)`* is always true, because the local 
*`instant`* is also obtained from {*}`lastPendingInstant()`{*}. Since the state 
of the ckp metadata path never changes, {*}`lastPendingInstant()`{*} keeps 
returning the same value, and the loop spins until the timeout is hit.

 

The state never changes because the JM is waiting for the TM to finish writing 
the batch for the current _INFLIGHT_ instant, while the TM is waiting for the 
JM to create a new _INFLIGHT_ instant. Hence the deadlock.
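
The wait described above can be modelled with a small Java sketch. It is a simplified illustration and not Hudi's actual code: the class InstantWaitSketch, its parameters and the sample instant strings are assumptions made for this example. The supplier stands in for lastPendingInstant(); when it keeps returning the instant the TM already holds, the loop can only end through the timeout.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified model of the wait loop; not Hudi's implementation. */
final class InstantWaitSketch {

    /**
     * Polls the supplier until it returns an instant different from
     * currentInstant, or throws IllegalStateException once timeoutMs elapsed.
     */
    static String instantToWrite(Supplier<String> lastPendingInstant,
                                 String currentInstant,
                                 long timeoutMs,
                                 long pollIntervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        String instant = lastPendingInstant.get();
        // Wait while there is no pending instant or the pending instant is unchanged.
        while (instant == null || instant.equals(currentInstant)) {
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException(
                    "Timeout(" + timeoutMs + "ms) while waiting for instant initialize");
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
            // Refresh the pending instant from the (simulated) ckp metadata path.
            instant = lastPendingInstant.get();
        }
        return instant;
    }

    public static void main(String[] args) {
        // Healthy case: the JM has already started a new instant.
        System.out.println(
            instantToWrite(() -> "20220828170053510", "20220828164426755", 200, 10));
        // Deadlock case: the metadata never changes, so only the timeout ends the wait.
        try {
            instantToWrite(() -> "20220828164426755", "20220828164426755", 200, 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the deadlock case nothing on the TM side can make the supplier return a new value, which is why the wait only ends when the timeout expires.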

 

The short term fix is to enforce global failover every time there is a failure.

  was:
h1. Summary of Events
 # TM heartbeat not sent to JM (Can be triggered by killing a container), JM 
kills the TM/container
 # JM restarts the container, but the restart is not handled properly, 
causing a deadlock
 # Deadlock causes instantToWrite() to loop for 10 minutes (default Flink 
checkpoint timeout), causing an instant initialization timeout error
 # JM is restarted
 # JM restores state from the previously successful checkpoint
 # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
 # Synchronisation issue occurs if TM executes *`lastPendingInstant()`* before 
JM executes *`startInstant()`*
 # Single commit multi instant issue occurs
 # When a tainted TM has obtained the wrong _INFLIGHT_ instant, it starts a 
new write cycle with the correct _INFLIGHT_ instant while a checkpoint is being 
performed.
 # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are writing 
to with the correct _INFLIGHT_ instant in the next cycle, causing 
FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.

 
h1. Code for reproducing
h2. Flink SQL Code
{code:sql}
CREATE TABLE input_table (
    `val`               STRING
    ,`event_time`       TIMESTAMP(3)
    ,`partition`        BIGINT
    ,`offset`           BIGINT
) WITH (
    'connector' = 'datagen',
    'fields.val.length' = '99999',
    'rows-per-second' = '15000'
);

CREATE TABLE test_hudi
(
    `val`                 STRING
    ,`event_time`       TIMESTAMP(3)
    ,`partition`        BIGINT
    ,`offset`           BIGINT
    ,`dt`               STRING
    ,`hh`               STRING
) PARTITIONED BY (dt, hh)
WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://jm_tm_sync_error/',
    'table.type' = 'COPY_ON_WRITE',
    'write.operation' = 'insert',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.parquet.max.file.size' = '268435456',
    'hoodie.datasource.write.recordkey.field' = 'partition,offset',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
    'write.bulk_insert.sort_input' = 'false',
    'index.bootstrap.enabled' = 'false',
    'index.state.ttl' = '60',
    'index.type' = 'FLINK_STATE',
    'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
    'write.tasks' = '8',
    'hive_sync.enable' = 'false'
);

insert into test_hudi
select  `val`
        ,`event_time`
        ,`partition`
        ,`offset`
        ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
        ,DATE_FORMAT(event_time, 'HH')
 from input_table; {code}
 
h2. Advanced Properties
{code:java}
execution.checkpointing.interval=60000ms {code}
 
h2. Job Profile Properties
{code:java}
flink.version=1.13.14
default.parallelism=8
restart.from.savepoint=true
sql.job.mode=normal
running.mode=streaming
slots.per.tm=2
cpu.per.tm=2vcore
memory.per.tm=6G
jvm.heap.ratio=70% {code}
 

 
h1. Issues

There are 2 main issues here:
 # *TM failing + starting a TM in a new container causing deadlock*
 # *Single commit multi-instant causing various base file parquet errors*
 ** java.io.FileNotFoundException: File does not exist: 
20220712165157207.parquet (inode 1234567890) Holder 
DFSClient_NONMAPREDUCE_1111111111_22 does not have any open files.
 ** java.io.IOException: The file being written is in an invalid state. 
Probably caused by an error thrown previously. Current state: COLUMN
 ** java.lang.RuntimeException: 20220805210423582.parquet is not a Parquet 
file. expected magic number at tail [80, 65, 82, 49] but found [1, 0, -38, 2] 

 
h1. TM failing + starting a TM in a new container causing deadlock 
 # When a TM fails, starting and restoring a TM in a new container creates a 
deadlock:
 ** TM is waiting for JM to create a new _INFLIGHT_ instant, and the
 ** JM is waiting for TM to send a success WriteMetadataEvent
 # The deadlock above will cause either of the errors below:
 ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting 
for instant initialize
 ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired 
before completing.
 # This will trigger org.apache.flink.runtime.jobmaster.JobMaster [] - Trying 
to recover from a global failure.
 # JM will try to restore itself from the last successful checkpoint
 # This will cause the next issue (illustrated below)

h2. Root cause

When restoring the TM, *`AbstractStreamWriteFunction#initializeState()`* 
attempts to restore the state of the TM. At this stage, *`this.currentInstant`* 
is initialized by invoking {*}`lastPendingInstant()`{*}, which loads the ckp 
metadata path and returns an _INFLIGHT_ instant.

 

When {*}`instantToWrite()`{*} is then invoked, 
*`instant.equals(this.currentInstant)`* is always true, because the local 
*`instant`* is also obtained from {*}`lastPendingInstant()`{*}. Since the state 
of the ckp metadata path never changes, {*}`lastPendingInstant()`{*} keeps 
returning the same value, and the loop spins until the timeout is hit.

 

The state never changes because the JM is waiting for the TM to finish writing 
the batch for the current _INFLIGHT_ instant, while the TM is waiting for the 
JM to create a new _INFLIGHT_ instant. Hence the deadlock.

 

The fix is to add logic that handles this case, ensuring that a TM can obtain 
a correct _INFLIGHT_ instant when recovering.

 
h1. Single commit multi-instant
 # JM restores state from the previously successful checkpoint
 # Ckp metadata path is tainted with multiple {_}INFLIGHT{_}s
 # Synchronisation issue occurs if TM executes *`lastPendingInstant()`* before 
JM executes *`startInstant()`*
 # Single commit multi instant issue occurs
 # When a tainted TM has obtained the wrong _INFLIGHT_ instant, it starts a 
new write cycle with the correct _INFLIGHT_ instant while a checkpoint is being 
performed.
 # *`reconcileAgainstMarkers()`* will delete files that tainted TMs are writing 
to with the correct _INFLIGHT_ instant in the next cycle, causing 
FileNotFoundException, _COLUMN_ state errors and parquet corruption errors.

 

This is caused by a synchronisation issue: the Task Manager (TM) runs 
*`ckpMetaData#lastPendingInstant()`* before the Job Manager (JM) executes 
{*}`StreamWriteOperatorCoordinator#startInstant()`{*}, so the TM fetches an old 
instantTime.

 

Since _ABORTED_ statuses are not written to the metadata path, TM will fetch a 
previously aborted instant, thinking that it is still {_}INFLIGHT{_}.

 

Hence, the new commit will contain files of 2 instants if there is a restart 
AND the timeline contains an _INCOMPLETE_ instant ({_}ABORTED{_}/{_}INFLIGHT{_}).

 

Please refer to the example below, in which a RESTART + job restore is 
triggered and produces a tainted ckp metadata path. A tainted ckp metadata path 
is a path that contains more than one INFLIGHT file.

 
h2. CORRECT Instant fetched after JM restores from checkpoint

After the job restarts, commit *`20220828165937711`* is created, and has 
successfully completed.

 

If TM invokes *`ckpMetaData#lastPendingInstant()`* AFTER JM runs 
{*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the NEW 
instant on the .aux hdfs path.

 

*`StreamWriteOperatorCoordinator#startInstant()`* creates the instant 
{*}`20220828170053510`{*}, so there are now 2 _INFLIGHT_ commits. Since the 
most recent _INCOMPLETE_ instant is fetched, the correct instant, 
*`20220828170053510`*, is returned.

 

The .aux hdfs path will look like this when 
*`ckpMetaData#lastPendingInstant()`* is invoked.

 
{code:java}
[
  Ckp{instant='20220828164426755', state='INFLIGHT'}, 
  Ckp{instant='20220828165937711', state='COMPLETED'}, 
  Ckp{instant='20220828170053510', state='INFLIGHT'}
] {code}
 

 
h2. INCORRECT Instant fetched after JM restores from checkpoint

If the TM invokes *`ckpMetaData#lastPendingInstant()`* BEFORE JM runs 
{*}`StreamWriteOperatorCoordinator#startInstant()`{*}, it will read the OLD 
instant on the .aux hdfs path.

The .aux hdfs path will look like this when 
*`ckpMetaData#lastPendingInstant()`* is invoked.

 
{code:java}
[
  Ckp{instant='20220828164123688', state='COMPLETED'}, 
  Ckp{instant='20220828164426755', state='INFLIGHT'}, 
  Ckp{instant='20220828165937711', state='COMPLETED'}
] {code}
 

Since the new instant file has not been created yet, the most recent 
_INCOMPLETE_ instant is fetched, which is {*}`20220828164426755`{*}.

In such a case, different TMs can obtain different instant times from 
{*}`ckpMetaData#lastPendingInstant()`{*}, so a single commit might contain 
multiple instants.
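
The two listings above can be replayed with a minimal Java sketch. It assumes the ckp messages are sorted by instant time and that the most recent INFLIGHT entry is returned. The class LastPendingInstantSketch, its Ckp type and the helper methods are hypothetical stand-ins for this example, not Hudi's classes.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of picking the last pending instant from the ckp messages. */
final class LastPendingInstantSketch {

    /** Minimal stand-in for one ckp message on the .aux path. */
    static final class Ckp {
        final String instant;
        final String state;

        Ckp(String instant, String state) {
            this.instant = instant;
            this.state = state;
        }
    }

    /** Returns the most recent INFLIGHT instant, or null if there is none. */
    static String lastPendingInstant(List<Ckp> sortedCkps) {
        for (int i = sortedCkps.size() - 1; i >= 0; i--) {
            if ("INFLIGHT".equals(sortedCkps.get(i).state)) {
                return sortedCkps.get(i).instant;
            }
        }
        return null;
    }

    /** Metadata as seen by a TM that reads BEFORE the JM runs startInstant(). */
    static List<Ckp> beforeStartInstant() {
        return Arrays.asList(
            new Ckp("20220828164123688", "COMPLETED"),
            new Ckp("20220828164426755", "INFLIGHT"),
            new Ckp("20220828165937711", "COMPLETED"));
    }

    /** Metadata as seen by a TM that reads AFTER the JM runs startInstant(). */
    static List<Ckp> afterStartInstant() {
        return Arrays.asList(
            new Ckp("20220828164426755", "INFLIGHT"),
            new Ckp("20220828165937711", "COMPLETED"),
            new Ckp("20220828170053510", "INFLIGHT"));
    }

    public static void main(String[] args) {
        System.out.println("before: " + lastPendingInstant(beforeStartInstant()));
        System.out.println("after:  " + lastPendingInstant(afterStartInstant()));
    }
}
```

Two TMs that read the path on either side of startInstant() end up with different instants, which is the single commit multi-instant situation described here.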

 
h2. FileNotFoundException and various parquet corruption errors

Building upon the example in {_}Single commit multiple instant error{_}, when 
*`AppendWriteFunction#flushData()`* is invoked, the current *`writerHelper`* 
will be cleaned up by closing all existing file handles. The *`writerHelper`* 
will then be set to {_}NULL{_}. 

At this point, Hudi is performing a checkpoint and is about to write to Hudi's 
timeline by creating a *`*.commit`* file with the commit's metadata.

Suppose that a TM (let's call this the {*}tainted TM{*}) is using an old 
instant from the tainted ckp metadata path as follows ({*}`20220828164426755`{*} 
will be used):
{code:java}
[
  Ckp{instant='20220828164123688', state='COMPLETED'}, 
  Ckp{instant='20220828164426755', state='INFLIGHT'}, 
  Ckp{instant='20220828165937711', state='COMPLETED'},
  Ckp{instant='20220828170053510', state='INFLIGHT'}
] {code}
 

Once *`AppendWriteFunction#flushData()`* has completed, on the next invocation 
of {*}`AppendWriteFunction#processElement()`{*}, 
*`AppendWriteFunction#initWriterHelper()`* will be invoked, causing a new 
writerHelper with the instant *`20220828170053510`* to be created, and writing 
will begin to files of this instant.

 

While all this is happening, Flink is performing a checkpoint and is writing to 
Hudi's timeline by creating a *`*.commit`* file with the commit's metadata.

Before it can create and write to the *`*.commit`* file, 
*`HoodieTable#finalizeWrite()`* is invoked.

 

Subsequently, *`HoodieTable#reconcileAgainstMarkers()`* is invoked. This 
method is supposed to clean up data files left partially written by tasks that 
failed but succeeded on retry.

Since the *tainted TM* is using the instant that is being committed to the Hudi 
timeline, the files that it is currently writing to are cleaned up, because 
their marker files can be found but the data files are not listed in the 
{*}`HoodieWriteStat`{*}.
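
The clean-up decision described above amounts to a set difference, sketched below in Java. This is only an illustration under stated assumptions: the class ReconcileSketch, the method filesToDelete and the file names are hypothetical, and the real method does more than this.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the marker reconciliation decision; not Hudi's code. */
final class ReconcileSketch {

    /**
     * Files that have a marker but are not reported in the write stats of the
     * commit are treated as partially written and selected for deletion.
     */
    static Set<String> filesToDelete(Set<String> filesWithMarkers,
                                     Set<String> filesInWriteStats) {
        Set<String> invalid = new TreeSet<>(filesWithMarkers);
        invalid.removeAll(filesInWriteStats);
        return invalid;
    }

    public static void main(String[] args) {
        // Hypothetical file names: c.parquet is still being written by the tainted TM.
        Set<String> markers =
            new HashSet<>(Arrays.asList("a.parquet", "b.parquet", "c.parquet"));
        Set<String> writeStats =
            new HashSet<>(Arrays.asList("a.parquet", "b.parquet"));
        System.out.println(filesToDelete(markers, writeStats));
    }
}
```

A file that the tainted TM has just opened for the instant being committed has a marker but no write stat yet, so it falls into the deleted set while it is still being written.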

 

In essence, the *tainted TM* starts a write cycle when it should not, using an 
instant that is being committed to the Hudi timeline. This causes files to be 
deleted, corrupted or left in the wrong state.

 

Hence, while the JM is performing a checkpoint + commit, the *tainted TM* might 
try to close a file handle once the file has reached 
{*}`hoodie.parquet.max.file.size`{*}. By then, the file might already have been 
deleted by {*}`HoodieTable#reconcileAgainstMarkers()`{*}, causing the 
{*}`java.io.FileNotFoundException`{*}.


> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
>                 Key: HUDI-4741
>                 URL: https://issues.apache.org/jira/browse/HUDI-4741
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>             Fix For: 0.12.1
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
