On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz> wrote:
I'll start from the end.
I don't think we should be breaking language agnostic API
layers (for example, definition of model coders) just to
support "use_deprecated_read".
"Breaking" and "fixing" can only be a matter of the
definition of the object at hand. I don't think that a
Coder can be totally language-agnostic - yes, the mapping
between the serialized form and the deserialized form can
be _defined_ in a language-agnostic way, but it must be
_implemented_ in a specific language. If we choose the
implementing language, what makes us treat SDK-specific
coders defined by the SDK of the same language as
"unknown"? It is only our decision, one that seems to have
no practical benefit.
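To make the distinction concrete, here is a minimal
illustrative sketch - the encoded form (8 bytes,
big-endian) is defined in a language-agnostic way, and is
essentially what the Java SDK's own BigEndianLongCoder
implements, but the implementation below can only ever be
Java; every other SDK has to ship its own:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.beam.sdk.coders.AtomicCoder;

    // The encoded form (8 bytes, big-endian) is language-agnostic;
    // this implementation of it exists only in the Java SDK.
    public class IllustrativeLongCoder extends AtomicCoder<Long> {
      @Override
      public void encode(Long value, OutputStream out) throws IOException {
        new DataOutputStream(out).writeLong(value);
      }

      @Override
      public Long decode(InputStream in) throws IOException {
        return new DataInputStream(in).readLong();
      }
    }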
In general, language-neutral APIs and protocols are a key
feature of portable Beam. See here:
https://beam.apache.org/roadmap/portability/
(I did not look into all the old discussions and votes related
to this but I'm sure they are there)
Moreover, including SDK-specific coders in the set of
coders supported by the SDK's runner construction
counterpart (that is, runner core-construction-java for
the Java SDK) is a necessary prerequisite for unifying
"classical" and "portable" runners, because the runner
needs to understand *all* SDK coders so that it can
_inline_ the complete Pipeline (if the Pipeline SDK uses
the same language as the runner) instead of running it
through the SDK harness. This need is therefore not
specific to supporting use_deprecated_read; it is a
generic requirement, which merely has its first
manifestation in the support of a transform not supported
by the SDK harness.
I think "use_deprecated_read" should be considered a
stop-gap measure for Flink (and Spark ?) till we have
proper support for SDF. In fact I don't think an arbitrary
portable runner can support "use_deprecated_read" due to
the following.
There seems to be nothing special about Flink regarding
the support of primitive Read. I think any Java native
runner can implement it pretty much the same way Flink
does. The question is whether any other runner would want
to do that.
Not all runners are implemented using Java. For example, the
portable DirectRunner (FnAPI runner) is implemented using Python
and Dataflow is implemented using C++. Such runners will not be
able to do this.
The problem with Flink is that:

1) portable SDF seems not to work [1]

2) even the classical Flink runner still has issues with
SDF - there are reports of the watermark being stuck when
reading data via SDF, which gets resolved by using
use_deprecated_read

3) Flink actually gains nothing from SDF, because it
cannot make use of dynamic splitting, so SDF brings only
an implementation burden without any practical benefit
Similarly, I think there were previous discussions related
to using SDF as the source framework for portable runners.

I understand that there are currently some bugs related to
SDF and portable Flink. How much work do you think is
needed here? Would it be better to focus our efforts on
fixing the remaining issues for SDF and portable runners
instead of supporting "use_deprecated_read" for that path?
Note that I'm fine with fixing any issues related to
"use_deprecated_read" for classic (non-portable) Flink,
but I think you are trying to use x-lang, hence you
probably need portable Flink.
Thanks,
Cham
I think that we should revisit the decision to deprecate
Read - if we can implement it via SDF, what is the reason
to forbid a runner from making use of a simpler
implementation? The expansion of Read might be
runner-dependent; that is something we do all the time, or
am I missing something?
Jan
[1] https://issues.apache.org/jira/browse/BEAM-10940
On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
I think we might be going down a bit of a rabbit hole with
the support for "use_deprecated_read" for portable Flink :)
I think "use_deprecated_read" should be considered a
stop-gap measure for Flink (and Spark ?) till we have
proper support for SDF. In fact I don't think an arbitrary
portable runner can support "use_deprecated_read" due to
the following.
(1) SDK Harness is not aware of
BoundedSource/UnboundedSource. The only source framework
the SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of
the Fn API
(3) A non-Java Beam portable runner will probably not be
able to directly invoke legacy Read transforms similar to
the way Flink does today.
I don't think we should be breaking language agnostic API
layers (for example, definition of model coders) just to
support "use_deprecated_read".
Thanks,
Cham
On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:
On 7/24/21 12:34 AM, Robert Bradshaw wrote:
> On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi,
>>
>> this was a ride. But I managed to get that working.
>> I'd like to discuss two points, though:
>>
>> a) I had to push Java coders to ModelCoders for Java
>> (which makes sense to me, but is that correct?). See
>> [1]. It is needed so that the Read transform (executed
>> directly in the TaskManager) can correctly communicate
>> with the Java SDK harness using custom coders (which is
>> tested here [2]).
> I think the intent was that ModelCoders represent the
> set of language-agnostic coders in the model, though I
> have to admit I've always been a bit fuzzy on when a
> coder must or must not be in that list.
I think that this definition works as long as the runner
does not itself interfere with the Pipeline. Once the
runner starts producing data (by itself, not via
SdkHarnessClient), it becomes part of the environment, and
therefore it should understand its own Coders. I'd propose
the definition of "model coders" to be the Coders that the
SDK is able to understand, which then works naturally for
the ModelCoders located in "core-construction-java": it
should understand Java SDK Coders.
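
To illustrate what hangs on this definition, here is a
hedged sketch of the choice a runner-side translator makes
when it encounters a coder. The method and the id scheme
are hypothetical, not the actual core-construction-java
API; the proto types and the length-prefix URN are real:

    import java.util.Set;
    import org.apache.beam.model.pipeline.v1.RunnerApi;

    // Hypothetical sketch: a coder whose URN the runner knows can be
    // instantiated by the runner itself; everything else must stay
    // opaque.
    static String resolveCoder(
        String coderId,
        RunnerApi.Components.Builder components,
        Set<String> knownCoderUrns) {
      RunnerApi.Coder coder = components.getCodersOrThrow(coderId);
      if (knownCoderUrns.contains(coder.getSpec().getUrn())) {
        // Understanding the coder is what allows the runner to inline
        // transforms such as the primitive Read.
        return coderId;
      }
      // Unknown to the runner: wrap it in the model's length-prefix
      // coder so its elements can be shuttled to and from the SDK
      // harness without being understood.
      String wrappedId = coderId + "_length_prefixed"; // hypothetical id scheme
      components.putCoders(
          wrappedId,
          RunnerApi.Coder.newBuilder()
              .setSpec(RunnerApi.FunctionSpec.newBuilder()
                  .setUrn("beam:coder:length_prefix:v1"))
              .addComponentCoderIds(coderId)
              .build());
      return wrappedId;
    }

Whether Java SDK coders belong in that known set for a
Java-based runner is exactly the question above.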
>
>> b) I'd strongly prefer that we move the handling of
>> use_deprecated_read from outside of the Read PTransform
>> directly into its expand method, see [3]. Though this
>> is not needed for the Read on Flink to work, it seems
>> cleaner.
>>
>> WDYT?
> The default value of use_deprecated_read should depend
> on the runner (e.g. some runners don't work well with
> it, others require it). As such it should not be visible
> to the PTransform's expand.
I think we should know what the expected outcome is. If a
runner does not support primitive Read (and therefore
use_deprecated_read), what should we do if such an
experiment is set? Should the Pipeline fail, or should it
be silently ignored? I think that we should fail, because
the user expects something that cannot be fulfilled.
Therefore, we have two options - handling the experiment
explicitly in runners that do not support it, or handling
it explicitly in all cases (both supported and
unsupported). The latter case is when we force runners to
call an explicit conversion method
(convertPrimitiveRead....). Every runner that does not
support primitive Read must handle the experiment either
way, because otherwise the experiment would simply be
silently ignored, which is not exactly user-friendly.
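
For the fail-fast option, here is a minimal sketch of what
a runner that does not support primitive Read might do
during translation. ExperimentalOptions.hasExperiment is a
real Java SDK method; where exactly to hook the check is
runner-specific and assumed here:

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Sketch: reject an experiment we cannot fulfill up front,
    // instead of silently ignoring it.
    static void validateNoPrimitiveRead(PipelineOptions options) {
      if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")) {
        throw new IllegalArgumentException(
            "This runner does not support primitive Read; "
                + "remove the use_deprecated_read experiment.");
      }
    }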
>
>> Jan
>>
>> [1] https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
>>
>> [2] https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
>>
>> [3] https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
>>
>> On 7/18/21 6:29 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I was debugging the issue, and it relates to pipeline
>> fusion - it seems that the primitive Read transform
>> gets fused and is then 'missing' as a source. I'm a
>> little lost in the code, but the strangest parts are
>> these:
>>
>> a) I tried to reject fusion of the primitive Read by
>> adding GreedyPCollectionFusers::cannotFuse for
>> PTransformTranslation.READ_TRANSFORM_URN to
>> GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but
>> that didn't change the exception
>>
>> b) I tried adding Reshuffle.viaRandomKey between Read
>> and PAssert (sketched after this list), but that didn't
>> change it either
>>
>> c) when I run the portable Pipeline with
>> use_deprecated_read on Flink, it actually runs (though
>> it fails when it actually reads any data; if the input
>> is empty, the job runs), so it does not hit the same
>> issue, which is a mystery to me
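>>
>> For reference, attempt b) was essentially the following
>> (a sketch; 'p', 'source' and 'expected' stand in for
>> the test's actual pipeline, source and expected
>> values):
>>
>>   import org.apache.beam.sdk.io.Read;
>>   import org.apache.beam.sdk.testing.PAssert;
>>   import org.apache.beam.sdk.transforms.Reshuffle;
>>   import org.apache.beam.sdk.values.PCollection;
>>
>>   // Reshuffle.viaRandomKey should break fusion between the
>>   // primitive Read and the downstream PAssert machinery.
>>   PCollection<Long> values =
>>       p.apply(Read.from(source)).apply(Reshuffle.viaRandomKey());
>>   PAssert.that(values).containsInAnyOrder(expected);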
>>
>> If anyone has any pointers that I can investigate,
>> I'd be really grateful.
>>
>> Thanks in advance,
>>
>> Jan
>>
>>
>>
>> On 7/16/21 2:00 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I hit another issue with the portable Flink runner.
>> Long story short - reading from Kafka is not working in
>> portable Flink. I first had to solve issues with the
>> expansion service configuration (the ability to add the
>> use_deprecated_read option), because the portable Flink
>> runner has issues with SDF [1], [2]. After being able
>> to inject use_deprecated_read into the expansion
>> service, I was able to get an execution DAG that has
>> the UnboundedSource, but then more and more issues
>> appeared (probably related to a missing
>> LengthPrefixCoder somewhere - maybe at the output of
>> the primitive Read). I wanted to create a test for it
>> and found out that there actually is a
>> ReadSourcePortableTest in FlinkRunner, but _it tests
>> nothing_. The problem is that Read is transformed to
>> SDF, so this test exercises the SDF, not the Read
>> transform - and the Read transform itself does not
>> work.
>>
>> I tried using
>> convertReadBasedSplittableDoFnsToPrimitiveReads so that
>> I could make the test fail and debug that, but I got
>> into
>>
>> java.lang.IllegalArgumentException: PCollectionNodes
>> [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
>> PCollection=unique_name:
>> "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
>> coder_id: "IterableCoder"
>> is_bounded: BOUNDED
>> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
>> }] were consumed but never produced
>>
>>
>> which gave me the last knock-out. :)
>>
>> My current impression is that starting from Beam
>> 2.25.0, the portable FlinkRunner is not able to read
>> from Kafka. Could someone give me a hint about what is
>> wrong with using
>> convertReadBasedSplittableDoFnsToPrimitiveReads in the
>> test [3]?
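>>
>> Concretely, the change in the test was essentially this
>> (a sketch; if I remember correctly the method is a
>> static on SplittableParDo in runners
>> core-construction-java, but treat that placement as an
>> assumption):
>>
>>   import org.apache.beam.runners.core.construction.SplittableParDo;
>>   import org.apache.beam.sdk.Pipeline;
>>
>>   Pipeline p = Pipeline.create(options);
>>   // ... build the pipeline with the source under test ...
>>   // Rewrite the SDF-based expansion back to primitive Reads, so
>>   // the test exercises the Read translation path instead of SDF.
>>   SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p);
>>   p.run();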
>>
>> Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11991
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-11998
>>
>> [3] https://github.com/apache/beam/pull/15181