GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/5189
[FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface
## What is the purpose of the change
This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188.
The broken `FlinkKafkaConsumerBaseTest` is properly fixed only when both
#5188 and this PR are merged.
Prior to this PR, offset committing was tightly coupled with the
`AbstractFetcher`, making unit tests for offset committing behaviours hard to
compose concisely. For example, we had tests that required mocking the offset
commit methods on `AbstractFetcher`, although ideally those methods would be
final (and therefore impossible to mock) to prevent accidental overrides.
This PR decouples offset committing as a separate service behind a new
`KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as
an implementation of this service, so this PR changes nothing beyond
introducing a new layer of abstraction.
Unit tests that verify offset committing behaviour now provide a dummy
verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks
on AbstractFetcher) and test against that.
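As a rough illustration of the idea, a dummy verifiable committer might look
like the sketch below. The method signature is an assumption made for
illustration (offsets keyed by a plain `String`, no commit callback), and
`RecordingOffsetCommitter` is a hypothetical name; neither is taken from the
actual change.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the new interface. ASSUMPTION: the real method
// signature in the PR may differ (partition type, callback parameter).
interface KafkaOffsetCommitter {
    void commitInternalOffsetsToKafka(Map<String, Long> offsets) throws Exception;
}

// Hypothetical test helper: records every commit request instead of talking
// to Kafka, so a test can verify what was committed without any mocks.
class RecordingOffsetCommitter implements KafkaOffsetCommitter {
    private final List<Map<String, Long>> commits = new ArrayList<>();

    @Override
    public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
        // Defensive copy: later changes to the caller's map must not alter
        // what was recorded.
        commits.add(Collections.unmodifiableMap(new HashMap<>(offsets)));
    }

    // Read-only view of all commit requests, in the order they arrived.
    List<Map<String, Long>> getCommits() {
        return Collections.unmodifiableList(commits);
    }
}
```

A test would hand such an instance to the consumer under test, trigger a
checkpoint completion, and then assert on `getCommits()`.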
## Brief change log
- Migrate `AbstractFetcher::commitInternalOffsetsToKafka` method to the
newly introduced `KafkaOffsetCommitter` interface.
- Let `AbstractFetcher` implement `KafkaOffsetCommitter`
- In the `FlinkKafkaConsumerBase`, logically separate "offset committing" and
"record fetching" so that they are handled by two services, namely a
`KafkaOffsetCommitter` and an `AbstractFetcher`. Physically, the fetcher
instance sits behind both service abstractions.
- In `FlinkKafkaConsumerBaseTest`, remove all mocks on
`AbstractFetcher::commitInternalOffsetsToKafka`, and test against a
`KafkaOffsetCommitter` instead.
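
The logical separation described in the change log can be pictured with the
minimal sketch below, in which one fetcher instance is held through two
references. All types here are simplified stand-ins: `ToyFetcher`,
`ConsumerSketch`, `runFetchLoop`, `onCheckpointComplete` and the method
signatures are assumptions for illustration, not the connector's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the new interface (ASSUMED signature).
interface KafkaOffsetCommitter {
    void commitInternalOffsetsToKafka(Map<String, Long> offsets) throws Exception;
}

// Simplified stand-in for the fetcher: it fetches records and, for now,
// also serves as the implementation of the offset committer service.
abstract class AbstractFetcher implements KafkaOffsetCommitter {
    abstract void runFetchLoop() throws Exception;
}

// Hypothetical toy fetcher that only remembers what it was asked to commit.
class ToyFetcher extends AbstractFetcher {
    final Map<String, Long> committed = new HashMap<>();

    @Override
    void runFetchLoop() {
        // No records to fetch in this sketch.
    }

    @Override
    public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
        committed.putAll(offsets);
    }
}

// Hypothetical stand-in for the consumer base: it depends on two services
// and does not care whether one object or two objects back them.
class ConsumerSketch {
    private final AbstractFetcher fetcher;
    private final KafkaOffsetCommitter offsetCommitter;

    ConsumerSketch(AbstractFetcher fetcher, KafkaOffsetCommitter offsetCommitter) {
        this.fetcher = fetcher;
        this.offsetCommitter = offsetCommitter;
    }

    // Production-style wiring: the same fetcher instance backs both services.
    static ConsumerSketch withSharedInstance(AbstractFetcher fetcher) {
        return new ConsumerSketch(fetcher, fetcher);
    }

    void run() throws Exception {
        fetcher.runFetchLoop();
    }

    void onCheckpointComplete(Map<String, Long> offsets) throws Exception {
        offsetCommitter.commitInternalOffsetsToKafka(offsets);
    }
}
```

Because the consumer only sees the `KafkaOffsetCommitter` reference when
committing, a test can pass a separate dummy committer through the
two-argument constructor while leaving the fetcher untouched.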
## Verifying this change
This PR does not add any new functionality. The reworked tests also do not
affect test coverage.
`FlinkKafkaConsumerBaseTest` verifies all changes.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-8283-KafkaOffsetCommitter
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5189.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5189
----
commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2017-12-20T00:10:44Z
[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java
reflection
Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.
This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.
commit 0d19e99d3fb3359f43c2db91611257a5edb2e17f
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2017-12-19T19:55:16Z
[FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface
Prior to this commit, offset committing was coupled tightly with the
AbstractFetcher, making unit tests for offset committing behaviours hard
to compose concisely. For example, we had tests that mock the offset
commit methods on AbstractFetcher, while ideally, it would be best that
those methods are made final to prevent accidental overrides.
This commit decouples offset committing as a separate service behind a
new KafkaOffsetCommitter interface. For now, the AbstractFetcher is
reused as an implementation of this service.
Unit tests that verify offset committing behaviour now provide a dummy
verifiable implementation of the KafkaOffsetCommitter, instead of using
mocks on AbstractFetcher.
----