[ 
https://issues.apache.org/jira/browse/FLINK-29627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17617611#comment-17617611
 ] 

Krzysztof Chmielewski edited comment on FLINK-29627 at 10/14/22 9:53 AM:
-------------------------------------------------------------------------

PR ready but waiting on #21022 to be merged.
https://github.com/apache/flink/pull/21052


was (Author: kristoffsc):
PR
https://github.com/apache/flink/pull/21052

> Sink - Duplicate key exception during recover more than 1 committable.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-29627
>                 URL: https://issues.apache.org/jira/browse/FLINK-29627
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0, 1.17.0, 1.15.2, 1.16.1
>            Reporter: Krzysztof Chmielewski
>            Assignee: Krzysztof Chmielewski
>            Priority: Critical
>
> Recovering more than one committable causes an `IllegalStateException` and 
> prevents the cluster from starting.
> When we recover the `CheckpointCommittableManager`, we deserialize 
> `SubtaskCommittableManager` instances from the recovery state and put them 
> into a `Map<Integer, SubtaskCommittableManager<CommT>>`. The key of this map 
> is the subtaskId of the recovered manager. However, this fails with a 
> duplicate key if we have to recover more than one committable. 
> What we should do is call `SubtaskCommittableManager::merge` if we have 
> already deserialized a manager for this subtaskId.
> Stack Trace:
> {code:java}
> 28603 [flink-akka.actor.default-dispatcher-8] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Global Committer (1/1) (485dc57aca56235b9d1ab803c8c966ad_47d89856a1cf553f16e7063d953b7d42_0_1) switched from INITIALIZING to FAILED on 2ed5c848-d360-48ae-9a92-730b022c8a39 @ kubernetes.docker.internal (dataPort=-1).
> java.lang.IllegalStateException: Duplicate key 0 (attempted merging values org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager@631940ac and org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager@7ff3bd7)
>       at java.util.stream.Collectors.duplicateKeyException(Collectors.java:133) ~[?:?]
>       at java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180) ~[?:?]
>       at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:?]
>       at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
>       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
>       at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>       at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer$CheckpointSimpleVersionedSerializer.deserialize(CommittableCollectorSerializer.java:153) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer$CheckpointSimpleVersionedSerializer.deserialize(CommittableCollectorSerializer.java:124) ~[classes/:?]
>       at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeserializeList(SimpleVersionedSerialization.java:148) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer.deserializeV2(CommittableCollectorSerializer.java:105) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer.deserialize(CommittableCollectorSerializer.java:82) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer.deserialize(CommittableCollectorSerializer.java:41) ~[classes/:?]
>       at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:121) ~[classes/:?]
>       at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer.deserializeV2(GlobalCommitterSerializer.java:128) ~[classes/:?]
>       at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer.deserialize(GlobalCommitterSerializer.java:99) ~[classes/:?]
>       at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer.deserialize(GlobalCommitterSerializer.java:42) ~[classes/:?]
>       at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227) ~[classes/:?]
>       at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) ~[classes/:?]
>       at java.lang.Iterable.forEach(Iterable.java:74) ~[?:?]
>       at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.initializeState(GlobalCommitterOperator.java:133) ~[classes/:?]
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[classes/:?]
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:727) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:703) ~[classes/:?]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:670) ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[classes/:?]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[classes/:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> {code}
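
The top frames of the trace (Collectors.duplicateKeyException, reached from a
collect call in the deserializer) are consistent with collecting the recovered
managers through the two-argument Collectors.toMap, which rejects repeated
keys. The sketch below reproduces that failure mode and shows how passing a
merge function to toMap avoids it. It is an illustration only, not the change
in the linked PR: RecoveryMergeSketch, SubtaskManager and the String
committables are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class RecoveryMergeSketch {

    /** Hypothetical stand-in for SubtaskCommittableManager; not the Flink class. */
    static final class SubtaskManager {
        final int subtaskId;
        final List<String> committables;

        SubtaskManager(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = new ArrayList<>(committables);
        }

        /** Combines two managers of the same subtask into a new manager. */
        SubtaskManager merge(SubtaskManager other) {
            List<String> all = new ArrayList<>(committables);
            all.addAll(other.committables);
            return new SubtaskManager(subtaskId, all);
        }
    }

    /** Two-argument toMap: throws IllegalStateException on a repeated subtaskId. */
    static Map<Integer, SubtaskManager> recoverWithoutMerge(List<SubtaskManager> recovered) {
        return recovered.stream()
                .collect(Collectors.toMap(m -> m.subtaskId, Function.identity()));
    }

    /** Three-argument toMap: managers sharing a subtaskId are merged instead. */
    static Map<Integer, SubtaskManager> recoverWithMerge(List<SubtaskManager> recovered) {
        return recovered.stream()
                .collect(Collectors.toMap(
                        m -> m.subtaskId, Function.identity(), SubtaskManager::merge));
    }

    public static void main(String[] args) {
        // Two recovered managers that both belong to subtask 0.
        List<SubtaskManager> recovered = Arrays.asList(
                new SubtaskManager(0, Arrays.asList("a")),
                new SubtaskManager(0, Arrays.asList("b")));

        try {
            recoverWithoutMerge(recovered);
        } catch (IllegalStateException e) {
            System.out.println("without merge: " + e.getMessage());
        }

        Map<Integer, SubtaskManager> merged = recoverWithMerge(recovered);
        System.out.println("with merge, subtask 0 holds: " + merged.get(0).committables);
    }
}
```

Whether the actual fix uses a toMap merge function or an explicit loop is not
stated in this message; the sketch only shows that merging on a key collision
removes the duplicate key failure.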



--
This message was sent by Atlassian Jira
(v8.20.10#820010)