David, thanks for working on the Flink 1.9 support, this is very exciting!

Previous discussion on the list and in PRs (predating even the JIRA referenced
by Max) already pointed to removing support for Flink 1.5/1.6.
Although we generally need to consider what the Beam users need (not just
what the Flink community currently supports), I believe there was agreement
on removing 1.5/1.6, and doing so is in line with the Beam Flink version
support goals.

Given that, can we start with removing these two runner variants from the
build as the first step (separate PR for BEAM-7962)?

Based on Max's analysis, this will eliminate a major concern with the
current state: the duplication of source files.

Then, we can take another look at your rebased PR for 1.9 and see if there
are any candidates left to apply what Kenn has suggested, which would lead
to more fine-grained overrides that avoid duplicating code.
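For reference, the injection Kenn suggested could look roughly like the minimal Java sketch below. It is a sketch under stated assumptions, not the runner's actual code: Coder, ExecutionConfig, TypeSerializer and FlinkRunner are simplified, hypothetical stand-ins for the real Beam and Flink types (so the example compiles on its own), and forFlink18() is a hypothetical stand-in for what a versioned FlinkRunner entry point might do.

```java
import java.util.function.Function;

public class FlinkRunnerBuilderSketch {

  // Hypothetical, simplified stand-ins for Beam's Coder and Flink's
  // ExecutionConfig/TypeSerializer; the real types are generic and richer.
  interface Coder {}

  static final class ExecutionConfig {}

  interface TypeSerializer {
    String describe();
  }

  /** Version-independent runner: it only sees the injected factories. */
  static final class FlinkRunner {
    final Function<Coder, TypeSerializer> coderTypeSerializerFactory;
    final Function<ExecutionConfig, TypeSerializer> encodedValueTypeSerializerFactory;

    FlinkRunner(
        Function<Coder, TypeSerializer> coderTypeSerializerFactory,
        Function<ExecutionConfig, TypeSerializer> encodedValueTypeSerializerFactory) {
      this.coderTypeSerializerFactory = coderTypeSerializerFactory;
      this.encodedValueTypeSerializerFactory = encodedValueTypeSerializerFactory;
    }
  }

  /** Collects the version-specific pieces as plain constructor parameters. */
  static final class FlinkRunnerBuilder {
    private final Function<Coder, TypeSerializer> coderTypeSerializerFactory;
    private final Function<ExecutionConfig, TypeSerializer> encodedValueTypeSerializerFactory;

    FlinkRunnerBuilder(
        Function<Coder, TypeSerializer> coderTypeSerializerFactory,
        Function<ExecutionConfig, TypeSerializer> encodedValueTypeSerializerFactory) {
      this.coderTypeSerializerFactory = coderTypeSerializerFactory;
      this.encodedValueTypeSerializerFactory = encodedValueTypeSerializerFactory;
    }

    FlinkRunner build() {
      return new FlinkRunner(coderTypeSerializerFactory, encodedValueTypeSerializerFactory);
    }
  }

  // Hypothetical 1.8 entry point: the only place that names versioned code.
  static FlinkRunner forFlink18() {
    return new FlinkRunnerBuilder(
            coder -> () -> "v1_8 CoderTypeSerializer",
            config -> () -> "v1_8 EncodedValueTypeSerializer")
        .build();
  }

  public static void main(String[] args) {
    FlinkRunner runner = forFlink18();
    System.out.println(runner.coderTypeSerializerFactory.apply(new Coder() {}).describe());
    System.out.println(
        runner.encodedValueTypeSerializerFactory.apply(new ExecutionConfig()).describe());
  }
}
```

In this sketch the shared code only ever sees the two functions, so supporting another Flink version would mean one small entry point plus whichever factories actually changed, rather than a copy of every version-specific class.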

Thanks,
Thomas


On Sun, Sep 8, 2019 at 3:15 AM Maximilian Michels <[email protected]> wrote:

> One minor addition: the "1.5_to_1.7_utils" directory you listed is
> already present as the "1.5" directory. Admittedly, it could be renamed to
> make that clearer.
>
>
> On 08.09.19 10:06, Maximilian Michels wrote:
> > Thanks for your thoughts, David!
> >
> > Thanks Luke, copybara looks interesting.
> >
> > > Thomas and Max expressed needs for addressing the issue with the
> > > current release.
> >
> > I think there is a misunderstanding. Thomas and I expressed the need to
> > review the PR properly before merging it to avoid introducing technical
> > debt. The expectation to open a feature PR a couple of days before the
> > release cut and have it a) reviewed, b) tested, and c) merged is a lot
> > to ask of the maintainers. We are currently in stage (a) after the
> > first round of reviews.
> >
> > 1.5/1.6 and possibly 1.7 will be removed. This was announced earlier on
> > the mailing list and is tracked here:
> > https://jira.apache.org/jira/browse/BEAM-7962
> >
> > The amount of duplicated code is very small: two very simple
> > files. After removing 1.5/1.6/1.7 there should be none left. In the PR
> > it looks like there were unnecessary duplications of files. That's why I
> > commented.
> >
> > Due to the above reasons, I don't see the need to change the current way
> > of cross-compiling for Flink versions. Of David's proposals, I still
> > think the current approach is the easiest and least error-prone.
> >
> > Kenn, I appreciate your thoughts. How is your approach different from
> > the current one, except for introducing an additional factory method? The source
> > layout is effectively identical to the current. We still have to retain
> > different source directories for each Flink version where Flink internal
> > interfaces change.
> >
> > > Now the code indicates exactly what is going on and all builds are
> > > straightforward. All dependencies are actual build dependencies, not
> > > src dir hacks.
> >
> > Source directories are build dependencies. I can see a minor improvement
> > in code readability, though (and that's always great), but the
> > developer still needs to know which source directory the factory is going
> > to be loaded from, which is very similar to the current approach.
> >
> > Thanks,
> > Max
> >
> > On 07.09.19 20:37, Kenneth Knowles wrote:
> > > What about...
> > >
> > > *5) Architect the code for sharing using straightforward builds and
> > > dependencies*
> > >
> > > Each versioned directory has an implementation of FlinkRunner
> > > constructed from the common source, with version-specific pieces
> > > injected (not with a complex framework, but by just passing as a
> > > parameter).
> > >
> > > I just read over the actual amount of stuff you would have to inject.
> > > It is very small, at least through 1.8. Basically:
> > >
> > >      FlinkRunnerBuilder(
> > >        Function<Coder, TypeSerializer> coderTypeSerializerFactory,
> > >        Function<ExecutionConfig, TypeSerializer> encodedValueTypeSerializerFactory
> > >      ).build() // returns a FlinkRunner
> > >
> > > Now the code indicates exactly what is going on and all builds are
> > > straightforward. All dependencies are actual build dependencies, not
> > > src dir hacks.
> > >
> > > flink/
> > > + common/  # move src/ to common/src/ for hygiene; compiles against
> > >            # all supported Flink versions
> > >     - build.gradle
> > >     + src/
> > >        - org/apache/beam/runners/flink/common/FlinkRunnerBuilder.java
> > >        - ...  # the main runner implementation here
> > > + 1.5_to_1.7_utils/  # common pluggable parts that work for 1.5 through 1.7
> > >     - build.gradle  # maybe makes sense to cross-compile and unit test
> > >                     # against 1.5 through 1.7, though dirs below give
> > >                     # coverage too
> > >     + src/
> > >        - org/apache/beam/runners/flink/v1_5_to_1_7/CoderTypeSerializerFactory.java
> > >        - org/apache/beam/runners/flink/v1_5_to_1_7/EncodedValueTypeSerializerFactory.java
> > > + 1.5/
> > >     - build.gradle
> > >     + src/  # implementation of FlinkRunner compatible with 1.5; could have
> > >             # some of its own logic plugged in to FlinkRunnerBuilder, and
> > >             # some 1.5_to_1.7 utils
> > >        - org/apache/beam/runners/flink/FlinkRunner.java
> > >            # FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(),
> > >            #     new v1_5_to_1_7.EncodedValueTypeSerializerFactory())
> > > + 1.6/
> > >     - build.gradle
> > >     + src/  # implementation of FlinkRunner compatible with 1.6; actually
> > >             # has none of its own logic, but it could
> > >        - org/apache/beam/runners/flink/FlinkRunner.java
> > >            # FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(),
> > >            #     new v1_5_to_1_7.EncodedValueTypeSerializerFactory())
> > > + 1.7/
> > >     + src/  # implementation of FlinkRunner compatible with 1.7; actually
> > >             # has none of its own logic, but it could
> > >        - org/apache/beam/runners/flink/FlinkRunner.java
> > >            # FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(),
> > >            #     new v1_5_to_1_7.EncodedValueTypeSerializerFactory())
> > > + 1.8/
> > >     - build.gradle
> > >     + src/
> > >        - org/apache/beam/runners/flink/FlinkRunner.java
> > >            # FlinkRunnerBuilder(new v1_8.CoderTypeSerializerFactory(),
> > >            #     new v1_8.EncodedValueTypeSerializerFactory())
> > >        - org/apache/beam/runners/flink/v1_8/CoderTypeSerializerFactory.java
> > >        - org/apache/beam/runners/flink/v1_8/EncodedValueTypeSerializerFactory.java
> > >
> > >
> > > Kenn
> > >
> > > On Sat, Sep 7, 2019 at 9:00 AM Lukasz Cwik <[email protected]
> > > <mailto:[email protected]>> wrote:
> > >
> > >     When we import the Beam code into Google, we also run into issues
> > >     where we sometimes need to transform parts of the code. During
> > >     import we use copybara[1] to apply these transformations to the
> > >     source, which are more than just "copy file X from some other path",
> > >     since most of the time we want to change only a few lines, and this
> > >     really helps reduce the maintenance pain. Unfortunately I don't see a
> > >     Gradle plugin for copybara, but I do imagine there is a plugin that
> > >     allows one to run sed-like expressions or other transformations
> > >     instead of just maintaining duplicate copies of files.
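As a rough illustration of the sed-like alternative, the Java sketch below keeps a single shared copy of a file and applies a few regex substitutions per target version instead of maintaining a duplicate. The rewrite rule (pointing the v1_5_to_1_7 package from Kenn's layout above at v1_8) is an illustrative assumption, not something the build does today. On the Gradle side, the Copy task's `filter { line -> ... }` closure can apply this kind of per-line transformation while copying, which may be enough without copybara.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public final class SedLikeRewrite {

  /** Applies every rule, in insertion order, to each line (like a chain of sed s/// commands). */
  static List<String> rewrite(List<String> lines, Map<Pattern, String> rules) {
    return lines.stream()
        .map(line -> {
          String out = line;
          for (Map.Entry<Pattern, String> rule : rules.entrySet()) {
            // Replacement strings follow Matcher.replaceAll syntax ($1 refers to group 1).
            out = rule.getKey().matcher(out).replaceAll(rule.getValue());
          }
          return out;
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the rules in the order they were added.
    Map<Pattern, String> rules = new LinkedHashMap<>();
    // Illustrative rule: point the shared source at a different versioned package.
    rules.put(Pattern.compile("\\bv1_5_to_1_7\\b"), "v1_8");

    List<String> shared = List.of(
        "package org.apache.beam.runners.flink.v1_5_to_1_7;",
        "public class CoderTypeSerializerFactory {}");
    rewrite(shared, rules).forEach(System.out::println);
  }
}
```

The trade-off is that the rewritten file only exists at build time, so the rules themselves become the thing to review and test.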
> > >
> > >     1: https://github.com/google/copybara
> > >
> > >     On Sat, Sep 7, 2019 at 3:37 AM David Morávek <[email protected]
> > >     <mailto:[email protected]>> wrote:
> > >
> > >         Hello,
> > >
> > >         we currently have an open PR for Flink 1.9
> > >         <https://github.com/apache/beam/pull/9296>, which greatly
> > >         improves the runner for batch use cases. If the PR gets
> > >         merged, we would be supporting the 5 latest major versions of
> > >         Flink, which obviously comes with a high maintenance price and
> > >         makes future development harder (there are already some
> > >         sub-optimal parts due to compatibility with previous versions).
> > >         Thomas and Max expressed needs for addressing the issue with the
> > >         current release.
> > >
> > >         Let's break down possible solutions to the problem.
> > >
> > >         *1) Current solution*
> > >
> > >         Currently we maintain a separate build for each version. The
> > >         project structure looks as follows:
> > >
> > >         flink/
> > >         + 1.5/
> > >             + src/  # implementation of classes that differ between versions
> > >             - build.gradle
> > >         + 1.6/
> > >             - build.gradle  # the version is backward compatible, so it
> > >                             # can reuse "overrides" from 1.5
> > >         + 1.7/
> > >             - build.gradle  # the version is backward compatible, so it
> > >                             # can reuse "overrides" from 1.5
> > >         + 1.8/
> > >             + src/  # implementation of classes that differ between versions
> > >             - build.gradle
> > >         + 1.9/
> > >             + src/  # implementation of classes that differ between versions
> > >             - build.gradle
> > >         + src/  # common source, shared among runner versions
> > >         - flink_runner.gradle  # included by each <version>/build.gradle
> > >
> > >         The problem with this structure is that we always need to copy
> > >         all of the version-specific classes between backward-incompatible
> > >         versions, which results in *duplicate files* (we cannot simply
> > >         override a single file, because it wouldn't compile due to
> > >         duplicate classes).
> > >
> > >         *2) Symlink duplicates*
> > >
> > >         Maybe we can simply symlink duplicates between versions and only
> > >         override the files that need to be changed?
> > >
> > >         *3) Adjusting the gradle build*
> > >
> > >         Currently a version build looks something like this (this one is
> > >         for the 1.7.x version):
> > >
> > >         project.ext {
> > >            // Set the version of all Flink-related dependencies here.
> > >            flink_version = '1.7.2'
> > >            // Main source directory and Flink version specific code.
> > >            main_source_dirs = ["$basePath/src/main/java", "../1.5/src/main/java"]
> > >            test_source_dirs = ["$basePath/src/test/java", "../1.5/src/test/java"]
> > >            main_resources_dirs = ["$basePath/src/main/resources"]
> > >            test_resources_dirs = ["$basePath/src/test/resources"]
> > >            archives_base_name = 'beam-runners-flink-1.7'
> > >         }
> > >
> > >         // Load the main build script which contains all build logic.
> > >         apply from: "$basePath/flink_runner.gradle"
> > >
> > >         It basically says: take the common source and append the
> > >         version-specific implementations from the 1.5 version. Let's say we
> > >         want to override a single file for 1.8. We need to copy everything
> > >         from 1.5/src, and the build file would look as follows:
> > >
> > >         /* All properties required for loading the Flink build script */
> > >         project.ext {
> > >            // Set the version of all Flink-related dependencies here.
> > >            flink_version = '1.8.0'
> > >            // Main source directory and Flink version specific code.
> > >            main_source_dirs = ["$basePath/src/main/java", "./src/main/java"]
> > >            test_source_dirs = ["$basePath/src/test/java", "./src/test/java"]
> > >            main_resources_dirs = ["$basePath/src/main/resources"]
> > >            test_resources_dirs = ["$basePath/src/test/resources"]
> > >            archives_base_name = 'beam-runners-flink-1.8'
> > >         }
> > >
> > >         // Load the main build script which contains all build logic.
> > >         apply from: "$basePath/flink_runner.gradle"
> > >
> > >         For simplicity, let's only focus on *main_source_dirs*. What we
> > >         really want to do is tell the build to use everything from
> > >         1.5 and override a single class (e.g. CoderTypeSerializer).
> > >
> > >         def copyOverrides = tasks.register('copyOverrides', Copy) {
> > >            it.from '../1.5/src/', './src'
> > >            it.into "${project.buildDir}/flink-overrides/src"
> > >            // The last duplicate file 'wins'.
> > >            it.duplicatesStrategy = DuplicatesStrategy.INCLUDE
> > >         }
> > >
> > >         compileJava.dependsOn copyOverrides
> > >
> > >         project.ext {
> > >            main_source_dirs = ["$basePath/src/main/java",
> > >                "${project.buildDir}/flink-overrides/src/main/java"]
> > >         }
> > >
> > >         This would copy all overrides into the build directory and, in
> > >         case of a duplicate, pick the last one. Then the build would
> > >         simply compile classes from the newly created Java files in the
> > >         build directory.
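The "last duplicate wins" rule that the copyOverrides task relies on can be modeled in a few lines of Java. This is a sketch of the overlay semantics only, not of Gradle's Copy implementation: source roots are visited in order, and a file at the same relative path in a later root replaces the entry from an earlier one. The directory and file names in main() are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public final class OverlaySketch {

  /** Maps each relative path to the file that wins; later roots overwrite earlier ones. */
  static Map<Path, Path> overlay(List<Path> roots) throws IOException {
    Map<Path, Path> winners = new TreeMap<>();
    for (Path root : roots) {
      if (!Files.isDirectory(root)) {
        continue; // e.g. a version without its own src/ directory
      }
      try (Stream<Path> files = Files.walk(root)) {
        files
            .filter(Files::isRegularFile)
            .forEach(file -> winners.put(root.relativize(file), file));
      }
    }
    return winners;
  }

  static void write(Path dir, String name, String content) throws IOException {
    Files.createDirectories(dir);
    Files.writeString(dir.resolve(name), content);
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("flink-overrides");
    Path v15 = base.resolve("1.5/src");
    Path v18 = base.resolve("1.8/src");
    write(v15, "CoderTypeSerializer.java", "// 1.5 version");
    write(v15, "EncodedValueTypeSerializer.java", "// 1.5 version");
    write(v18, "CoderTypeSerializer.java", "// 1.8 override");

    // Same order as it.from '../1.5/src/', './src' in the copyOverrides task.
    overlay(List.of(v15, v18))
        .forEach((relative, source) ->
            System.out.println(relative + " <- " + base.relativize(source)));
  }
}
```

One consequence of overlay semantics worth keeping in mind: a later version can replace a class from 1.5 but cannot remove one, since files absent from the later root are still taken from the earlier root.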
> > >
> > >         *4) Maintaining the last 3 major versions only*
> > >
> > >         I recall that the Flink community only supports the 3 latest major
> > >         versions <https://flink.apache.org/downloads.html> (please
> > >         correct me if I'm mistaken). I suggest that *Beam would do the
> > >         same*. There is already an open BEAM-7962
> > >         <https://jira.apache.org/jira/browse/BEAM-7962> that suggests
> > >         dropping versions 1.5 & 1.6. Maybe this would allow us to keep
> > >         the current structure with a bearable amount of technical debt?
> > >
> > >         Personally I'm in favor of *4)* combined with *3)*.
> > >
> > >         What do you think? Do you have any other suggestions on how to
> > >         solve this?
> > >
> > >         Thanks,
> > >         D.
> > >
>
>
