On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz> wrote:
I'll start from the end.
I don't think we should be breaking language agnostic API
layers (for example, definition of model coders) just to
support "use_deprecated_read".
"Breaking" and "fixing" can only be a matter of the
definition of the object at hand. I don't think, that Coder
can be totally language agnostic - yes, the mapping between
serialized form and deserialized form can be _defined_ in a
language agnostic way, but must be_implemented_ in a specific
language. If we choose the implementing language, what makes
us treat SDK-specific coders defined by the SDK of the same
language as "unknown"? It is only our decision, that seems to
have no practical benefits.
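(To make the definition/implementation split concrete, a minimal
sketch of a coder whose wire format could be specified
language-agnostically - say by a hypothetical URN
"beam:coder:example_string:v1" - while the encode/decode logic is
necessarily written in one concrete language, here Java. The class
name, URN, and wire format are made up for illustration.)

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.CustomCoder;

// Wire format (the language-agnostic part of the spec):
//   4-byte big-endian length, followed by UTF-8 bytes.
// The implementation below is the Java-specific part.
public class ExampleStringCoder extends CustomCoder<String> {
  @Override
  public void encode(String value, OutputStream out) throws IOException {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    DataOutputStream dos = new DataOutputStream(out);
    dos.writeInt(bytes.length);
    dos.write(bytes);
  }

  @Override
  public String decode(InputStream in) throws IOException {
    DataInputStream dis = new DataInputStream(in);
    byte[] bytes = new byte[dis.readInt()];
    dis.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}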
In general, language-neutral APIs and protocols are a key feature
of portable Beam. See here:
https://beam.apache.org/roadmap/portability/
(I did not look into all the old discussions and votes related to
this but I'm sure they are there)
Moreover, including SDK-specific coders among the coders
supported by the SDK's runner-construction counterpart (that is,
runners-core-construction-java for the Java SDK) is a necessary
prerequisite for unifying "classical" and "portable" runners,
because the runner needs to understand *all* SDK coders so
that it can _inline_ the complete Pipeline (if the Pipeline
SDK has the same language as the runner) instead of running
it through the SDK harness. This need is therefore not specific
to supporting use_deprecated_read; it is a generic requirement
whose first manifestation happens to be the support of a
transform that the SDK harness does not support.
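(A sketch of that prerequisite, under the simplifying assumption -
mine, for illustration - that "understanding" a coder boils down to
knowing its URN: a Java runner could only inline the Pipeline if
every coder in the pipeline proto is known to it, which is exactly
why SDK-specific Java coders would have to be included.)

import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;

class InliningCheck {
  // knownCoderUrns would have to contain the model coders *and* the
  // Java-SDK-specific coders for inlining to be possible.
  static boolean canInline(RunnerApi.Pipeline pipeline, Set<String> knownCoderUrns) {
    for (RunnerApi.Coder coder : pipeline.getComponents().getCodersMap().values()) {
      if (!knownCoderUrns.contains(coder.getSpec().getUrn())) {
        return false; // fall back to executing via the SDK harness
      }
    }
    return true;
  }
}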
I think "use_deprecated_read" should be considered a
stop-gap measure for Flink (and Spark ?) till we have proper
support for SDF. In fact I don't think an arbitrary portable
runner can support "use_deprecated_read" due to the following.
There seems to be nothing special about Flink regarding the
support of primitive Read. I think any Java-native runner can
implement it pretty much the same way Flink does. The
question is whether any other runner might want to do that. The
problem with Flink is that:
Not all runners are implemented using Java. For example, the
portable DirectRunner (FnAPI runner) is implemented using Python
and Dataflow is implemented using C++. Such runners will not be
able to do this.
1) portable SDF seems not to work [1]
2) even the classical Flink runner still has issues with SDF -
there are reports of the watermark being stuck when reading data
via SDF; this gets resolved by using use_deprecated_read
3) Flink actually gains nothing from SDF, because it cannot
make use of dynamic splitting, so SDF brings only an
implementation burden without any practical benefit
Similarly, I think there were previous discussions related to
using SDF as the source framework for portable runners.
I understand that there are some bugs related to SDF and portable
Flink currently. How much work do you think is needed here? Would
it be better to focus our efforts on fixing the remaining issues
for SDF and portable runners instead of supporting
"use_deprecated_read" for that path? Note that I'm fine with
fixing any issues related to "use_deprecated_read" for classic
(non-portable) Flink, but I think you are trying to use x-lang,
hence you probably need portable Flink.
Thanks,
Cham
I think that we should revisit the decision to deprecate
Read - if we can implement it via SDF, what is the reason to
forbid a runner from making use of a simpler implementation?
The expansion of Read might be runner-dependent; that is
something we do all the time. Or am I missing something?
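(For illustration, a minimal sketch of runner-dependent expansion -
this is not Beam's actual translator API; the translate* methods
are hypothetical placeholders for what a runner like Flink does
when it executes the source directly in the TaskManager.)

import org.apache.beam.model.pipeline.v1.RunnerApi;

abstract class ReadTranslationSketch {
  static final String READ_URN = "beam:transform:read:v1";

  void translate(RunnerApi.PTransform transform) {
    if (READ_URN.equals(transform.getSpec().getUrn())) {
      // A Java-native runner can execute BoundedSource/UnboundedSource
      // directly instead of going through the SDF expansion.
      translateToNativeSource(transform);
    } else {
      translateViaSdkHarness(transform);
    }
  }

  abstract void translateToNativeSource(RunnerApi.PTransform transform);

  abstract void translateViaSdkHarness(RunnerApi.PTransform transform);
}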
Jan
[1] https://issues.apache.org/jira/browse/BEAM-10940
On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
I think we might be going down a bit of a rabbit hole with
the support for "use_deprecated_read" for portable Flink :)
I think "use_deprecated_read" should be considered a
stop-gap measure for Flink (and Spark ?) till we have proper
support for SDF. In fact I don't think an arbitrary portable
runner can support "use_deprecated_read" due to the following.
(1) The SDK Harness is not aware of
BoundedSource/UnboundedSource. The only source framework the
SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of
the Fn API
(3) A non-Java Beam portable runner will probably not be
able to directly invoke legacy Read transforms similar to
the way Flink does today.
I don't think we should be breaking language agnostic API
layers (for example, definition of model coders) just to
support "use_deprecated_read".
Thanks,
Cham
On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský
<je...@seznam.cz> wrote:
On 7/24/21 12:34 AM, Robert Bradshaw wrote:
> On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský
<je...@seznam.cz> wrote:
>> Hi,
>>
>> this was a ride. But I managed to get that working.
I'd like to discuss two points, though:
>>
>> a) I had to push Java coders to ModelCoders for
Java (which makes sense to me, but is that correct?).
See [1]. It is needed so that the Read transform
(executed directly in TaskManager) can correctly
communicate with Java SDK harness using custom coders
(which is tested here [2]).
> I think the intent was that ModelCoders represent the set of
> language-agnostic coders in the model, though I have to admit
> I've always been a bit fuzzy on when a coder must or must not
> be in that list.
I think that this definition works as long as the runner does
not itself interfere with the Pipeline. Once the runner starts
(by itself, not via SdkHarnessClient) producing data, it starts
to be part of the environment, and therefore it should
understand its own Coders. I'd propose the definition of "model
coders" to be the Coders that the SDK is able to understand,
which then works naturally for the ModelCoders located in
"core-construction-java" - it should understand Java SDK
Coders.
>
>> b) I'd strongly prefer if we moved the handling of
use_deprecated_read from outside of the Read PTransform
directly into the expand method, see [3]. Though this is not
needed for Read on Flink to work, it seems cleaner.
>>
>> WDYT?
> The default value of use_deprecated_read should depend on the
> runner (e.g. some runners don't work well with it, others
> require it). As such it should not be visible to the
> PTransform's expand.
I think we should know what the expected outcome is. If a
runner does not support primitive Read (and therefore
use_deprecated_read), what should we do if we have such an
experiment set? Should the Pipeline fail, or should it be
silently ignored? I think that we should fail, because the
user expects something that cannot be fulfilled. Therefore, we
have two options - handling the experiment explicitly in
runners that do not support it, or handling it explicitly in
all cases (both supported and unsupported). The latter is the
case when we force runners to call an explicit conversion
method (convertPrimitiveRead...). Every runner that does not
support primitive Read must handle the experiment either way,
because otherwise the experiment would simply be silently
ignored, which is not exactly user-friendly.
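(The fail-fast option could look roughly like this - a sketch
only; the capability flag is hypothetical, but
ExperimentalOptions.hasExperiment is the existing way to test for
an experiment.)

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;

class ExperimentValidation {
  // Hypothetical capability flag; a real runner knows this about itself.
  private static final boolean SUPPORTS_PRIMITIVE_READ = false;

  static void validate(PipelineOptions options) {
    if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")
        && !SUPPORTS_PRIMITIVE_READ) {
      // Failing is more user-friendly than silently ignoring the experiment.
      throw new UnsupportedOperationException(
          "use_deprecated_read is set, but this runner does not support primitive Read");
    }
  }
}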
>
>> Jan
>>
>> [1]
https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
>>
>> [2]
https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
>>
>> [3]
https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
>>
>> On 7/18/21 6:29 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I was debugging the issue, and it relates to pipeline
fusion - it seems that the primitive Read transform gets
fused and is then 'missing' as a source. I'm a little lost
in the code, but the strangest parts are these:
>>
>> a) I tried to reject fusion of primitive Read by
adding GreedyPCollectionFusers::cannotFuse for
PTransformTranslation.READ_TRANSFORM_URN to
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but
that didn't change the exception
>>
>> b) I tried adding Reshuffle.viaRandomKey between
Read and PAssert, but that didn't change it either
>>
>> c) when I run the portable Pipeline with
use_deprecated_read on Flink, it actually runs (though it
fails when it actually reads any data; if the input
is empty, the job runs), so it does not hit the same
issue, which is a mystery to me
>>
>> If anyone has any pointers that I can investigate,
I'd be really grateful.
>>
>> Thanks in advance,
>>
>> Jan
>>
>>
>>
>> On 7/16/21 2:00 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I hit another issue with the portable Flink runner.
Long story short - reading from Kafka is not working in
portable Flink. First I had to solve issues with the
expansion service configuration (the ability to add the
use_deprecated_read option), because the portable Flink
runner has issues with SDF [1], [2]. After being able to
inject use_deprecated_read into the expansion service, I
was able to get an execution DAG that has the
UnboundedSource, but then more and more issues appeared
(probably related to a missing LengthPrefixCoder somewhere
- maybe at the output of the primitive Read). I wanted
to create a test for it and found out that there
actually is a ReadSourcePortableTest in FlinkRunner, but
_it tests nothing_. The problem is that Read is
transformed to SDF, so this test tests the SDF, not the
Read transform. As a result, the Read transform does not
work.
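(For context on the LengthPrefixCoder remark - a minimal sketch of
why the prefix matters at the runner/SDK boundary; StringUtf8Coder
stands in for an arbitrary SDK coder, and the class name is made up.)

import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

public class LengthPrefixSketch {
  public static void main(String[] args) throws Exception {
    LengthPrefixCoder<String> wrapped = LengthPrefixCoder.of(StringUtf8Coder.of());
    byte[] bytes = CoderUtils.encodeToByteArray(wrapped, "hello");
    // bytes = [var-int length][payload]: a runner that cannot decode the
    // inner coder can still find element boundaries from the prefix; a
    // missing prefix at such a boundary breaks exactly this.
    String back = CoderUtils.decodeFromByteArray(wrapped, bytes);
    System.out.println(back);
  }
}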
>>
>> I tried using
convertReadBasedSplittableDoFnsToPrimitiveReads so that
I could make the test fail and debug that, but I got into
>>
>> java.lang.IllegalArgumentException: PCollectionNodes
>> [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
>> PCollection=unique_name:
>> "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
>> coder_id: "IterableCoder"
>> is_bounded: BOUNDED
>> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
>> }] were consumed but never produced
>>
>>
>> which gave me the last knock-out. :)
>>
>> My current impression is that, starting from Beam
2.25.0, the portable FlinkRunner is not able to read from
Kafka. Could someone give me a hint about what is wrong
with using
convertReadBasedSplittableDoFnsToPrimitiveReads in the
test [3]?
>>
>> Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11991
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-11998
>>
>> [3] https://github.com/apache/beam/pull/15181