On 7/29/21 6:45 PM, Robert Bradshaw wrote:

On Thu, Jul 29, 2021 at 3:04 AM Jan Lukavský <[email protected]> wrote:
Hi,

I'd like to move the discussion of this topic further. Because it seems that 
fixing the portable SDF is a larger work, I think there are two options:
+1

  a) extend the definition of model coders to include SDK coders of the language that implement the 
model (that would mean that the definition of model coder is not "language agnostic 
coders", but "coders that a given SDK can instantiate"), or

  b) make the model coders extensible so that a runner can modify it - that 
would make it possible for each runner to have a slightly different definition 
of these model coders

I'm strongly in favor of a), but I can live with b) as well.
We should probably just rename "ModelCoders" to
"JavaCoders[Registrar]" and stick everything there. ModelCoders is not
understood or used by anything but Java. (That or we just discard the
whole ModelCoders thing and just let Coders define their own portable
representations, possibly with a registration system.)
Coders must be Serializable, so it seems to me, that all Java Coders are quite easily serialized and a registration is not exactly needed for that. Renaming ModelCoders to Java(Portable)Coders looks good to me.


Thanks in advance for any comments on this.

  Jan

On 7/25/21 8:59 PM, Jan Lukavský wrote:

I didn't want to say that Flink should not support SDF. I only do not see any 
benefits of it for a native streaming source - like Kafka - without the ability 
to use dynamic splitting. The potential benefits of composability and 
extensibility do not apply here. Yes, it would be good to have as low number of 
source transforms as possible. And another yes, there probably isn't anything 
that would fundamentally disable Flink to correctly support SDF. On the other 
hand, the current state is such we cannot use KafkaIO in Flink. I think we 
should fix this by the shortest possible path, because the technically correct 
solution is currently unknown (at least to me, if anyone can give pointers 
about how to fix the SDF, I'd be grateful).

I still think that enabling a runner to support Read natively, when 
appropriate, has value by itself. And it requires SDK Coders to be 'known' to 
the runner, at least that was the result of my tests.

On 7/25/21 8:31 PM, Chamikara Jayalath wrote:



On Sun, Jul 25, 2021 at 11:09 AM Jan Lukavský <[email protected]> wrote:
In general, language-neutral APIs and protocols are a key feature of portable 
Beam.

Yes, sure, that is well understood. But - language neutral APIs requires 
language neutral environment. That is why the portable Pipeline representation 
is built around protocol buffers and gRPC. That is truly language-neutral. Once 
we implement something around that - like in the case of ModelCoders.java - we 
use a specific language for that and the language-neutral part is already gone. 
The decision to include same-language-SDK coders into such language-specific 
object plays no role in the fact it already is language-specific.

Not all runners are implemented  using Java. For example, the portable 
DirectRunner (FnAPI runner) is implemented using Python and Dataflow is 
implemented using C++. Such runners will not be able to do this.

Yes, I'm aware of that and that is why I said "any Java native runner". It is 
true, that non-Java runners *must* (as long as we don't include Read into SDK harness) 
resort to expanding it to SDF. That is why use_deprecated_read is invalid setting for 
such runner and should be handled accordingly.

Similarly, I think there were previous discussions related to using SDF as the 
source framework for portable runners.

Don't get me wrong, I'm not trying to revoke this decision. On the other hand I 
still think that the decision to use SDF implementation of Read or not should 
be left to the runner.

I understand that there are some bugs related to SDF and portable Flink currently. How 
much work do you think is needed here ? Will it be better to focus our efforts on fixing 
remaining issues for SDF and portable runners instead of supporting 
"use_deprecated_read" for that path ?

I'm not sure. I don't know portability and the SDK harness well enough to be 
able to answer this. But we should really know why we do that. What exactly 
does SDF bring to the Flink runner (and let's leave Flink aside of this - what 
does it bring to runners that cannot make use of dynamic splitting, being it 
admittedly a very cool feature)? Yes, supporting Java Read makes it impossible 
to implement it in Python. But practically, I think that most of the Pipelines 
will use x-lang for that. It makes very much sense to offload IOs to a more 
performant environment.

A bit old, but please see the following for the benefits of SDF and the 
motivation for it.

https://beam.apache.org/blog/splittable-do-fn/
https://s.apache.org/splittable-do-fn

Thanks,
Cham

  Jan

On 7/25/21 6:54 PM, Chamikara Jayalath wrote:



On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <[email protected]> wrote:
I'll start from the end.

I don't think we should be breaking language agnostic API layers (for example, definition 
of model coders) just to support "use_deprecated_read".

"Breaking" and "fixing" can only be a matter of the definition of the object at hand. I 
don't think, that Coder can be totally language agnostic - yes, the mapping between serialized form and 
deserialized form can be _defined_ in a language agnostic way, but must be_implemented_ in a specific 
language. If we choose the implementing language, what makes us treat SDK-specific coders defined by the SDK 
of the same language as "unknown"? It is only our decision, that seems to have no practical 
benefits.

In general, language-neutral APIs and protocols are a key feature of portable 
Beam. See here: https://beam.apache.org/roadmap/portability/
(I did not look into all the old discussions and votes related to this but I'm 
sure they are there)

Moreover, including SDK-specific coders into supported coders of the SDK runner construction 
counterpart (that is, runner core-construction-java for Java SDK) is a necessary prerequisite for 
unifying "classical" and "portable" runners, because the runner needs to 
understand *all* SDK coders so that it can _inline_ the complete Pipeline (if the Pipeline SDK has 
the same language as the runner), instead of running it through SDK harness. This need therefore is 
not specific to supporting use_deprecated_read, but is a generic requirement, which only has the 
first manifestation in the support of a transform not supported by SDK harness.

I think "use_deprecated_read" should be considered a stop-gap measure for Flink (and 
Spark ?) till we have proper support for SDF. In fact I don't think an arbitrary portable runner 
can support "use_deprecated_read" due to the following.

There seems to be nothing special about Flink regarding the support of 
primitive Read. I think any Java native runner can implement it pretty much the 
same way as Flink does. The question is if any other runner might want to do 
that. The problem with Flink is that

Not all runners are implemented  using Java. For example, the portable 
DirectRunner (FnAPI runner) is implemented using Python and Dataflow is 
implemented using C++. Such runners will not be able to do this.
  1) portable SDF seems not to work [1]

  2) even classical Flink runner has still issues with SDF - there are reports 
of watermark being stuck when reading data via SDF, this gets resolved using 
use_deprecated_read

  3) Flink actually does not have any benefits from SDF, because it cannot make 
use of the dynamic splitting, so this actually brings only implementation 
burden without any practical benefit
Similarly, I think there were previous discussions related to using SDF as the 
source framework for portable runners.
I understand that there are some bugs related to SDF and portable Flink currently. How much work do 
you think is needed here ? Will it be better to focus our efforts on fixing remaining issues for 
SDF and portable runners instead of supporting "use_deprecated_read" for that path ? Note 
that I'm fine with fixing any issues related to "use_deprecated_read" for classic 
(non-portable) Flink but I think you are trying to use x-lang hence probably need portable Flink.

Thanks,
Cham

I think that we should reiterate on the decision of deprecating Read - if we 
can implement it via SDF, what is the reason to forbid a runner to make use of 
a simpler implementation? The expansion of Read might be runner dependent, that 
is something we do all the time, or am I missing something?

  Jan

[1] https://issues.apache.org/jira/browse/BEAM-10940

On 7/25/21 1:38 AM, Chamikara Jayalath wrote:

I think we might be going down a bit of a rabbit hole with the support for 
"use_deprecated_read" for portable Flink :)

I think "use_deprecated_read" should be considered a stop-gap measure for Flink (and 
Spark ?) till we have proper support for SDF. In fact I don't think an arbitrary portable runner 
can support "use_deprecated_read" due to the following.

(1) SDK Harness is not aware of BoundedSource/UnboundedSource. Only source 
framework SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of the Fn API
(3) A non-Java Beam portable runner will probably not be able to directly 
invoke legacy Read transforms similar to the way Flink does today.

I don't think we should be breaking language agnostic API layers (for example, definition 
of model coders) just to support "use_deprecated_read".

Thanks,
Cham

On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <[email protected]> wrote:
On 7/24/21 12:34 AM, Robert Bradshaw wrote:

   On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <[email protected]> wrote:
Hi,

this was a ride. But I managed to get that working. I'd like to discuss two 
points, though:

   a) I had to push Java coders to ModelCoders for Java (which makes sense to 
me, but is that correct?). See [1]. It is needed so that the Read transform 
(executed directly in TaskManager) can correctly communicate with Java SDK 
harness using custom coders (which is tested here [2]).
I think the intent was that ModelCoders represent the set of
language-agnostic in the model, though I have to admit I've always
been a bit fuzzy on when a coder must or must not be in that list.
I think that this definition works as long, as runner does not itself
interfere with the Pipeline. Once the runner starts (by itself, not via
SdkHarnessClient) producing data, it starts to be part of the
environment, and therefore it should understand its own Coders. I'd
propose the definition of "model coders" to be Coders that the SDK is
able to understand, which then works naturally for the ModelCoders
located in "core-construction-java", that it should understand Javs SDK
Coders.
   b) I'd strongly prefer if we moved the handling of use_deprecated_read from 
outside of the Read PTransform directly into expand method, see [3]. Though 
this is not needed for the Read on Flink to work, it seems cleaner.

WDYT?
The default value of use_deprecated_read should depend on the runner
(e.g. some runners don't work well with it, others require it). As
such should not be visible to the PTransform's expand.
I think we should know what is the expected outcome. If a runner does
not support primitive Read (and therefore use_deprecated_read), what
should we do, if we have such experiment set? Should the Pipeline fail,
or should it be silently ignored? I think that we should fail, because
user expects something that cannot be fulfilled. Therefore, we have two
options - handling the experiment explicitly in runners that do not
support it, or handle it explicitly in all cases (both supported and
unsupported). The latter case is when we force runners to call explicit
conversion method (convertPrimitiveRead....). Every runner that does not
support primitive Read must handle the experiment either way, because
otherwise the experiment would be simply silently ignored, which is not
exactly user-friendly.
   Jan

[1] 
https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375

[2] 
https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201

[3] 
https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb

On 7/18/21 6:29 PM, Jan Lukavský wrote:

Hi,

I was debugging the issue and it relates to pipeline fusion - it seems that the 
primitive Read transform gets fused and then is 'missing' as source. I'm a 
little lost in the code, but the most strange parts are that:

   a) I tried to reject fusion of primitive Read by adding 
GreedyPCollectionFusers::cannotFuse for 
PTransformTranslation.READ_TRANSFORM_URN to 
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't change the 
exception

   b) I tried adding Reshuffle.viaRandomKey between Read and PAssert, but that 
didn't change it either

   c) when I run portable Pipeline with use_deprecated_read on Flink it 
actually runs (though it fails when it actually reads any data, but if the 
input is empty, the job runs), so it does not hit the same issue, which is a 
mystery to me

If anyone has any pointers that I can investigate, I'd be really grateful.

Thanks in advance,

   Jan



On 7/16/21 2:00 PM, Jan Lukavský wrote:

Hi,

I hit another issue with the portable Flink runner. Long story short - reading 
from Kafka is not working in portable Flink. After solving issues with 
expansion service configuration (ability to add use_deprecated_read) option, 
because flink portable runner has issues with SDF [1], [2]. After being able to 
inject the use_deprecated_read into expansion service I was able to get an 
execution DAG that has the UnboundedSource, but then more and more issues 
appeared (probably related to missing LengthPrefixCoder somewhere - maybe at 
the output from the primitive Read). I wanted to create a test for it and I 
found out, that there actually is ReadSourcePortableTest in FlinkRunner, but 
_it tests nothing_. The problem is that Read is transformed to SDF, so this 
test tests the SDF, not the Read transform. As a result, the Read transform 
does not work.

I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that I could 
make the test fail and debug that, but I got into

java.lang.IllegalArgumentException: PCollectionNodes 
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
 PCollection=unique_name: 
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced


which gave me the last knock-out. :)

My current impression is that starting from Beam 2.25.0, portable FlinkRunner 
is not able to read from Kafka. Could someone give me a hint about what is 
wrong with using convertReadBasedSplittableDoFnsToPrimitiveReads in the test 
[3]?

   Jan

[1] https://issues.apache.org/jira/browse/BEAM-11991

[2] https://issues.apache.org/jira/browse/BEAM-11998

[3] https://github.com/apache/beam/pull/15181

Reply via email to