What about...
*5) Architect the code for sharing using straightforward builds and
dependencies*
Each versioned directory has an implementation of FlinkRunner constructed
from the common source, with version-specific pieces injected (not with a
complex framework, but by just passing them as parameters).
I just read over what would actually have to be injected. It is very small,
at least through 1.8. Basically:
FlinkRunnerBuilder(
    Function<Coder, TypeSerializer> coderTypeSerializerFactory,
    Function<ExecutionConfig, TypeSerializer> encodedValueTypeSerializerFactory
).build() // returns a FlinkRunner
Now the code indicates exactly what is going on and all builds are
straightforward. All dependencies are actual build dependencies, not src
dir hacks.
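A minimal sketch of the builder's shape, just to make the injection concrete
(names beyond the parameter types above are hypothetical, and assembling the
actual runner is elided):

import java.util.function.Function;
import org.apache.beam.sdk.coders.Coder;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Lives in flink/common/ and compiles against every supported Flink version,
// because the version-specific pieces arrive as plain constructor arguments.
public class FlinkRunnerBuilder {

  private final Function<Coder<?>, TypeSerializer<?>> coderTypeSerializerFactory;
  private final Function<ExecutionConfig, TypeSerializer<?>> encodedValueTypeSerializerFactory;

  public FlinkRunnerBuilder(
      Function<Coder<?>, TypeSerializer<?>> coderTypeSerializerFactory,
      Function<ExecutionConfig, TypeSerializer<?>> encodedValueTypeSerializerFactory) {
    this.coderTypeSerializerFactory = coderTypeSerializerFactory;
    this.encodedValueTypeSerializerFactory = encodedValueTypeSerializerFactory;
  }

  // Wherever the shared translation code used to construct the version-specific
  // serializers directly, it instead asks the injected factories.
  public TypeSerializer<?> coderTypeSerializer(Coder<?> coder) {
    return coderTypeSerializerFactory.apply(coder);
  }

  public TypeSerializer<?> encodedValueTypeSerializer(ExecutionConfig config) {
    return encodedValueTypeSerializerFactory.apply(config);
  }

  // build() would assemble the runner from the common sources plus these two
  // factories; that wiring is elided in this sketch.
}

The directory layout would then be: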
flink/
  + common/  # move src/ to common/src/ for hygiene; compiles against all supported Flink versions
    - build.gradle
    + src/
      - org/apache/beam/runners/flink/common/FlinkRunnerBuilder.java
      - ...  # the main runner implementation here
  + 1.5_to_1.7_utils/  # common pluggable parts that work for 1.5 through 1.7
    - build.gradle  # maybe makes sense to cross-compile and unit test against 1.5 through 1.7, though the dirs below give coverage too
    + src/
      - org/apache/beam/runners/flink/v1_5_to_1_7/CoderTypeSerializerFactory.java
      - org/apache/beam/runners/flink/v1_5_to_1_7/EncodedValueTypeSerializerFactory.java
  + 1.5/
    - build.gradle
    + src/  # implementation of FlinkRunner compatible with 1.5; could have some of its own logic plugged in to FlinkRunnerBuilder, and some 1.5_to_1.7 utils
      - org/apache/beam/runners/flink/FlinkRunner.java  # FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(), new v1_5_to_1_7.EncodedValueTypeSerializerFactory())
  + 1.6/
    - build.gradle
    + src/  # implementation of FlinkRunner compatible with 1.6; actually has none of its own logic, but it could
      - org/apache/beam/runners/flink/FlinkRunner.java  # FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(), new v1_5_to_1_7.EncodedValueTypeSerializerFactory())
  + 1.7/
    - build.gradle
    + src/  # implementation of FlinkRunner compatible with 1.7; actually has none of its own logic, but it could
      - org/apache/beam/runners/flink/FlinkRunner.java  # FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(), new v1_5_to_1_7.EncodedValueTypeSerializerFactory())
  + 1.8/
    - build.gradle
    + src/
      - org/apache/beam/runners/flink/FlinkRunner.java  # FlinkRunnerBuilder(new v1_8.CoderTypeSerializerFactory(), new v1_8.EncodedValueTypeSerializerFactory())
      - org/apache/beam/runners/flink/v1_8/CoderTypeSerializerFactory.java
      - org/apache/beam/runners/flink/v1_8/EncodedValueTypeSerializerFactory.java
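For a sense of scale, a version-specific factory could be as small as the
sketch below (the Function-adapter shape here is just an illustration, and
CoderTypeSerializer's single-argument constructor is assumed for the Flink
version in question):

import java.util.function.Function;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// e.g. 1.8/src/org/apache/beam/runners/flink/v1_8/CoderTypeSerializerFactory.java:
// adapts the CoderTypeSerializer that works on this Flink version to the
// Function type the builder expects.
public class CoderTypeSerializerFactory implements Function<Coder<?>, TypeSerializer<?>> {

  @Override
  public TypeSerializer<?> apply(Coder<?> coder) {
    return wrap(coder);
  }

  // Helper with a named type parameter so the wildcard capture lines up.
  private static <T> TypeSerializer<T> wrap(Coder<T> coder) {
    return new CoderTypeSerializer<>(coder);
  }
}

Each version's FlinkRunner.java then just passes its factories to
FlinkRunnerBuilder, as in the comments in the tree above.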
Kenn
On Sat, Sep 7, 2019 at 9:00 AM Lukasz Cwik <[email protected]> wrote:
> When we import the Beam code into Google, we also run into issues where we
> sometimes need to transform parts of the code. During the import we use
> copybara[1] to do these transformations to the source, which are more than
> just copying file X from some other path, since most of the time we want to
> change only a few lines, and this really helps reduce the maintenance pain.
> Unfortunately I don't see a Gradle plugin for copybara, but I do imagine
> there is a plugin that allows one to run sed-like expressions or other
> transformations instead of just maintaining duplicate copies of files.
>
> 1: https://github.com/google/copybara
>
> On Sat, Sep 7, 2019 at 3:37 AM David Morávek <[email protected]> wrote:
>
>> Hello,
>>
>> We currently have an open PR for Flink 1.9
>> <https://github.com/apache/beam/pull/9296>, which greatly improves the
>> runner for the batch use case. If the PR gets merged, we would be
>> supporting the 5 latest major versions of Flink, which obviously comes with
>> a high maintenance price and makes future development harder (there are
>> already sub-optimal parts due to compatibility with previous versions).
>> Thomas and Max have expressed the need to address the issue with the
>> current release.
>>
>> Let's break down the possible solutions to the problem.
>>
>> *1) Current solution*
>>
>> Currently we maintain a separate build for each version. The project
>> structure looks as follows:
>>
>> *flink/*
>> +
>> *1.5/*
>> + *src/** # implementation of classes that differ between versions*
>> - build.gradle
>> + *1.6/*
>> + build.gradle #* the version is backward compatible, so it can reuse
>> "overrides" from 1.5*
>> + *1.7/*
>> + build.gradle #* the version is backward compatible, so it can reuse
>> "overrides" from 1.5*
>> + *1.8/*
>> + *src/ **# implementation of classes that differ between versions*
>> - build.gradle
>> + *1.9/*
>> + *src/ **# implementation of classes that differ between versions*
>> - build.gradle
>> + *src/* *# common source, shared among runner versions*
>> - flink_runner.gradle *# included by each <version>/build.gradle*
>>
>> The problem with this structure is that we always need to copy all of the
>> version-specific classes between backward-incompatible versions, which
>> results in *duplicate files* (we cannot simply override a single file,
>> because it wouldn't compile due to duplicate classes).
>>
>> *2) Symlink duplicates*
>>
>> Maybe we can simply symlink duplicates between versions and only override
>> the files that need to be changed?
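>>
>> For example (a rough sketch), 1.8 could then be mostly symlinks into
>> 1.5/src, with only the changed class as a real file:
>>
>> + *1.8/*
>> - build.gradle
>> + *src/* *# symlinks to the ../1.5/src files, except e.g.
>> CoderTypeSerializer.java, which would be a real file with the 1.8-specific
>> implementation*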
>>
>> *3) Adjusting the gradle build*
>>
>> Currently a version build looks something like this (this one is for the
>> 1.7.x version):
>>
>> project.ext {
>> // Set the version of all Flink-related dependencies here.
>> flink_version = '1.7.2'
>> // Main source directory and Flink version specific code.
>> main_source_dirs = ["$basePath/src/main/java", "../1.5/src/main/java"]
>> test_source_dirs = ["$basePath/src/test/java", "../1.5/src/test/java"]
>> main_resources_dirs = ["$basePath/src/main/resources"]
>> test_resources_dirs = ["$basePath/src/test/resources"]
>> archives_base_name = 'beam-runners-flink-1.7'
>> }
>>
>> // Load the main build script which contains all build logic.
>> apply from: "$basePath/flink_runner.gradle"
>>
>> It basically says: take the common source and append the version-specific
>> implementations from 1.5. Let's say we want to override a single file for
>> 1.8. We would need to copy everything from 1.5/src, and the build file
>> would look as follows:
>>
>> /* All properties required for loading the Flink build script */
>> project.ext {
>> // Set the version of all Flink-related dependencies here.
>> flink_version = '1.8.0'
>> // Main source directory and Flink version specific code.
>> main_source_dirs = ["$basePath/src/main/java", "./src/main/java"]
>> test_source_dirs = ["$basePath/src/test/java", "./src/test/java"]
>> main_resources_dirs = ["$basePath/src/main/resources"]
>> test_resources_dirs = ["$basePath/src/test/resources"]
>> archives_base_name = 'beam-runners-flink-1.8'
>> }
>>
>> // Load the main build script which contains all build logic.
>> apply from: "$basePath/flink_runner.gradle"
>>
>> For simplicity, let's only focus on *main_source_dirs*. What we really
>> want to do is tell the build to use everything from 1.5 and override a
>> single class (e.g. CoderTypeSerializer).
>>
>> def copyOverrides = tasks.register('copyOverrides', Copy) {
>> it.from '../1.5/src/', './src'
>> it.into "${project.buildDir}/flink-overrides/src"
>> it.duplicatesStrategy = DuplicatesStrategy.INCLUDE // The last duplicate file 'wins'.
>> }
>>
>> compileJava.dependsOn copyOverrides
>>
>> project.ext {
>> main_source_dirs = ["$basePath/src/main/java",
>> "${project.buildDir}/flink-overrides/src/main/java"]
>> }
>>
>> This would copy all overrides into the build directory, and in case of
>> duplicates it picks the latest one. Then the build would simply compile
>> the classes from the newly created Java files in the build directory.
>>
>> *4) Maintaining the last 3 major versions only*
>>
>> I recall that the Flink community only supports the 3 latest major versions
>> <https://flink.apache.org/downloads.html> (please correct me if I'm
>> mistaken). I suggest that *Beam does the same*. There is already an open
>> BEAM-7962 <https://jira.apache.org/jira/browse/BEAM-7962> that suggests
>> dropping the 1.5 & 1.6 versions. Maybe this would allow us to keep the
>> current structure with a bearable amount of technical debt?
>>
>> Personally I'm in favor of *4)* combined with *3)*.
>>
>> What do you think? Do you have any other suggestions for how to solve this?
>>
>> Thanks,
>> D.
>>
>