[ 
https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474283#comment-17474283
 ] 

Matthias Schwalbe commented on FLINK-25615:
-------------------------------------------

Good morning Martijn :),

 

The error remains in all versions since 1.9, including the soon-to-be-released
1.15. On the user mailing list I still see reports of people running pre-1.9 versions.

The trouble starts once they migrate pre-1.9 jobs to a current version (we had
long-running 1.8 jobs that we only recently migrated).

Within a couple of days, the _metadata (savepoint/checkpoint) files grew from an
average of 10 MB to gigabytes, disrupting Akka communication, inflating memory
consumption, and affecting other jobs on the same machines. (I spent around 50 hours
tracking this down, not wanting to risk the state collected over a long time, since
customer money is involved.)
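The growth follows from the mechanism described in the ticket quoted below: with union list state, Flink hands every parallel subtask the complete list on restore, so if nothing clears the old state, each subtask snapshots the full list again and the total entry count is multiplied by the parallelism on every restore. A minimal, Flink-free simulation of that arithmetic; the starting entry count, parallelism, and number of restarts are hypothetical values chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;

class UnionStateGrowth {

    // Returns the total number of entries in a never-cleared union list state,
    // starting with `initialEntries` and after each of `restarts` restore/snapshot cycles.
    // Union redistribution: on restore, every subtask receives ALL entries; if nothing
    // clears the state, each of the `parallelism` subtasks snapshots that full list again.
    static List<Long> simulate(long initialEntries, int parallelism, int restarts) {
        List<Long> totals = new ArrayList<>();
        long total = initialEntries;
        totals.add(total);
        for (int i = 0; i < restarts; i++) {
            // multiplyExact throws ArithmeticException instead of silently overflowing
            total = Math.multiplyExact(total, (long) parallelism);
            totals.add(total);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 1 leftover entry, parallelism 4, 5 restarts
        List<Long> totals = simulate(1, 4, 5);
        for (int i = 0; i < totals.size(); i++) {
            System.out.println("after " + i + " restart(s): " + totals.get(i) + " entries");
        }
    }
}
```

With these numbers the count goes 1, 4, 16, 64, 256, 1024: geometric in the number of restores, which is why the metadata size explodes rather than creeping up linearly.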

I expect the fix to be a small one to integrate, pending some testing.
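The one-line fix proposed in the ticket hinges on a Java pitfall: Collection.contains accepts any Object, so passing a descriptor where a state name is expected compiles but can never match a String. A self-contained sketch of the buggy and fixed checks; the Descriptor class is a hypothetical stand-in for Flink's ListStateDescriptor so the example runs without Flink on the classpath, and the state names are the two mentioned in the ticket:

```java
import java.util.Set;

class StateNameCheck {

    // Hypothetical stand-in for Flink's ListStateDescriptor; only its name matters here.
    static final class Descriptor {
        private final String name;

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    static final Descriptor NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
            new Descriptor("next-transactional-id-hint");

    // Buggy check: Set<String>.contains(Object) compiles with any argument,
    // but a Descriptor never equals a String, so this always returns false.
    static boolean buggyCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    }

    // Fixed check: compare state names with state names.
    static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName());
    }

    public static void main(String[] args) {
        // Illustrative set of names restored from a migrated pre-1.9 savepoint
        Set<String> restored =
                Set.of("next-transactional-id-hint", "next-transactional-id-hint-v2");
        System.out.println("buggy check: " + buggyCheck(restored)); // false
        System.out.println("fixed check: " + fixedCheck(restored)); // true
    }
}
```

Static analysis such as Error Prone's CollectionIncompatibleType check is meant to catch exactly this kind of call, which plain javac accepts silently.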

 

The other solution is to change the uid() of the Kafka producer, which discards
the pre-1.9 state but also leaves Kafka transactions dangling.

 

I leave the decision on whether to continue with the ticket to more experienced folks.

 

Thias

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> I've found an unnoticed error in FlinkKafkaProducer when migrating state from
> versions before Flink 1.9 to Flink 1.9 and later:
>  * the operator state next-transactional-id-hint should be deleted and 
> replaced by the operator state next-transactional-id-hint-v2; however,
>  * the operator state next-transactional-id-hint is never deleted
>  * see here [1]:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>             migrateNextTransactionalIdHindState(context);
>         }{quote}
>  * migrateNextTransactionalIdHindState is never called, because 
> the condition can never become true:
>  ** getRegisteredStateNames returns a Set<String>, whereas 
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch);
> since contains() accepts any Object, this compiles but always returns false
> The effect is:
>  * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a union list state, and
>  * the state is never cleared,
>  * each time the job restarts from a savepoint or checkpoint, the state size 
> is multiplied by the parallelism;
>  * and because each entry leaves an offset in the metadata, akka.framesize 
> becomes too small even before we run into a memory overflow
>  
> The breaking change was introduced in commit 
> 70fa80e3862b367be22b593db685f9898a2838ef
>  
> A simple fix would be to change the code to:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
>  
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to 
> stay for a while.
>  
> Greetings
> Matthias (Thias) Schwalbe
>  
> [1] 
> https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
