On Mon, Jul 29, 2019 at 4:14 PM jincheng sun <sunjincheng...@gmail.com>
wrote:

> Hi Robert,
>
> Thanks for your detail comments, I would have added a few pointers inline.
>
> Best,
> Jincheng
>
> Robert Bradshaw <rober...@google.com> 于2019年7月29日周一 下午12:35写道:
>
>> On Sun, Jul 28, 2019 at 6:51 AM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>> >
>> > Hi, Thomas & Robert, Thanks for your comments and providing relevant
>> discussions and JIRA links, very helpful to me!
>> >
>> > I am glad to see your affirmative response,  And I am glad to add my
>> thoughts on the comment you left:
>> > -----------------
>> >
>> > >> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the runner's
>> API. For example, BagState, ValueState, ReducingState, CombiningState,  can
>> all be implemented on top of a runner-offered MapState in the SDK. On the
>> one hand, there's a desire to keep the number of "primitive" states types
>> to a minimum (to ease the use of authoring runners), but if a runner can
>> perform a specific optimization due to knowing about the particular state
>> type it might be preferable to pass it through rather than emulate it in
>> the SDK.
>> > -------------------
>> > Agree. Regarding MapState, it's definitely needed as it cannot be
>> implemented on top of the existing BagState.
>> > Regarding ValueState, it can be implemented on top of BagState.
>> However, we can do optimization if we know a state is ValueState.
>> > For example, if a key is updated with a new value, if the ValueState is
>> implemented on top of BagState, two RPC calls are needed
>> > to write the new value back to runner: clear + append; if we know it's
>> ValueState, just one RPC call is enough: set.
>> > We can discuss case by case whether a state type is needed.
>>
>> In the Beam APIs [1] multiple state requests are consumed as a stream
>> in a single RPC, so clear followed by append still has low overhead.
>> Is that optimization not sufficient?
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573
>>
>>
> Actually there are two kinds of overhead:
> 1) the RPC overhead(I think in this point  may be sufficient for RPC)
> 2) the state read/write overhead, i.e., If there is no optimization, the
> runner needs to clear the state firstly and then set a new value for the
> state.
>

It's certainly an option to keep open. I'd avoid prematurely optimizing
until we have evidence that it matters.


>
> > -------------------
>> >
>> > >> Note that in the protos, the GRPC ports have a coder attribute
>> > specifically to allow this kind of customization (and the SDKs should
>> > be respecting that). We've also talked about going beyond per-element
>> > encodings (e.g. using arrow to serialize entire batches across the
>> > whire). I think all the runner code simply uses the default and we
>> > could be more intelligent there.
>> > -------------------
>> >
>> > Yes, the gRPC allows to use customization coder. However, I'm afraid
>> that this is not enough as we want to use
>> > Beam's portability framework by depending on the modules used
>> (beam-runners-java-fn-execution and the Python SDK Harness) instead
>> > of copying that part of code to Flink. So it should also allow to use
>> the customization coder in beam-runners-java-fn-execution.
>> > Otherwise, we have to copy a lot of code to Flink to use the
>> customization coder.
>>
>> Agreed, beam-runners-java-fn-execution does not take advantage of the
>> full flexibility of the protocol, and would make a lot of sense to
>> enhance it to be able to.
>>
>> > -------------------
>> >
>> > >> I'm wary of having too many buffer size configuration options (is
>> > there a compelling reason to make it bigger or smaller?) but something
>> > timebased would be very useful.
>> > -------------------
>> >
>> > I think the default values of buffer size are not needed to change for
>> most cases. I'm not sure for just one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
>> > Will 1MB makes more sense?
>>
>> IIRC, 10MB was the point at which, according to benchmarks Luke did
>> quite a while ago, there was clearly no performance benefit in making
>> it larger. Coupled with a time-based threshold, I don't see much of an
>> advantage to lowering it.
>
>
> My concern is that the SDK harness may be shared by a lot of runners and
> there will be at least one write buffer for each runner. Is it possible
> that there are too many write buffers used which take up a lot of memory
> and users want to lower it? Nevertheless, I think this problem is not
> critical considering that we all agree a time-based threshold should be
> supported. :)
>

Yeah, we can cross that bridge when (if) we come to it.


>
>
>> > -------------------
>> >
>> > >> The idea of StandardCoders is it's a set of coders that all runners
>> > and SDKs can be assumed to understand. If you have an element encoded
>> > with something other Coder, then there's no way to know if the other
>> > side will be able to decode it (or, indeed, even properly detect
>> > element boundaries in the stream of contiguous encoded elements).
>> > Adding a length prefixed coder wrapping allows the other side to at
>> > least pull it out and pass it around as encoded bytes. In other words,
>> > whether an encoded element needs length prefixing is a function of the
>> > other process you're trying to communicate with (and we don't have the
>> > mechanisms, and I'm not sure it's worth the complexity, to do some
>> > kind of coder negotiation between the processes here.) Of course for a
>> > UDF, if the other side does not know about the element type in
>> > question it'd be difficult (in general) to meaningfully process the
>> > element.
>> >
>> > The work on schemas, and making those portable, will result in a much
>> > richer set of element types that can be passed through "standard"
>> > coders.
>> > -------------------
>> >
>> > The design makes sense to me. My concern is that if a coder is not
>> among the StandardCoders, it will be prefixed with a length even if the
>> harness knows how to decode it.
>>
>> If the harness knows how to decode it, the length prefixing is just a
>> lost optimization opportunity, but it'll still work. Whether this is a
>> big enough loss to merit introducing an extra protocol to negotiate on
>> the set of commonly known coders beyond standard coders is still TBD,
>> but probably not for v1 (and possibly not ever, especially as schemas
>> become more expressive).
>>
>
> Is it possible to add an interface such as `isSelfContained()` to the
> `Coder`? This interface indicates
> whether the serialized bytes are self contained. If it returns true, then
> there is no need to add a prefixing length.
> In this way, there is no need to introduce an extra protocol,  Please
> correct me if I missed something :)
>

The question is how it is self contained. E.g. DoubleCoder is self
contained because it always uses exactly 8 bytes, but one needs to know the
double coder to leverage this. VarInt coder is self-contained a different
way, as is StringCoder (which does just do prefixing).


> Besides, it's also should be noted that the prefixing length IS NOT per
> record basis, it may be added for each field in the record. If there are a
> lot of fields in the record and the coder for each record is not among the
> StandardCoder, the extra prefixing length will take a lot of bandwidth.
>
> What do you think?
>

This is true. Worse case it will double it (though generally it'll be a lot
better as most fields encode into many bytes, and the bigger the encoding
the smaller the overhead).


>
> > Besides, I'm also curious about the standard whether a coder can be put
>> into StandardCoders.
>> > For example, I noticed that FLOAT is not among StandardCoders, while
>> DOUBLE is among it.
>>
>> StandardCoders is supposed to be some sort of lowest common
>> denominator, but theres no hard and fast criteria. For this example,
>> some languages (e.g. Python) don't have the notion of FLOAT, and using
>> a FLOAT coder for Python floats (whose underling representation is
>> double) gets tricky as this coder is not faithful. We also don't have
>> specific int coders for smaller-than-64-bit types which, like float,
>> are easily promoted.
>>
>
> Sounds reasonable. However, it should be noted that this will mean that
> the number of coders which can be put into the StandardCoder will be very
> little. This means that for most fields, a prefixing length will be added.
>

I am hopeful that schemas give us a rich enough way to encode the vast
majority of types that we will want to transmit across language barriers
(possibly with some widening promotions). For high performance one will
want to use formats like arrow rather than one-off coders as well, which
also biases us towards the schema work. The set of StandardCoders is not
closed, and nor is the possibility of figuring out a way to communicate
outside this set for a particular pair of languages, but I think it makes
sense to avoid going that direction unless we have to due to the increased
API surface aread and complexity it imposes on all runners and SDKs.


>
>>
>> > Best, Jincheng
>> >
>> > Robert Bradshaw <rober...@google.com> 于2019年7月25日周四 下午2:00写道:
>> >>
>> >> On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise <t...@apache.org> wrote:
>> >> >
>> >> > Hi Jincheng,
>> >> >
>> >> > It is very exciting to see this follow-up, that you have done your
>> research on the current state and that there is the intention to join
>> forces on the portability effort!
>> >> >
>> >> > I have added a few pointers inline.
>> >> >
>> >> > Several of the issues you identified affect our usage of Beam as
>> well. These present an opportunity for collaboration.
>> >>
>> >> +1, a lot of this aligns with improvements we'd like to make as well.
>> >>
>> >> > On Wed, Jul 24, 2019 at 2:53 AM jincheng sun <
>> sunjincheng...@gmail.com> wrote:
>> >> >>
>> >> >> Hi all,
>> >> >>
>> >> >> Thanks Max and all of your kind words. :)
>> >> >>
>> >> >> Sorry for the late reply as I'm busy working on the Flink 1.9
>> release. For the next major release of Flink, we plan to add Python user
>> defined functions(UDF, UDTF, UDAF) support in Flink and I have go over the
>> Beam portability framework and think that it is perfect for our
>> requirements. However we also find some improvements needed for Beam:
>> >> >>
>> >> >> Must Have:
>> >> >> ----------------
>> >> >> 1) Currently only BagState is supported in gRPC protocol and I
>> think we should support more kinds of state types, such as MapState,
>> ValueState, ReducingState, CombiningState(AggregatingState in Flink), etc.
>> That's because these kinds of state will be used in both user-defined
>> function or Flink Python DataStream API.
>> >> >
>> >> > There has been discussion about the need for different state types
>> and to efficiently support those on the runner side there may be a need to
>> look at the over the wire representation also.
>> >> >
>> >> >
>> https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
>> >> >
>> https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E
>> >>
>> >> There are two distinct levels at which one can talk about a certain
>> >> type of state being supported: the user-visible SDK's API and the
>> >> runner's API. For example, BagState, ValueState, ReducingState,
>> >> CombiningState,  can all be implemented on top of a runner-offered
>> >> MapState in the SDK. On the one hand, there's a desire to keep the
>> >> number of "primitive" states types to a minimum (to ease the use of
>> >> authoring runners), but if a runner can perform a specific
>> >> optimization due to knowing about the particular state type it might
>> >> be preferable to pass it through rather than emulate it in the SDK.
>> >>
>> >> >> 2) There are warnings that Python 3 is not fully supported in Beam
>> (beam/sdks/python/setup.py). We should support Python 3.x for the beam
>> portability framework due to Python 2 will be not supported officially.
>> >> >
>> >> > This must be obsolete per latest comments on:
>> https://issues.apache.org/jira/browse/BEAM-1251
>> >> >
>> >> >> 3) The configuration "semi_persist_dir" is not set in
>> EnvironmentFactory at the runner side. Why I think it's  must to have is
>> because when the environment type is "PROCESS", the default value "/tmp"
>> may become a big problem.
>> >>
>> >> There are still some issues to be worked out around exactly how
>> >> environments are set up (most notably around dependencies that are
>> >> external to the docker images, but also things like this).
>> >>
>> >> >> 4) The buffer size configure policy should be improved, such as:
>> >> >>    At runner side, the buffer limit in
>> BeamFnDataBufferingOutboundObserver is size based. We should also support
>> time based especially for the streaming case.
>> >> >>    At Python SDK Harness, the buffer size is not configurable in
>> GrpcDataService. The input queue size of the input buffer in Python SDK
>> Harness is not size limited.
>> >> >>   The flush threshold of the output buffer in Python SDK Harness is
>> 10 MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make
>> the threshold configurable and support time based threshold.
>> >>
>> >> I'm wary of having too many buffer size configuration options (is
>> >> there a compelling reason to make it bigger or smaller?) but something
>> >> timebased would be very useful.
>> >>
>> >> >> Nice To Have:
>> >> >> -------------------
>> >> >> 1) Improves the interfaces of FnDataService, BundleProcessor,
>> ActiveBundle, etc, to change the parameter type from WindowedValue<T> to T.
>> (We have already discussed in the previous mails)
>> >> >>
>> >> >> 2) Refactor the code to avoid unnecessary dependencies pull in. For
>> example, beam-sdks-java-core(11MB) is a package for Java SDK users and it
>> is pull in because there are a few classes in beam-sdks-java-core are used
>> in beam-runners-java-fn-execution, such as:
>> >> >> PipelineOptions used in DefaultJobBundleFactory FileSystems used in
>> BeamFileSystemArtifactRetrievalService.
>> >> >> It means maybe we can add a new module such as
>> beam-sdks-java-common to hold the classes used by both runner and SDK.
>> >> >>
>> >> >> 3) State cache is not shared between bundles which is performance
>> critical for streaming jobs.
>> >> >
>> >> > This is rather important to address:
>> >> >
>> >> >
>> https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E
>> >> >
>> >> >>
>> >> >>
>> >> >> 4) The coder of WindowedValue cannot be configured and most of time
>> we don't need to serialize and deserialize the timestamp, window and pane
>> properties in Flink. But currently FullWindowedValueCoder is used by
>> default in WireCoders.addWireCoder, I suggest to make the coder
>> configurable (i.e. allowing to use ValueOnlyWindowedValueCoder)
>> >>
>> >> For sure.
>> >>
>> >> Note that in the protos, the GRPC ports have a coder attribute
>> >> specifically to allow this kind of customization (and the SDKs should
>> >> be respecting that). We've also talked about going beyond per-element
>> >> encodings (e.g. using arrow to serialize entire batches across the
>> >> whire). I think all the runner code simply uses the default and we
>> >> could be more intelligent there.
>> >>
>> >> >> 5) Currently if a coder is not defined in StandardCoders, it will
>> be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
>> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
>> coders are defined in StandardCoders. It means that for most coder, a
>> length will be added to the serialized bytes which is not necessary in my
>> thoughts. My suggestion is maybe we can add some interfaces or tags for the
>> coder which indicate whether the coder is needed a length prefix or not.
>> >>
>> >> The idea of StandardCoders is it's a set of coders that all runners
>> >> and SDKs can be assumed to understand. If you have an element encoded
>> >> with something other Coder, then there's no way to know if the other
>> >> side will be able to decode it (or, indeed, even properly detect
>> >> element boundaries in the stream of contiguous encoded elements).
>> >> Adding a length prefixed coder wrapping allows the other side to at
>> >> least pull it out and pass it around as encoded bytes. In other words,
>> >> whether an encoded element needs length prefixing is a function of the
>> >> other process you're trying to communicate with (and we don't have the
>> >> mechanisms, and I'm not sure it's worth the complexity, to do some
>> >> kind of coder negotiation between the processes here.) Of course for a
>> >> UDF, if the other side does not know about the element type in
>> >> question it'd be difficult (in general) to meaningfully process the
>> >> element.
>> >>
>> >> The work on schemas, and making those portable, will result in a much
>> >> richer set of element types that can be passed through "standard"
>> >> coders.
>> >>
>> >> (Hopefully this answers your question below as well.)
>> >>
>> >> >> 6) Set log level according to PipelineOption in Python SDK Harness.
>> Currently the log level is set to INFO by default.
>> >> >
>> >> > https://issues.apache.org/jira/browse/BEAM-5468
>> >>
>> >> Yeah, someone needs to just go and do this.
>> >>
>> >> >> 7) Allows to start up StatusServer according to PipelineOption in
>> Python SDK Harness. Currently the StatusServer is start up by default.
>> >> >>
>> >> >> Although I put 3) 4) 5) into the "Nice to Have" as they are
>> performance related, I still think they are very critical for Python UDF
>> execution performance.
>> >> >>
>> >> >> Open questions:
>> >> >> ---------------------
>> >> >> 1) Which coders should be / can be defined in StandardCoders?
>> >> >>
>> >> >> Currently we are preparing the design of how to support Python UDF
>> in Flink based on the Beam portability framework and we will bring up the
>> discussion in Flink community. We may propose more changes for Beam during
>> that time and may need more support from Beam community.
>> >> >>
>> >> >> To be honest, I'm not an expert of Beam and so please feel free to
>> correct me if my understanding is wrong. Welcome any feedback.
>> >> >>
>> >> >> Best,
>> >> >> Jincheng
>> >> >>
>> >> >> Maximilian Michels <m...@apache.org> 于2019年4月25日周四 上午12:14写道:
>> >> >>>
>> >> >>> Fully agree that this is an effort that goes beyond changing a type
>> >> >>> parameter but I think we have a chance here to cooperate between
>> the two
>> >> >>> projects. I would be happy to help out where I can.
>> >> >>>
>> >> >>> I'm not sure at this point what exactly is feasible for reuse but I
>> >> >>> would imagine the Runner-related code to be useful as well for the
>> >> >>> interaction with the SDK Harness. There are some fundamental
>> differences
>> >> >>> in the model, e.g. how windowing works, which might be challenging
>> to
>> >> >>> work around.
>> >> >>>
>> >> >>> Thanks,
>> >> >>> Max
>> >> >>>
>> >> >>> On 24.04.19 12:03, jincheng sun wrote:
>> >> >>> >
>> >> >>> > Hi Kenn, I think you are right, the Python SDK harness can be
>> shared to
>> >> >>> > Flink, and also need to add some new primitive operations.
>> Regarding
>> >> >>> > runner-side, I think most of the code which in runners/java-fun-
>> >> >>> > Execution is can be shared(but need some improvement, such as
>> >> >>> > FnDataService), some of them cannot be shared, such as job
>> submission
>> >> >>> > code. So, we may need to set up a document to clearly analyze
>> which ones
>> >> >>> > can be shared, which ones can be shared but need to do some
>> changes, and
>> >> >>> > which ones are definitely cannot be shared.
>> >> >>> >
>> >> >>> > Hi Max, Thanks for sharing your opinion, I also prefer to using
>> beam Fn
>> >> >>> > service as a library, also willing to do more efforts for this.
>> >> >>> >  From the view of the current code, abstracting Fn Service into
>> a class
>> >> >>> > library that other projects can rely on requires a lot of effort
>> from
>> >> >>> > the Beam community. Turn `WindowedValue<T>` into `T` is just the
>> >> >>> > beginning of this effort. If the Beam community is willing on
>> >> >>> > abstracting Fn Service into a class library that can be relied
>> upon by
>> >> >>> > other projects, I can try to draft a document, of course during
>> this
>> >> >>> > period may need a lot of help from you, Kenn, Lukasz, and the
>> Beam
>> >> >>> > community. (I am a recruit in the Beam community :-))
>> >> >>> >
>> >> >>> > What do you think?
>> >> >>> >
>> >> >>> > Regards,
>> >> >>> > Jincheng
>> >> >>> >
>> >> >>> > Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>>
>> 于2019年4月24
>> >> >>> > 日周三 上午3:32写道:
>> >> >>> >
>> >> >>> >     It seems to me that the most valuable code to share and keep
>> up with
>> >> >>> >     is the Python/Go/etc SDK harness; they would need to be
>> enhanced
>> >> >>> >     with new primitive operations. So you would want to depend
>> directly
>> >> >>> >     and share the original proto-generated classes too, which
>> Beam
>> >> >>> >     publishes as separate artifacts for Java. Is the runner-side
>> support
>> >> >>> >     code that valuable for direct integration into Flink? I
>> would expect
>> >> >>> >     once you get past trivial wrappers (that you can copy/paste
>> with no
>> >> >>> >     loss) you would hit differences in architecture so you would
>> diverge
>> >> >>> >     anyhow.
>> >> >>> >
>> >> >>> >     Kenn
>> >> >>> >
>> >> >>> >     On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <
>> m...@apache.org
>> >> >>> >     <mailto:m...@apache.org>> wrote:
>> >> >>> >
>> >> >>> >         Hi Jincheng,
>> >> >>> >
>> >> >>> >         Copying code is a solution for the short term. In the
>> long run
>> >> >>> >         I'd like
>> >> >>> >         the Fn services to be a library not only for the Beam
>> >> >>> >         portability layer
>> >> >>> >         but also for other projects which want to leverage it.
>> We should
>> >> >>> >         thus
>> >> >>> >         make an effort to make it more generic/extensible where
>> >> >>> >         necessary and
>> >> >>> >         feasible.
>> >> >>> >
>> >> >>> >         Since you are investigating reuse of Beam portability in
>> the
>> >> >>> >         context of
>> >> >>> >         Flink, do you think it would make sense to setup a
>> document
>> >> >>> >         where we
>> >> >>> >         collect ideas and challenges?
>> >> >>> >
>> >> >>> >         Thanks,
>> >> >>> >         Max
>> >> >>> >
>> >> >>> >         On 23.04.19 13:00, jincheng sun wrote:
>> >> >>> >          > Hi Reuven,
>> >> >>> >          >
>> >> >>> >          > I think you have provided an optional solution for
>> other
>> >> >>> >         community which
>> >> >>> >          > wants to take advantage of Beam's existing
>> >> >>> >         achievements. Thank you very
>> >> >>> >          > much!
>> >> >>> >          >
>> >> >>> >          > I think the Flink community can choose to copy from
>> Beam's
>> >> >>> >         code or
>> >> >>> >          > choose to rely directly on the beam's class library.
>> The
>> >> >>> >         Flink community
>> >> >>> >          > also initiated a discussion, more info can be found
>> here
>> >> >>> >          >
>> >> >>> >         <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
>> >
>> >> >>> >          >
>> >> >>> >          > The purpose of Turns `WindowedValue<T>` into `T` is to
>> >> >>> >         promote the
>> >> >>> >          > interface design of Beam more versatile, so that
>> other open
>> >> >>> >         source
>> >> >>> >          > projects have the opportunity to take advantage of
>> Beam's
>> >> >>> >         existing
>> >> >>> >          > achievements. Of course, just changing the
>> `WindowedValue<T>`
>> >> >>> >         into `T`
>> >> >>> >          > is not enough to be shared by other projects in the
>> form of a
>> >> >>> >         class
>> >> >>> >          > library, we need to do more efforts. If Beam can
>> provide a
>> >> >>> >         class library
>> >> >>> >          > in the future, other community contributors will also
>> have the
>> >> >>> >          > willingness to contribute to the beam community. This
>> will
>> >> >>> >         benefit both
>> >> >>> >          > the community that wants to take advantage of Beam's
>> existing
>> >> >>> >          > achievements and the Beam community itself. And
>> thanks to
>> >> >>> >         Thomas for
>> >> >>> >          > that he has also made a lot of efforts in this regard.
>> >> >>> >          >
>> >> >>> >          > Thanks again for your valuable suggestion, and
>> welcome any
>> >> >>> >         feedback!
>> >> >>> >          >
>> >> >>> >          > Best,
>> >> >>> >          > Jincheng
>> >> >>> >          >
>> >> >>> >          > Reuven Lax <re...@google.com <mailto:re...@google.com
>> >
>> >> >>> >         <mailto:re...@google.com <mailto:re...@google.com>>>
>> 于2019年4月
>> >> >>> >         23日
>> >> >>> >          > 周二 上午1:00写道:
>> >> >>> >          >
>> >> >>> >          >     One concern here: these interfaces are intended
>> for use
>> >> >>> >         within the
>> >> >>> >          >     Beam project. Beam may decide to make specific
>> changes to
>> >> >>> >         them to
>> >> >>> >          >     support needed functionality in Beam. If they are
>> being
>> >> >>> >         reused by
>> >> >>> >          >     other projects, then those changes risk breaking
>> those other
>> >> >>> >          >     projects in unexpected ways. I don't think we can
>> >> >>> >         guarantee that we
>> >> >>> >          >     don't do that. If this is useful in Flink, it
>> would be
>> >> >>> >         safer to copy
>> >> >>> >          >     the code IMO rather than to directly depend on it.
>> >> >>> >          >
>> >> >>> >          >     On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
>> >> >>> >          >     <sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>
>> >> >>> >         <mailto:sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>>> wrote:
>> >> >>> >          >
>> >> >>> >          >         Hi Kenn,
>> >> >>> >          >
>> >> >>> >          >         Thanks for your reply, and explained the
>> design of
>> >> >>> >         WindowValue
>> >> >>> >          >         clearly!
>> >> >>> >          >
>> >> >>> >          >         At present, the definitions of
>> `FnDataService` and
>> >> >>> >          >         `BeamFnDataClient` in Data Plane are very
>> clear and
>> >> >>> >         universal,
>> >> >>> >          >         such as: send(...)/receive(...). If it is only
>> >> >>> >         applied in the
>> >> >>> >          >         project of Beam, it is already very good.
>> Because
>> >> >>> >         `WindowValue`
>> >> >>> >          >         is a very basic data structure in the Beam
>> project,
>> >> >>> >         both the
>> >> >>> >          >         Runner and the SDK harness have define the
>> >> >>> >         WindowedValue data
>> >> >>> >          >         structure.
>> >> >>> >          >
>> >> >>> >          >         The reason I want to change the interface
>> parameter from
>> >> >>> >          >         `WindowedValue<T>` to T is because I want to
>> make the
>> >> >>> >         `Data
>> >> >>> >          >         Plane` interface into a class library that
>> can be
>> >> >>> >         used by other
>> >> >>> >          >         projects (such as Apache Flink), so that other
>> >> >>> >         projects Can have
>> >> >>> >          >         its own `FnDataService` implementation.
>> However, the
>> >> >>> >         definition
>> >> >>> >          >         of `WindowedValue` does not apply to all
>> projects.
>> >> >>> >         For example,
>> >> >>> >          >         Apache Flink also has a definition similar to
>> >> >>> >         WindowedValue. For
>> >> >>> >          >         example, Apache Flink Stream has
>> StreamRecord. If we
>> >> >>> >         change
>> >> >>> >          >         `WindowedValue<T>` to T, then other project's
>> >> >>> >         implementation
>> >> >>> >          >         does not need to wrap WindowedValue, the
>> interface
>> >> >>> >         will become
>> >> >>> >          >         more concise.  Furthermore,  we only need one
>> T, such
>> >> >>> >         as the
>> >> >>> >          >         Apache Flink DataSet operator.
>> >> >>> >          >
>> >> >>> >          >         So, I agree with your understanding, I don't
>> expect
>> >> >>> >          >         `WindowedValueXXX<T>` in the FnDataService
>> interface,
>> >> >>> >         I hope to
>> >> >>> >          >         just use a `T`.
>> >> >>> >          >
>> >> >>> >          >         Have you seen some problem if we change the
>> interface
>> >> >>> >         parameter
>> >> >>> >          >         from `WindowedValue<T>` to T?
>> >> >>> >          >
>> >> >>> >          >         Thanks,
>> >> >>> >          >         Jincheng
>> >> >>> >          >
>> >> >>> >          >         Kenneth Knowles <k...@apache.org
>> >> >>> >         <mailto:k...@apache.org> <mailto:k...@apache.org
>> >> >>> >         <mailto:k...@apache.org>>> 于
>> >> >>> >          >         2019年4月20日周六 上午2:38写道:
>> >> >>> >          >
>> >> >>> >          >             WindowedValue has always been an
>> interface, not a
>> >> >>> >         concrete
>> >> >>> >          >             representation:
>> >> >>> >          >
>> >> >>> >
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
>> >> >>> >          >
>> >> >>> >           <
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
>> >.
>> >> >>> >          >             It is an abstract class because we
>> started in
>> >> >>> >         Java 7 where
>> >> >>> >          >             you could not have default methods, and
>> just due
>> >> >>> >         to legacy
>> >> >>> >          >             style concerns. it is not just discussed,
>> but
>> >> >>> >         implemented,
>> >> >>> >          >             that there are WindowedValue
>> implementations with
>> >> >>> >         fewer
>> >> >>> >          >             allocations.
>> >> >>> >          >             At the coder level, it was also always
>> intended
>> >> >>> >         to have
>> >> >>> >          >             multiple encodings. We already do have
>> separate
>> >> >>> >         encodings
>> >> >>> >          >             based on whether there is 1 window or
>> multiple
>> >> >>> >         windows. The
>> >> >>> >          >             coder for a particular kind of
>> WindowedValue
>> >> >>> >         should decide
>> >> >>> >          >             this. Before the Fn API none of this had
>> to be
>> >> >>> >         standardized,
>> >> >>> >          >             because the runner could just choose
>> whatever it
>> >> >>> >         wants. Now
>> >> >>> >          >             we have to standardize any encodings that
>> runners and
>> >> >>> >          >             harnesses both need to know. There should
>> be
>> >> >>> >         many, and
>> >> >>> >          >             adding more should be just a matter of
>> >> >>> >         standardization, no
>> >> >>> >          >             new design.
>> >> >>> >          >
>> >> >>> >          >             None of this should be user-facing or in
>> the
>> >> >>> >         runner API /
>> >> >>> >          >             pipeline graph - that is critical to
>> making it
>> >> >>> >         flexible on
>> >> >>> >          >             the backend between the runner & SDK
>> harness.
>> >> >>> >          >
>> >> >>> >          >             If I understand it, from our offline
>> discussion,
>> >> >>> >         you are
>> >> >>> >          >             interested in the case where you issue a
>> >> >>> >          >             ProcessBundleRequest to the SDK harness
>> and none
>> >> >>> >         of the
>> >> >>> >          >             primitives in the subgraph will ever
>> observe the
>> >> >>> >         metadata.
>> >> >>> >          >             So you want to not even have a tiny
>> >> >>> >          >             "WindowedValueWithNoMetadata". Is that
>> accurate?
>> >> >>> >          >
>> >> >>> >          >             Kenn
>> >> >>> >          >
>> >> >>> >          >             On Fri, Apr 19, 2019 at 10:17 AM jincheng
>> sun
>> >> >>> >          >             <sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>
>> >> >>> >         <mailto:sunjincheng...@gmail.com <mailto:
>> sunjincheng...@gmail.com>>>
>> >> >>> >          >             wrote:
>> >> >>> >          >
>> >> >>> >          >                 Thank you! And have a nice weekend!
>> >> >>> >          >
>> >> >>> >          >
>> >> >>> >          >                 Lukasz Cwik <lc...@google.com
>> >> >>> >         <mailto:lc...@google.com> <mailto:lc...@google.com
>> >> >>> >         <mailto:lc...@google.com>>>
>> >> >>> >          >                 于2019年4月20日周六 上午1:14写道:
>> >> >>> >          >
>> >> >>> >          >                     I have added you as a contributor.
>> >> >>> >          >
>> >> >>> >          >                     On Fri, Apr 19, 2019 at 9:56 AM
>> jincheng sun
>> >> >>> >          >                     <sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>
>> >> >>> >          >                     <mailto:sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>>> wrote:
>> >> >>> >          >
>> >> >>> >          >                         Hi Lukasz,
>> >> >>> >          >
>> >> >>> >          >                         Thanks for your affirmation
>> and
>> >> >>> >         provide more
>> >> >>> >          >                         contextual information. :)
>> >> >>> >          >
>> >> >>> >          >                         Would you please give me the
>> contributor
>> >> >>> >          >                         permission?  My JIRA ID is
>> >> >>> >         sunjincheng121.
>> >> >>> >          >
>> >> >>> >          >                         I would like to create/assign
>> tickets
>> >> >>> >         for this work.
>> >> >>> >          >
>> >> >>> >          >                         Thanks,
>> >> >>> >          >                         Jincheng
>> >> >>> >          >
>> >> >>> >          >                         Lukasz Cwik <lc...@google.com
>> >> >>> >         <mailto:lc...@google.com>
>> >> >>> >          >                         <mailto:lc...@google.com
>> >> >>> >         <mailto:lc...@google.com>>> 于2019年4月20日周六
>> >> >>> >          >                         上午12:26写道:
>> >> >>> >          >
>> >> >>> >          >                             Since I don't think this
>> is a
>> >> >>> >         contentious
>> >> >>> >          >                             change.
>> >> >>> >          >
>> >> >>> >          >                             On Fri, Apr 19, 2019 at
>> 9:25 AM
>> >> >>> >         Lukasz Cwik
>> >> >>> >          >                             <lc...@google.com
>> >> >>> >         <mailto:lc...@google.com> <mailto:lc...@google.com
>> >> >>> >         <mailto:lc...@google.com>>>
>> >> >>> >          >                             wrote:
>> >> >>> >          >
>> >> >>> >          >                                 Yes, using T makes
>> sense.
>> >> >>> >          >
>> >> >>> >          >                                 The WindowedValue was
>> meant
>> >> >>> >         to be a
>> >> >>> >          >                                 context object in the
>> SDK
>> >> >>> >         harness that
>> >> >>> >          >                                 propagates various
>> >> >>> >         information about the
>> >> >>> >          >                                 current element. We
>> have
>> >> >>> >         discussed in
>> >> >>> >          >                                 the past about:
>> >> >>> >          >                                 * making
>> optimizations which
>> >> >>> >         would pass
>> >> >>> >          >                                 around less of the
>> context
>> >> >>> >         information
>> >> >>> >          >                                 if we know that the
>> DoFns
>> >> >>> >         don't need it
>> >> >>> >          >                                 (for example, all the
>> values
>> >> >>> >         share the
>> >> >>> >          >                                 same window).
>> >> >>> >          >                                 * versioning the
>> encoding
>> >> >>> >         separately
>> >> >>> >          >                                 from the WindowedValue
>> >> >>> >         context object
>> >> >>> >          >                                 (see recent
>> discussion about
>> >> >>> >         element
>> >> >>> >          >                                 timestamp precision
>> [1])
>> >> >>> >          >                                 * the runner may want
>> its own
>> >> >>> >          >                                 representation of a
>> context
>> >> >>> >         object that
>> >> >>> >          >                                 makes sense for it
>> which isn't a
>> >> >>> >          >                                 WindowedValue
>> necessarily.
>> >> >>> >          >
>> >> >>> >          >                                 Feel free to cut a
>> JIRA about
>> >> >>> >         this and
>> >> >>> >          >                                 start working on a
>> change
>> >> >>> >         towards this.
>> >> >>> >          >
>> >> >>> >          >                                 1:
>> >> >>> >          >
>> >> >>> >
>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>> >> >>> >          >
>> >> >>> >          >                                 On Fri, Apr 19, 2019
>> at 3:18
>> >> >>> >         AM jincheng
>> >> >>> >          >                                 sun <
>> sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>
>> >> >>> >          >
>> >> >>> >           <mailto:sunjincheng...@gmail.com
>> >> >>> >         <mailto:sunjincheng...@gmail.com>>> wrote:
>> >> >>> >          >
>> >> >>> >          >                                     Hi Beam devs,
>> >> >>> >          >
>> >> >>> >          >                                     I read some of
>> the docs about
>> >> >>> >          >                                     `Communicating
>> over the
>> >> >>> >         Fn API` in
>> >> >>> >          >                                     Beam. I feel that
>> Beam
>> >> >>> >         has a very
>> >> >>> >          >                                     good design for
>> Control
>> >> >>> >         Plane/Data
>> >> >>> >          >                                     Plane/State
>> Plane/Logging
>> >> >>> >         services,
>> >> >>> >          >                                     and it is
>> described in
>> >> >>> >         <How to send
>> >> >>> >          >                                     and receive data>
>> >> >>> >         document. When
>> >> >>> >          >                                     communicating
>> between
>> >> >>> >         Runner and SDK
>> >> >>> >          >                                     Harness, the
>> DataPlane
>> >> >>> >         API will be
>> >> >>> >          >                                     WindowedValue(An
>> >> >>> >         immutable triple of
>> >> >>> >          >                                     value, timestamp,
>> and
>> >> >>> >         windows.) As a
>> >> >>> >          >                                     contract object
>> between
>> >> >>> >         Runner and
>> >> >>> >          >                                     SDK Harness. I
>> see the
>> >> >>> >         interface
>> >> >>> >          >                                     definitions for
>> sending and
>> >> >>> >          >                                     receiving data in
>> the
>> >> >>> >         code as follows:
>> >> >>> >          >
>> >> >>> >          >                                     -
>> >> >>> >          >
>> >> >>> >           org.apache.beam.runners.fnexecution.data.FnDataService
>> >> >>> >          >
>> >> >>> >          >                                         public
>> interface
>> >> >>> >         FnDataService {
>> >> >>> >          >                                            <T>
>> InboundDataClient
>> >> >>> >          >
>>  receive(LogicalEndpoint
>> >> >>> >          >                                         inputLocation,
>> >> >>> >          >
>> >> >>> >           Coder<WindowedValue<T>> coder,
>> >> >>> >          >
>> >> >>> >           FnDataReceiver<WindowedValue<T>>
>> >> >>> >          >                                         listener);
>> >> >>> >          >                                            <T>
>> >> >>> >          >
>> >> >>> >           CloseableFnDataReceiver<WindowedValue<T>>
>> >> >>> >          >                                         send(
>> >> >>> >          >
>> LogicalEndpoint
>> >> >>> >          >
>>  outputLocation,
>> >> >>> >          >
>> >> >>> >           Coder<WindowedValue<T>> coder);
>> >> >>> >          >                                         }
>> >> >>> >          >
>> >> >>> >          >
>> >> >>> >          >
>> >> >>> >          >                                     -
>> >> >>> >          >
>> >> >>> >           org.apache.beam.fn.harness.data.BeamFnDataClient
>> >> >>> >          >
>> >> >>> >          >                                         public
>> interface
>> >> >>> >         BeamFnDataClient {
>> >> >>> >          >                                            <T>
>> InboundDataClient
>> >> >>> >          >
>> >> >>> >           receive(ApiServiceDescriptor
>> >> >>> >          >
>>  apiServiceDescriptor,
>> >> >>> >          >
>>  LogicalEndpoint
>> >> >>> >         inputLocation,
>> >> >>> >          >
>> >> >>> >           Coder<WindowedValue<T>> coder,
>> >> >>> >          >
>> >> >>> >           FnDataReceiver<WindowedValue<T>>
>> >> >>> >          >                                         receiver);
>> >> >>> >          >                                            <T>
>> >> >>> >          >
>> >> >>> >           CloseableFnDataReceiver<WindowedValue<T>>
>> >> >>> >          >
>>  send(BeamFnDataGrpcClient
>> >> >>> >          >
>> >> >>> >           Endpoints.ApiServiceDescriptor
>> >> >>> >          >
>>  apiServiceDescriptor,
>> >> >>> >          >
>>  LogicalEndpoint
>> >> >>> >         outputLocation,
>> >> >>> >          >
>> >> >>> >           Coder<WindowedValue<T>> coder);
>> >> >>> >          >                                         }
>> >> >>> >          >
>> >> >>> >          >
>> >> >>> >          >                                     Both
>> >> >>> >         `Coder<WindowedValue<T>>` and
>> >> >>> >          >
>> >> >>> >           `FnDataReceiver<WindowedValue<T>>`
>> >> >>> >          >                                     use
>> `WindowedValue` as
>> >> >>> >         the data
>> >> >>> >          >                                     structure that
>> both sides
>> >> >>> >         of Runner
>> >> >>> >          >                                     and SDK Harness
>> know each
>> >> >>> >         other.
>> >> >>> >          >                                     Control Plane/Data
>> >> >>> >         Plane/State
>> >> >>> >          >                                     Plane/Logging is
>> a highly
>> >> >>> >          >                                     abstraction, such
>> as
>> >> >>> >         Control Plane
>> >> >>> >          >                                     and Logging,
>> these are common
>> >> >>> >          >                                     requirements for
>> all
>> >> >>> >         multi-language
>> >> >>> >          >                                     platforms. For
>> example,
>> >> >>> >         the Flink
>> >> >>> >          >                                     community is also
>> >> >>> >         discussing how to
>> >> >>> >          >                                     support Python
>> UDF, as
>> >> >>> >         well as how
>> >> >>> >          >                                     to deal with
>> docker
>> >> >>> >         environment. how
>> >> >>> >          >                                     to data transfer,
>> how to
>> >> >>> >         state
>> >> >>> >          >                                     access, how to
>> logging
>> >> >>> >         etc. If Beam
>> >> >>> >          >                                     can further
>> abstract
>> >> >>> >         these service
>> >> >>> >          >                                     interfaces, i.e.,
>> interface
>> >> >>> >          >                                     definitions are
>> >> >>> >         compatible with
>> >> >>> >          >                                     multiple engines,
>> and finally
>> >> >>> >          >                                     provided to other
>> >> >>> >         projects in the
>> >> >>> >          >                                     form of class
>> libraries, it
>> >> >>> >          >                                     definitely will
>> help
>> >> >>> >         other platforms
>> >> >>> >          >                                     that want to
>> support multiple
>> >> >>> >          >                                     languages. So
>> could beam
>> >> >>> >         can further
>> >> >>> >          >                                     abstract the
>> interface
>> >> >>> >         definition of
>> >> >>> >          >                                     FnDataService's
>> >> >>> >         BeamFnDataClient?
>> >> >>> >          >                                     Here I am to
>> throw out a
>> >> >>> >         minnow to
>> >> >>> >          >                                     catch a whale,
>> take the
>> >> >>> >          >
>>  FnDataService#receive
>> >> >>> >         interface as
>> >> >>> >          >                                     an example, and
>> turn
>> >> >>> >          >
>>  `WindowedValue<T>` into
>> >> >>> >         `T` so that
>> >> >>> >          >                                     other platforms
>> can be
>> >> >>> >         extended
>> >> >>> >          >                                     arbitrarily, as
>> follows:
>> >> >>> >          >
>> >> >>> >          >                                     <T>
>> InboundDataClient
>> >> >>> >          >
>>  receive(LogicalEndpoint
>> >> >>> >          >                                     inputLocation,
>> Coder<T>
>> >> >>> >         coder,
>> >> >>> >          >
>>  FnDataReceiver<T>> listener);
>> >> >>> >          >
>> >> >>> >          >                                     What do you think?
>> >> >>> >          >
>> >> >>> >          >                                     Feel free to
>> correct me
>> >> >>> >         if there any
>> >> >>> >          >                                     incorrect
>> understanding.
>> >> >>> >         And welcome
>> >> >>> >          >                                     any feedback!
>> >> >>> >          >
>> >> >>> >          >
>> >> >>> >          >                                     Regards,
>> >> >>> >          >                                     Jincheng
>> >> >>> >          >
>> >> >>> >
>>
>

Reply via email to