[
https://issues.apache.org/jira/browse/HUDI-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-4741:
-----------------------
Description:
h1. Summary of Events
# TM heartbeat is not sent to the JM (can be triggered by killing a container), so the JM kills the TM/container
# The JM restarts the container, but the restart path is not handled properly, causing a deadlock
# The deadlock causes instantToWrite() to loop for 10 minutes (the default Flink checkpoint timeout; see the config note after this list), causing an instant-initialization timeout error
# The JM is restarted
# The JM restores its state from the previously successful checkpoint
# The issue in HUDI-4907 occurs
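For reference, the 10-minute window in step 3 corresponds to Flink's default checkpoint timeout. A minimal way to make that explicit (or to shorten it when reproducing), assuming the job does not already override it:
{code:java}
# Flink's checkpoint timeout; 10 min (600000ms) is the documented default.
# Lowering it surfaces the instant-initialization timeout sooner.
execution.checkpointing.timeout=600000ms {code}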
h1. Code for reproducing
h2. Flink SQL Code
{code:java}
CREATE TABLE input_table (
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
) WITH (
'connector' = 'datagen',
'fields.val.length' = '99999',
'rows-per-second' = '15000'
);

CREATE TABLE test_hudi
(
`val` STRING
,`event_time` TIMESTAMP(3)
,`partition` BIGINT
,`offset` BIGINT
,`dt` STRING
,`hh` STRING
) PARTITIONED BY (dt, hh)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://jm_tm_sync_error/',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'insert',
'hoodie.parquet.small.file.limit' = '104857600',
'hoodie.parquet.max.file.size' = '268435456',
'hoodie.datasource.write.recordkey.field' = 'partition,offset',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
'write.bulk_insert.sort_input' = 'false',
'index.bootstrap.enabled' = 'false',
'index.state.ttl' = '60',
'index.type' = 'FLINK_STATE',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'write.tasks' = '8',
'hive_sync.enable' = 'false'
);

insert into test_hudi
select `val`
,`event_time`
,`partition`
,`offset`
,DATE_FORMAT(event_time, 'yyyy-MM-dd')
,DATE_FORMAT(event_time, 'HH')
from input_table; {code}
h2. Advanced Properties
{code:java}
execution.checkpointing.interval=60000ms {code}
h2. Job Profile Properties
{code:java}
flink.version=1.13.14
default.parallelism=8
restart.from.savepoint=true
sql.job.mode=normal
running.mode=streaming
slots.per.tm=2
cpu.per.tm=2vcore
memory.per.tm=6G
jvm.heap.ratio=70% {code}
h1. Issue: TM failure + starting a TM in a new container causes a deadlock
# When a TM fails, starting and restoring it in a new container creates a deadlock:
** the TM is waiting for the JM to create a new _INFLIGHT_ instant, while
** the JM is waiting for the TM to send a success WriteMetadataEvent
# The deadlock will surface as either of the errors below:
** org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
** org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
# This triggers org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
# The JM will try to restore itself from the last successful checkpoint
# This causes the issue described in HUDI-4907
h2. Root cause
When the TM is restored, *`AbstractStreamWriteFunction#initializeState()`* attempts to restore the state of the TM. At this stage, *`this.currentInstant`* is initialized by invoking {*}`lastPendingInstant()`{*}, which loads the checkpoint (ckp) metadata path and returns an _INFLIGHT_ instant.
When *`instantToWrite()`* is then invoked, *`instant.equals(this.currentInstant)`* is always true, as the local *`instant`* is equal to {*}`this.currentInstant`{*}. Hence, the current implementation is stuck in an infinite loop: {*}`lastPendingInstant()`{*}, which governs both *`instant`* and {*}`this.currentInstant`{*}, always returns the same value because the state of the ckp metadata path never changes.
This is because the JM is waiting for the TM to finish writing the batch for the _INFLIGHT_ instant, while at the same time the TM is waiting for the JM to create a new _INFLIGHT_ instant; hence the deadlock.
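The behaviour above can be condensed into a simplified sketch (hypothetical names, loosely modeled on the flow around {*}`instantToWrite()`{*}; an illustration of the wait loop, not the actual Hudi source):
{code:java}
// Simplified sketch of the TM-side wait described above. CkpMetadataView
// and its lastPendingInstant() are illustrative stand-ins for the real
// lookup of the ckp metadata path, not Hudi's API.
public class InstantWaitSketch {
  // Matches the Timeout(601000ms) reported in the error above.
  private static final long TIMEOUT_MS = 601_000L;

  private String currentInstant; // seeded during state restore
  private final CkpMetadataView ckpMetadata;

  InstantWaitSketch(CkpMetadataView ckpMetadata) {
    this.ckpMetadata = ckpMetadata;
    // On restore, currentInstant is seeded from the same source the loop
    // below polls, so the two values cannot diverge without JM progress.
    this.currentInstant = ckpMetadata.lastPendingInstant();
  }

  String instantToWrite() {
    long start = System.currentTimeMillis();
    String instant = ckpMetadata.lastPendingInstant();
    // Deadlock: the JM only publishes a new instant after receiving a
    // success WriteMetadataEvent, which the TM only sends after this loop
    // returns, so instant.equals(currentInstant) remains true forever.
    while (instant == null || instant.equals(this.currentInstant)) {
      if (System.currentTimeMillis() - start > TIMEOUT_MS) {
        throw new RuntimeException(
            "Timeout(" + TIMEOUT_MS + "ms) while waiting for instant initialize");
      }
      try {
        Thread.sleep(100L); // back off before re-reading the ckp metadata
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      instant = ckpMetadata.lastPendingInstant();
    }
    return instant;
  }

  /** Illustrative stand-in for the reader of the ckp metadata path. */
  interface CkpMetadataView {
    String lastPendingInstant();
  }
} {code}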
The short-term fix is to enforce a global failover every time there is a failure.
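At the Flink configuration level, one way to approximate this (an illustration of the stopgap, not necessarily the fix adopted in Hudi) is to disable region-based partial failover so that any task failure restarts the whole job graph:
{code:java}
# flink-conf.yaml: restart all tasks on any failure instead of only the
# affected pipelined region, so the JM and TM instant state are rebuilt
# together from the last successful checkpoint.
jobmanager.execution.failover-strategy: full {code}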
> Deadlock when restarting failed TM in AbstractStreamWriteFunction
> -----------------------------------------------------------------
>
> Key: HUDI-4741
> URL: https://issues.apache.org/jira/browse/HUDI-4741
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Reporter: voon
> Assignee: voon
> Priority: Major
> Fix For: 0.12.1
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)