[ https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468761#comment-16468761 ]

ASF GitHub Bot commented on FLINK-9295:
---------------------------------------

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5977

    [FLINK-9295][kafka] Fix transactional.id collisions for 
FlinkKafkaProducer011

    Previously, if there were two completely independent FlinkKafkaProducer011
    data sinks in the job graph, their transactional.ids would collide with one
    another. The fix is to use the operator's unique ID along with the task name
    and subtask id.
    
    In order to do that, the operator's unique ID has to be exposed to UDFs via
    `RuntimeContext`.
        
    This change is backward compatible for recovering from older savepoints,
    since transactional.ids generated by the old generator will still be used
    after restoring from state.
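
    To make the intent concrete, here is a minimal sketch (plain Java, with
    hypothetical names that do not mirror Flink's actual generator internals) of
    how a transactional.id prefix becomes collision-free once the operator's
    unique ID is mixed in next to the task name and subtask index:

    ```java
    // Illustrative sketch only: class and method names are hypothetical and
    // do not reflect the actual Flink implementation.
    public final class TransactionalIdPrefixSketch {

        /**
         * Two independent sinks can share the task name and subtask index
         * (for example when chained into the same task), but their operator
         * unique IDs differ, so the resulting prefixes no longer collide.
         */
        public static String buildPrefix(String operatorUniqueId, String taskName, int subtaskIndex) {
            return operatorUniqueId + "-" + taskName + "-" + subtaskIndex;
        }

        public static void main(String[] args) {
            System.out.println(buildPrefix("operator-a", "Sink: kafka", 0));
            System.out.println(buildPrefix("operator-b", "Sink: kafka", 0));
        }
    }
    ```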
    
    ## Brief change log
    
    Please check individual commit messages
    
    ## Verifying this change
    
    This change adds a new `FlinkKafkaProducer011` test case that covers the bug fix.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f9295

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5977
    
----
commit f7ee017f7e87d96182c859ad4c7849e93785e3e9
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-05-09T09:12:18Z

    [hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate 
MockEnvironment constructors

commit c19c56205846c13ef33f436ea49eabd5eb670408
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-05-09T09:29:18Z

    [hotfix][tests] Reduce mockito usage in tests

commit fa5bc99c3029f6fe4b772ee9b0f73dd42fcc445a
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-05-08T15:46:29Z

    [FLINK-9316][streaming] Expose operator's unique ID in DataStream programs
    
    This allows operators to be identified uniquely and stably across multiple
    job submissions. Previously, two different operators that were executed by
    tasks with the same name were indistinguishable.
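
    A minimal sketch of how a user-defined function could pick up this ID from
    the runtime context; the accessor name `getOperatorUniqueID()` on
    `StreamingRuntimeContext` reflects this commit's intent but is an assumption
    here, not a verified API reference:

    ```java
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

    // Sketch: read the operator's unique ID in open() and keep it for later
    // use, e.g. as part of a transactional.id prefix.
    public class OperatorIdAwareSink extends RichSinkFunction<String> {

        private transient String operatorUniqueId;

        @Override
        public void open(Configuration parameters) {
            // Unlike the task name, this ID stays stable across job submissions
            // and differs between operators chained into the same task.
            operatorUniqueId =
                ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
        }

        @Override
        public void invoke(String value) {
            // Use operatorUniqueId together with the record, as needed.
        }
    }
    ```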

commit fe34837a820b8c3db0b758a2a1cdcafaed1813e8
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-05-08T15:49:31Z

    [FLINK-9295][kafka] Fix two transactional.id collisions for 
FlinkKafkaProducer011
    
    Previously, if there were two completely independent FlinkKafkaProducer011
    data sinks in the job graph, their transactional.ids would collide with one
    another. The fix is to use the operator's unique ID along with the task name
    and subtask id.

    This change requires a backward-compatibility check for recovering from
    older savepoints.
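
    A rough sketch of the backward-compatibility idea, with illustrative names
    only (this is not the actual FlinkKafkaProducer011 restore logic):
    transactional.ids carried over in restored state keep being used, and the
    new generator only kicks in once no restored id is left:

    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.function.Supplier;

    // Illustrative sketch only; names are hypothetical.
    public class TransactionalIdSupplierSketch {

        private final Deque<String> restoredIds = new ArrayDeque<>();
        private final Supplier<String> newIdGenerator;

        public TransactionalIdSupplierSketch(Deque<String> restored, Supplier<String> newIdGenerator) {
            this.restoredIds.addAll(restored);
            this.newIdGenerator = newIdGenerator;
        }

        /** Prefer ids restored from an older savepoint; otherwise generate new ones. */
        public String next() {
            String restored = restoredIds.poll();
            return restored != null ? restored : newIdGenerator.get();
        }
    }
    ```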

----


> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in 
> EXACTLY_ONCE mode
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-9295
>                 URL: https://issues.apache.org/jira/browse/FLINK-9295
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.2
>            Reporter: Christopher Ng
>            Assignee: Piotr Nowojski
>            Priority: Major
>             Fix For: 1.5.0
>
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
> sinks are used within the same sub-task.  This can happen when 
> operator-chaining results in two different sinks in the same topology being 
> chained into a task, and thus into each of its sub-tasks.
> The problem is that {{TransactionIdsGenerator}} only takes into account the 
> task name, the subtask index, the number of subtasks, and a couple of other 
> things.  All the attributes are the same between different 
> {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
> transaction ids and one of them ends up failing.
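
For context, a minimal sketch of the scenario described above: two independent
EXACTLY_ONCE FlinkKafkaProducer011 sinks in one job graph, which before the fix
could end up with colliding transactional.ids. Broker address, topic names and
checkpoint interval are placeholders, and whether the sinks actually get chained
into the same task depends on the topology:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class TwoKafkaSinksJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        DataStream<String> events = env.fromElements("a", "b", "c");

        // Two independent exactly-once sinks; before the fix their generated
        // transactional.ids could collide, causing ProducerFencedExceptions.
        events.addSink(new FlinkKafkaProducer011<>(
                "topic-a",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
        events.addSink(new FlinkKafkaProducer011<>(
                "topic-b",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("two-kafka-sinks");
    }
}
```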



