StephanEwen opened a new pull request #12155:
URL: https://github.com/apache/flink/pull/12155
## What is the purpose of the change
This PR contains various small fixes and improvements for the
`SourceOperator` implementation.
There is no change in user-facing behavior; these are only improvements for
easier future maintenance.
## Brief change log
### (1) Simplify State Access in SourceOperator
The `SourceOperator` has some boilerplate code that takes the bytes out of the
`ListState<byte[]>` and applies the `SimpleVersionedSerializer` to turn them
into splits.
This change encapsulates that code in a utility class
`SimpleVersionedListState<SplitT>` which wraps a `ListState<byte[]>` and
applies the serialization and de-serialization.
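A minimal sketch of the wrapper idea follows. It is written against simplified, hypothetical stand-in interfaces (`BytesListState`, `VersionedSerializer`) rather than Flink's real `ListState` and `SimpleVersionedSerializer`, so it compiles without Flink. The 4-byte version prefix per entry is also an assumption for illustration, not necessarily the encoding used by `SimpleVersionedListState`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a ListState<byte[]>. */
interface BytesListState {
    Iterable<byte[]> get();
    void update(List<byte[]> values);
}

/** Hypothetical stand-in shaped like a versioned serializer. */
interface VersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}

/** In-memory raw state, used only to exercise the sketch. */
final class InMemoryBytesListState implements BytesListState {
    private final List<byte[]> entries = new ArrayList<>();
    public Iterable<byte[]> get() { return entries; }
    public void update(List<byte[]> values) { entries.clear(); entries.addAll(values); }
    int size() { return entries.size(); }
}

/** Example serializer for String "splits" (illustrative only). */
final class Utf8StringSerializer implements VersionedSerializer<String> {
    public int getVersion() { return 1; }
    public byte[] serialize(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    public String deserialize(int version, byte[] bytes) throws IOException {
        if (version != 1) throw new IOException("Unknown version: " + version);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

/** Sketch: hides the byte handling behind a typed list-state view. */
final class SimpleVersionedListStateSketch<T> {
    private final BytesListState rawState;
    private final VersionedSerializer<T> serializer;

    SimpleVersionedListStateSketch(BytesListState rawState, VersionedSerializer<T> serializer) {
        this.rawState = rawState;
        this.serializer = serializer;
    }

    /** Serializes each element and prefixes it with the serializer version. */
    void update(List<T> values) {
        List<byte[]> raw = new ArrayList<>(values.size());
        for (T value : values) {
            try {
                byte[] payload = serializer.serialize(value);
                raw.add(ByteBuffer.allocate(4 + payload.length)
                        .putInt(serializer.getVersion())
                        .put(payload)
                        .array());
            } catch (IOException e) {
                throw new UncheckedIOException(e); // simplification for the sketch
            }
        }
        rawState.update(raw);
    }

    /** Reads each entry's version prefix and deserializes the remaining bytes. */
    List<T> get() {
        List<T> result = new ArrayList<>();
        for (byte[] entry : rawState.get()) {
            ByteBuffer buffer = ByteBuffer.wrap(entry);
            int version = buffer.getInt();
            byte[] payload = new byte[buffer.remaining()];
            buffer.get(payload);
            try {
                result.add(serializer.deserialize(version, payload));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return result;
    }
}
```

With such a wrapper, operator code only calls `update(splits)` and `get()`, and the byte-level handling lives in one place.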
### (2) Initialization of the SourceOperator
Before this change, the `SourceOperator` took a `Source` in its constructor.
All actual components that the `SourceOperator` relies on at runtime were
lazily initialized, either in `open()` or via setters. This change moves towards
more eager initialization, which is the purpose of the new
`SourceOperatorFactory`-based approach.
Relying on something as broad as `Source` also means that a lot of redundant
context has to be provided to the `SourceOperator` during initialization. The
`Source` is, for example, also responsible for the `SourceEnumerator`, which is
independent of the `SourceOperator`. Nevertheless, it had to be considered during
testing, because the tests needed to mock a full `Source` in order to instantiate
a `SourceOperator`.
This change passes the collaborators of the `SourceOperator` directly and
eagerly into the constructor. This is not fully possible for the `SourceReader`,
but there we can still reduce the scope by passing a targeted factory function
instead of the whole `Source`.
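The constructor-injection pattern described above can be sketched as follows. All names here (`SourceOperatorSketch`, `ReaderSketch`, `ReaderContextSketch`, `EventGatewaySketch`) are hypothetical and are not the classes or signatures from this PR. The sketch only illustrates the idea: collaborators that exist up front are passed eagerly, while the reader is created in `open()` through a narrow factory function, so a test can pass a lambda instead of mocking a full `Source`.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical minimal reader abstraction. */
interface ReaderSketch<T> { T pollNext(); }

/** Hypothetical runtime context that only exists once the operator is opened. */
interface ReaderContextSketch { int subtaskIndex(); }

/** Hypothetical gateway for sending events to a coordinator. */
interface EventGatewaySketch { void sendEvent(String event); }

/** Sketch of an operator that receives its collaborators in the constructor. */
final class SourceOperatorSketch<T> {
    // Narrow factory: only knows how to build a reader from a context.
    private final Function<ReaderContextSketch, ReaderSketch<T>> readerFactory;
    // Collaborator that is available up front, so it is injected eagerly.
    private final EventGatewaySketch eventGateway;
    // Created in open(), once the runtime context is available.
    private ReaderSketch<T> reader;

    SourceOperatorSketch(
            Function<ReaderContextSketch, ReaderSketch<T>> readerFactory,
            EventGatewaySketch eventGateway) {
        this.readerFactory = Objects.requireNonNull(readerFactory);
        this.eventGateway = Objects.requireNonNull(eventGateway);
    }

    void open(ReaderContextSketch context) {
        reader = readerFactory.apply(context);
        eventGateway.sendEvent("reader-registered:" + context.subtaskIndex());
    }

    T emitNext() {
        if (reader == null) throw new IllegalStateException("open() not called");
        return reader.pollNext();
    }
}
```

In a test, the operator can then be built with two lambdas, e.g. `new SourceOperatorSketch<String>(ctx -> () -> "record", events::add)`, without any enumerator-related setup.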
## Verifying this change
This PR does not change any behavior, only the internal design. The
functionality is already covered by existing unit tests; the PR adjusts the
relevant tests where needed.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**