[ 
https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

voon updated HUDI-4741:
-----------------------
    Description: 
h1. Summary of Events
 # TM heartbeat is not sent to the JM (can be triggered by killing a container), so the JM kills the TM/container
 # The JM restarts the container, but the restart path is not handled properly, causing a deadlock
 # The deadlock causes instantToWrite() to loop for 10 minutes (the default Flink checkpoint timeout), causing an instant-initialization timeout error
 # The JM is restarted
 # The JM restores state from the previously successful checkpoint
 # The issue in HUDI-4907 occurs

 
h1. Code for reproducing
h2. Flink SQL Code
{code:java}
CREATE TABLE input_table (
    `val`               STRING
    ,`event_time`       TIMESTAMP(3)
    ,`partition`        BIGINT
    ,`offset`           BIGINT
) WITH (
    'connector' = 'datagen',
    'fields.val.length' = '99999',
    'rows-per-second' = '15000'
);

CREATE TABLE test_hudi
(
    `val`                 STRING
    ,`event_time`       TIMESTAMP(3)
    ,`partition`        BIGINT
    ,`offset`           BIGINT
    ,`dt`               STRING
    ,`hh`               STRING
) PARTITIONED BY (dt, hh)
WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://jm_tm_sync_error/',
    'table.type' = 'COPY_ON_WRITE',
    'write.operation' = 'insert',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.parquet.max.file.size' = '268435456',
    'hoodie.datasource.write.recordkey.field' = 'partition,offset',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
    'write.bulk_insert.sort_input' = 'false',
    'index.bootstrap.enabled' = 'false',
    'index.state.ttl' = '60',
    'index.type' = 'FLINK_STATE',
    'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
    'write.tasks' = '8',
    'hive_sync.enable' = 'false'
);

insert into test_hudi
select  `val`
        ,`event_time`
        ,`partition`
        ,`offset`
        ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
        ,DATE_FORMAT(event_time, 'HH')
 from input_table; {code}
 
h2. Advanced Properties
{code:java}
execution.checkpointing.interval=60000ms {code}
 
h2. Job Profile Properties
{code:java}
flink.version=1.13.14
default.parallelism=8
restart.from.savepoint=true
sql.job.mode=normal
running.mode=streaming
slots.per.tm=2
cpu.per.tm=2vcore
memory.per.tm=6G
jvm.heap.ratio=70% {code}
 

 
h1. Issue: TM failure + starting a TM in a new container causes a deadlock
 # When a TM fails, starting and restoring a replacement TM in a new container creates a deadlock:
 ** the TM is waiting for the JM to create a new _INFLIGHT_ instant, while
 ** the JM is waiting for the TM to send a success WriteMetadataEvent
 # The deadlock above will cause either of the errors below:
 ** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
 ** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
 # This triggers org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
 # The JM then tries to restore itself from the last successful checkpoint
 # This causes HUDI-4907
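The wait cycle above can be illustrated with a minimal, JDK-only sketch (this is an illustration of the deadlock pattern, not Hudi/Flink code): the "JM" side only creates a new instant after it receives the TM's success event, while the "TM" side only sends that event after it sees a new instant, so neither condition is ever met.
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DeadlockSketch {

    // Returns {jmStillBlocked, tmStillBlocked} after both sides time out.
    static boolean[] simulate(long timeoutMillis) throws InterruptedException {
        CountDownLatch newInstantCreated = new CountDownLatch(1); // released by "JM"
        CountDownLatch writeMetadataSent = new CountDownLatch(1); // released by "TM"

        Thread jm = new Thread(() -> {
            try {
                // JM: waits for the TM's success WriteMetadataEvent first
                if (writeMetadataSent.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    newInstantCreated.countDown();
                }
            } catch (InterruptedException ignored) { }
        });
        Thread tm = new Thread(() -> {
            try {
                // TM: waits for the JM to create a new INFLIGHT instant first
                if (newInstantCreated.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    writeMetadataSent.countDown();
                }
            } catch (InterruptedException ignored) { }
        });
        jm.start();
        tm.start();
        jm.join();
        tm.join();
        return new boolean[] {
            newInstantCreated.getCount() == 1,
            writeMetadataSent.getCount() == 1
        };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] blocked = simulate(200);
        // Both sides time out still waiting on each other; in the real job this
        // is what the Flink checkpoint timeout eventually surfaces as an error.
        System.out.println("JM still blocked: " + blocked[0]);
        System.out.println("TM still blocked: " + blocked[1]);
    }
}
{code}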

h2. Root cause

When restoring the TM, *`AbstractStreamWriteFunction#initializeState()`* will attempt to restore the state of the TM. At this stage, *`this.currentInstant`* is initialized by invoking {*}`lastPendingInstant()`{*}, which loads the ckp metadata path and returns an _INFLIGHT_ instant.

 

When {*}`instantToWrite()`{*} is then invoked, *`instant.equals(this.currentInstant)`* is always true, as the local *`instant`* equals {*}`this.currentInstant`{*}. Hence, the current implementation gets stuck in an infinite loop: {*}`lastPendingInstant()`{*}, which governs both *`instant`* and {*}`this.currentInstant`{*}, always returns the same value because the state of the ckp metadata path never changes.
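The spin can be sketched with a small, self-contained Java loop (the names `instantToWrite`/`lastPendingInstant` mirror the Hudi methods, but this is an illustrative simplification, not the actual implementation): because the supplier keeps returning the restored instant, the equality check never turns false and only the timeout ends the loop.
{code:java}
import java.util.function.Supplier;

public class InstantPollSketch {

    // Simplified model of the instantToWrite() wait loop: poll until the
    // coordinator publishes an instant different from the one restored into
    // state, or give up after maxPolls (modeling the checkpoint timeout).
    static String instantToWrite(Supplier<String> lastPendingInstant,
                                 String currentInstant,
                                 int maxPolls) {
        String instant = lastPendingInstant.get();
        int polls = 0;
        while (instant == null || instant.equals(currentInstant)) {
            if (++polls > maxPolls) {
                return null; // timeout: no new instant ever appeared
            }
            // With a deadlocked JM the ckp metadata path never changes,
            // so this re-read always yields the same instant.
            instant = lastPendingInstant.get();
        }
        return instant;
    }

    public static void main(String[] args) {
        // Restored state: currentInstant equals the pending INFLIGHT instant,
        // so the loop can only exit via the timeout branch.
        String pending = "20220901120000";
        String result = instantToWrite(() -> pending, pending, 100);
        System.out.println(result == null ? "timed out" : result);
    }
}
{code}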

 

This is because the JM is waiting for the TM to finish writing the batch for the _INFLIGHT_ instant, while at the same time the TM is waiting for the JM to create a new _INFLIGHT_ instant, hence the deadlock.

 

The short-term fix is to enforce a global failover every time there is a failure.



> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
>                 Key: HUDI-4741
>                 URL: https://issues.apache.org/jira/browse/HUDI-4741
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>             Fix For: 0.12.1
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
