[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300588#comment-16300588
 ] 

ASF GitHub Bot commented on FLINK-8306:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5200

    [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter interface

    ## What is the purpose of the change
    
    This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188.
    
    Prior to this PR, offset committing was coupled tightly with the 
`AbstractFetcher`, making unit tests for offset committing behaviours hard to 
compose concisely. For example, we had tests that required mocking the offset 
commit methods on `AbstractFetcher`, while ideally, it would be best that those 
methods are made final (thus, unable to be mocked) to prevent accidental 
overrides.
    
    This PR decouples offset committing as a separate service behind a new 
`KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as 
an implementation of this service, so that this PR does not introduce any more 
change other than introducing a new layer of abstraction.
    
    Unit tests that verify offset committing behaviour now provide a dummy 
verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks 
on `AbstractFetcher`) and test against that.
    
    ## Brief change log
    
    - Migrate `AbstractFetcher::commitInternalOffsetsToKafka` method to the 
newly introduced `KafkaOffsetCommitter` interface.
    - Let `AbstractFetcher` implement `KafkaOffsetCommitter`
    - In the `FlinkKafkaConsumerBase`, let "offset committing" and "record 
fetching" be logically separated to be handled by two services, i.e. namely a 
`KafkaOffsetCommitter` and a `AbstractFetcher`. Physically, the fetcher 
instance sits behind both service abstractions.
    - In `FlinkKafkaConsumerBaseTest`, remove all mocks on 
`AbstractFetcher::commitInternalOffsetsToKafka`, and test against a 
`KafkaOffsetCommitter` instead.
    - Additional hotfix 2906968 that fixes a stale comment referring to an old 
`AbstractFetcher` behaviour to avoid confusion.
    
    ## Verifying this change
    
    This PR does not add any new functionality. Reworked test also do not 
affect test coverage.
    `FlinkKafkaConsumerBaseTest` verifies all changes.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8306

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5200
    
----
commit cb3f488877b7fb2ab0dfcc5c24fe53035bc765e7
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2017-12-20T00:10:44Z

    [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection
    
    Reflection was mainly used to inject mocks into private fields of the
    FlinkKafkaConsumerBase, without the need to fully execute all operator
    life cycle methods. This, however, caused the unit tests to be too
    implementation-specific.
    
    This commit reworks the FlinkKafkaConsumerBaseTest to remove test
    consumer instantiation methods that rely on reflection for dependency
    injection. All tests now instantiate dummy test consumers normally, and
    let all tests properly execute all operator life cycle methods
    regardless of the tested logic.

commit 21491d567ef5d3b8294deec2890b48900c22dd56
Author: Nico Kruber <nico@...>
Date:   2017-12-19T17:14:19Z

    [FLINK-8295] [cassandra] [build] Properly shade netty for the datastax 
driver
    
    com.datastax.driver.core.NettyUtil expects netty to be present either at its
    original package or relocated to com.datastax.shaded.netty. By relocating it
    to this package we make sure the driver follows its designated path.
    
    This closes #5183.

commit ef40aaa9d942cc49c3d51816eacdaa5e7dbe9fa5
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2017-12-19T19:55:16Z

    [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter interface
    
    Prior to this commit, offset committing was coupled tightly with the
    AbstractFetcher, making unit tests for offset committing behaviours hard
    to compose concisely. For example, we had tests that mock the offset
    commit methods on AbstractFetcher, while ideally, it would be best that
    those methods are made final to prevent accidental overrides.
    
    This commit decouples offset committing as a separate service behind a
    new KafkaOffsetCommitter interface. For now, the AbstractFetcher is
    reused as an implementation of this service.
    
    Unit tests that verify offset committing behaviour now provide a dummy
    verifyable implementation of the KafkaOffsetCommitter, instead of using
    mocks on AbstractFetcher.

commit 59917867ed19bf9f1ef9b6fba696983675e2747e
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2017-12-20T19:54:40Z

    [hotfix] [kafka] Remove stale comment on publishing procedures of 
AbstractFetcher
    
    The previous comment mentioned "only now will the fetcher return at
    least the restored offsets when calling snapshotCurrentState()". This is
    a remnant of the previous fetcher initialization behaviour, where in the
    past the fetcher wasn't directly seeded with restored offsets on
    instantiation.
    
    Since this is no longer true, this commit fixes the stale comment to
    avoid confusion.

----


> FlinkKafkaConsumerBaseTest has invalid mocks on final methods
> -------------------------------------------------------------
>
>                 Key: FLINK-8306
>                 URL: https://issues.apache.org/jira/browse/FLINK-8306
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Tests
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1
>
>
> The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final 
> {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy 
> fix would be to simply make that method non-final, that is not ideal since it 
> would be best that the method is left final to prevent overrides in 
> subclasses.
> This suggests that offset committing functionality is too tightly coupled 
> with the {{AbstractFetcher}}, making it hard to perform concise tests to 
> verify offset committing.
> I suggest that we decouple record fetching and offset committing as separate 
> services behind different interfaces. We should introduce a new interface, 
> say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we 
> can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to