I wanted to add some more details about the state discussion.

BEAM-7000 is about adding a gRPC message that lets the SDK tell the
runner that it is now blocked on one of its requests. This would allow
for an easy optimization on the runner side: the runner can gather
requests and batch them, knowing that the SDK only blocks once it sees
one of the blocked gRPC messages. In particular, this would make it easy
for the runner to gather up clear + append calls and convert them to
sets internally; a rough sketch follows.
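As a sketch only (hypothetical names, not code that exists in Beam
today), the runner-side batching could look like this:

    # The runner buffers state requests per state key and only executes
    # them once the SDK signals, via the proposed gRPC message, that it
    # is blocked. A clear followed by appends collapses into a set.
    class BatchingStateHandler(object):

        def __init__(self, underlying):
            self._underlying = underlying  # the real state backend
            self._pending = {}  # state_key -> (cleared, appended_values)

        def clear(self, state_key):
            self._pending[state_key] = (True, [])

        def append(self, state_key, value):
            _, values = self._pending.setdefault(state_key, (False, []))
            values.append(value)

        def on_sdk_blocked(self):
            # The SDK is now waiting, so flush everything in one batch.
            for state_key, (cleared, values) in self._pending.items():
                if cleared:
                    # clear + append(s) is equivalent to a single set.
                    self._underlying.set(state_key, values)
                else:
                    self._underlying.extend(state_key, values)
            self._pending.clear()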

Also, the main reason map state doesn't exist yet is that we haven't
discussed the changes to the gRPC APIs that we would need. (Things like:
can you lookup/clear/append to ranges? Map or multimap? Should we just
get rid of bag state in favor of a multimap state? Can you enumerate
keys? Know how many keys there are? ...) A hypothetical sketch of that
design space is below.
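To make those questions concrete, here is one purely hypothetical shape
a multimap-style state API could take; none of these methods exist in
beam_fn_api.proto today, and this is phrased as a Python interface only
for illustration:

    class MultimapUserState(object):

        def get(self, map_key):
            """Look up the values appended under a single key."""

        def append(self, map_key, value):
            """Append a value under a key (multimap semantics)."""

        def clear(self, map_key):
            """Clear a single key."""

        # The open questions above, as method signatures:

        def clear_range(self, start_key, end_key):
            """Can you clear/append to ranges of keys?"""

        def keys(self):
            """Can you enumerate keys?"""

        def key_count(self):
            """Can you know how many keys there are?"""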

On Wed, Aug 7, 2019 at 9:52 AM Robert Bradshaw <rober...@google.com> wrote:

> The list looks good to me. Thanks for summarizing. Feel free to dive
> into any of these issues yourself :).
>
> On Fri, Aug 2, 2019 at 6:24 PM jincheng sun <sunjincheng...@gmail.com>
> wrote:
> >
> > Hi all,
> >
> >
> > Thanks a lot for sharing your thoughts!
> >
> >
> > It seems that we have already reached consensus on the following items.
> > Could you please read through them again and double-check that you all
> > agree with these? If yes, I will start creating JIRA issues for those
> > that don't yet have one.
> >
> >
> > 1. Items that require improvements to Beam:
> >
> >
> > 1) The value of "semi_persist_dir" should be configurable. (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
> >
> >
> > 2) Time-based cache threshold should be supported. (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
> >
> >
> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
> >
> >
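On 2), a time-based threshold could sit on top of the existing
size-based flush. This is a hypothetical sketch, not what data_plane.py
does today; note that a complete implementation would also need a timer
to flush buffers that go idle between add() calls:

    import threading
    import time

    class TimeOrSizeFlushingBuffer(object):
        # Flush when either the byte threshold or the age threshold is hit.
        def __init__(self, flush_fn, size_threshold=10 << 20, max_age_secs=1.0):
            self._flush_fn = flush_fn
            self._size_threshold = size_threshold
            self._max_age_secs = max_age_secs
            self._lock = threading.Lock()
            self._chunks, self._bytes, self._oldest = [], 0, None

        def add(self, data):
            with self._lock:
                if self._oldest is None:
                    self._oldest = time.time()
                self._chunks.append(data)
                self._bytes += len(data)
                if (self._bytes >= self._size_threshold or
                        time.time() - self._oldest >= self._max_age_secs):
                    self._flush_locked()

        def _flush_locked(self):
            if self._chunks:
                self._flush_fn(b''.join(self._chunks))
            self._chunks, self._bytes, self._oldest = [], 0, None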
> > 3) Cross-bundle cache should be supported. (
> https://issues.apache.org/jira/browse/BEAM-5428)
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >
> >
> > 4) Allow configuring the log level. (TODO)
> >
> > https://issues.apache.org/jira/browse/BEAM-5468
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
> >
> >
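For 4), the fix could be as small as mapping a pipeline option onto the
standard logging setup in sdk_worker_main.py. The option name here is
hypothetical, purely for illustration:

    import logging

    def _set_log_level_from_options(options_dict):
        # 'default_log_level' is a hypothetical option name; the actual
        # option would need to be agreed upon.
        level_name = options_dict.get('default_log_level', 'INFO')
        logging.getLogger().setLevel(getattr(logging, level_name, logging.INFO))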
> > 5) Improve the interfaces of classes such as FnDataService,
> > BundleProcessor, ActiveBundle, etc. to change the parameter type from
> > WindowedValue<T> to T. (TODO)
> >
> >
> > 6) Python 3 is already supported in Beam. The warning should be removed.
> (TODO)
> >
> > https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
> >
> >
> > 7) The coder of WindowedValue should be configurable, which makes it
> > possible to use a custom coder such as ValueOnlyWindowedValueCoder.
> > (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
> >
> >
> > 8) The schema work can be used to solve the performance issue of the
> > extra length prefix in the encoding. However, it also needs to be
> > supported in Python. (https://github.com/apache/beam/pull/9188)
> >
> >
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
> >
> >
> > 9) MapState should be supported in the gRPC protocol. (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
> >
> >
> >
> >
> > 2. Items where we don’t need to do anything for now:
> >
> >
> > 1) The default buffer size is enough for most cases and there is no need
> to make it configurable for now.
> >
> > 2) Do not support ValueState in the gRPC protocol for now unless we have
> evidence it matters.
> >
> >
> >
> > If there is any incorrect understanding, please feel free to correct
> > me :)
> >
> >
> > --------------------
> >
> >
> > There are also some items that I didn’t bring up earlier which require
> further discussion:
> >
> > 1) The input queue of the input buffer in the Python SDK Harness is
> > not size limited. We should give it a reasonable default limit.
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
> >
> >
> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> > example, beam-sdks-java-core (11 MB) is a package for Java SDK users,
> > and it is pulled in only because a few classes from beam-sdks-java-core
> > are used in beam-runners-java-fn-execution, such as:
> >
> > PipelineOptions, used in DefaultJobBundleFactory; FileSystems, used in
> > BeamFileSystemArtifactRetrievalService.
> >
> > This means we could perhaps add a new module such as
> > beam-sdks-java-common to hold the classes used by both the runner and
> > the SDK.
> >
> >
> > 3) Allow starting up the StatusServer according to configuration in the
> > Python SDK Harness. Currently the StatusServer is started up by default.
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
> >
> >
> >
> > Best,
> >
> > Jincheng
> >
> >
> > jincheng sun <sunjincheng...@gmail.com> wrote on Fri, Aug 2, 2019 at 4:14 PM:
> >>
> >> Thanks for sharing the details of the current StandardCoders, Max!
> >> That's true, Flink may need to define some coders of its own, and I
> >> will share the POC in the Flink Python UDFs DISCUSS thread later :)
> >>
> >> Best,
> >> Jincheng
> >>
> >> Maximilian Michels <m...@apache.org> wrote on Wed, Jul 31, 2019 at 2:53 PM:
> >>>
> >>> Hi Jincheng,
> >>>
> >>> Thanks for getting back to us.
> >>>
> >>> > For the next major release of Flink, we plan to add Python
> >>> > user-defined function (UDF, UDTF, UDAF) support in Flink, and I have
> >>> > gone over the Beam portability framework and think that it is a
> >>> > perfect fit for our requirements. However, we have also found some
> >>> > improvements needed in Beam:
> >>>
> >>> That sounds great! The improvement list contains very reasonable
> >>> suggestions, some of which are already on our TODO list. I think
> >>> Thomas and Robert already provided the answers you were looking for.
> >>>
> >>> > Open questions:
> >>> > ---------------------
> >>> > 1) Which coders should be / can be defined in StandardCoders?
> >>>
> >>> The ones which are present now. Those are:
> >>>
> >>>           BYTES_CODER
> >>>           INT64_CODER
> >>>           STRING_UTF8
> >>>           ITERABLE_CODER
> >>>           TIMER_CODER
> >>>           KV_CODER
> >>>           LENGTH_PREFIX_CODER
> >>>           GLOBAL_WINDOW_CODER
> >>>           INTERVAL_WINDOW_CODER
> >>>           WINDOWED_VALUE_CODER
> >>>           DOUBLE_CODER
> >>>
> >>> Note that this only matters across SDK borders. If you stay within one
> >>> SDK, you can use any coder. If a Runner wants to replace a particular
> >>> coder with its own coder implementation, it can do that. Flink may want
> >>> to use its own set of coders for the sake of coder migration. Another
> >>> option, which Robert alluded to, would be to make use of Schema where
> >>> possible, which has been built with migration in mind.
> >>>
> >>> Thanks,
> >>> Max
> >>>
> >>> >
> >>> > Must Have:
> >>> > ----------------
> >>> > 1) Currently only BagState is supported in the gRPC protocol, and I
> >>> > think we should support more kinds of state, such as MapState,
> >>> > ValueState, ReducingState, CombiningState (AggregatingState in
> >>> > Flink), etc. That's because these kinds of state will be used in both
> >>> > user-defined functions and the Flink Python DataStream API.
> >>> >
> >>> > 2) There are warnings that Python 3 is not fully supported in Beam
> >>> > (beam/sdks/python/setup.py). We should support Python 3.x for the
> >>> > Beam portability framework, since Python 2 will soon no longer be
> >>> > officially supported.
> >>> >
> >>> > 3) The configuration "semi_persist_dir" is not set in
> >>> > EnvironmentFactory at the runner side. The reason I think it's a
> >>> > must-have is that when the environment type is "PROCESS", the default
> >>> > value "/tmp" may become a big problem.
> >>> >
> >>> > 4) The buffer size configuration policy should be improved:
> >>> >    At the runner side, the buffer limit in
> >>> > BeamFnDataBufferingOutboundObserver is size based. We should also
> >>> > support a time-based limit, especially for the streaming case.
> >>> >    At the Python SDK Harness, the buffer size is not configurable in
> >>> > GrpcDataService, and the input queue of the input buffer is not size
> >>> > limited.
> >>> >    The flush threshold of the output buffer in the Python SDK Harness
> >>> > is 10 MB by default (_DEFAULT_FLUSH_THRESHOLD = 10 MB). My
> >>> > suggestion: make the threshold configurable and support a time-based
> >>> > threshold.
> >>> >
> >>> > Nice To Have:
> >>> > -------------------
> >>> > 1) Improve the interfaces of FnDataService, BundleProcessor,
> >>> > ActiveBundle, etc., to change the parameter type from
> >>> > WindowedValue<T> to T. (We have already discussed this in previous
> >>> > mails.)
> >>> >
> >>> > 2) Refactor the code to avoid pulling in unnecessary dependencies.
> >>> > For example, beam-sdks-java-core (11 MB) is a package for Java SDK
> >>> > users, and it is pulled in only because a few classes from
> >>> > beam-sdks-java-core are used in beam-runners-java-fn-execution, such
> >>> > as: PipelineOptions, used in DefaultJobBundleFactory; FileSystems,
> >>> > used in BeamFileSystemArtifactRetrievalService. This means we could
> >>> > perhaps add a new module such as beam-sdks-java-common to hold the
> >>> > classes used by both the runner and the SDK.
> >>> >
> >>> > 3) The state cache is not shared between bundles, which is
> >>> > performance critical for streaming jobs.
> >>> >
> >>> > 4) The coder of WindowedValue cannot be configured, and most of the
> >>> > time we don't need to serialize and deserialize the timestamp, window
> >>> > and pane properties in Flink. But currently FullWindowedValueCoder is
> >>> > used by default in WireCoders.addWireCoder; I suggest making the
> >>> > coder configurable (i.e. allowing the use of
> >>> > ValueOnlyWindowedValueCoder).
> >>> >
> >>> > 5) Currently, if a coder is not defined in StandardCoders, it will
> >>> > be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
> >>> > LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a
> >>> > few coders are defined in StandardCoders. This means that for most
> >>> > coders a length will be added to the serialized bytes, which in my
> >>> > view is unnecessary. My suggestion: maybe we can add some interface
> >>> > or tag for a coder to indicate whether it needs a length prefix or
> >>> > not.
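To make the overhead described in 5) concrete, here is a toy Python
example of length-prefix framing. Beam's LengthPrefixCoder actually uses
a varint rather than a fixed 4 bytes; this only illustrates the extra
bytes on the wire:

    import struct

    def length_prefixed(payload):
        # Prepend the payload size so a reader that doesn't know the
        # underlying coder can still skip over the value.
        return struct.pack('>I', len(payload)) + payload

    encoded = length_prefixed(struct.pack('>d', 3.14))  # an 8-byte double
    assert len(encoded) == 12  # 4 bytes of framing for 8 bytes of data

The framing cost is paid per element, which is why it matters for
UDF-heavy pipelines.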
> >>> >
> >>> > 6) Set the log level according to PipelineOptions in the Python SDK
> >>> > Harness. Currently the log level is set to INFO by default.
> >>> >
> >>> > 7) Allow starting up the StatusServer according to PipelineOptions
> >>> > in the Python SDK Harness. Currently the StatusServer is started up
> >>> > by default.
> >>> >
> >>> > Although I put 3), 4) and 5) into the "Nice to Have" category
> >>> > because they are performance related, I still think they are very
> >>> > critical for Python UDF execution performance.
> >>> >
> >>> > Open questions:
> >>> > ---------------------
> >>> > 1) Which coders should be / can be defined in StandardCoders?
> >>> >
> >>> > Currently we are preparing the design of how to support Python UDFs
> >>> > in Flink based on the Beam portability framework, and we will bring
> >>> > up the discussion in the Flink community. We may propose more changes
> >>> > to Beam during that time and may need more support from the Beam
> >>> > community.
> >>> >
> >>> > To be honest, I'm not an expert on Beam, so please feel free to
> >>> > correct me if my understanding is wrong. Any feedback is welcome.
> >>> >
> >>> > Best,
> >>> > Jincheng
> >>>
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 31.07.19 12:16, Robert Bradshaw wrote:
> >>> > Yep, Python support under active development,
> >>> > e.g. https://github.com/apache/beam/pull/9188
> >>> >
> >>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun
> >>> > <sunjincheng...@gmail.com> wrote:
> >>> >
> >>> >     Thanks a lot for sharing the link. I took a quick look at the
> >>> >     design and the implementation in Java and think it could address
> >>> >     my concern. It seems that it's still not supported in the Python
> >>> >     SDK Harness. Is there any plan for that?
> >>> >
> >>> >     Robert Bradshaw <rober...@google.com> wrote on Tue, Jul 30, 2019
> >>> >     at 12:33 PM:
> >>> >
> >>> >         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
> >>> >         <sunjincheng...@gmail.com> wrote:
> >>> >
> >>> >
> >>> >                     Is it possible to add an interface such as
> >>> >                     `isSelfContained()` to the `Coder`? This
> >>> >                     interface indicates whether the serialized bytes
> >>> >                     are self-contained. If it returns true, then
> >>> >                     there is no need to add a prefixing length. In
> >>> >                     this way, there is no need to introduce an extra
> >>> >                     protocol. Please correct me if I missed
> >>> >                     something :)
> >>> >
> >>> >
> >>> >                 The question is how it is self-contained. E.g.
> >>> >                 DoubleCoder is self-contained because it always uses
> >>> >                 exactly 8 bytes, but one needs to know the double
> >>> >                 coder to leverage this. VarIntCoder is self-contained
> >>> >                 in a different way, as is StringCoder (which just
> >>> >                 does prefixing).
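A tiny illustration of the two flavors of self-containedness described
above (illustrative Python, not Beam coder code):

    import struct

    # A double is always exactly 8 bytes, but only a reader that already
    # knows it is a double can exploit that.
    fixed = struct.pack('>d', 1.0)

    # A length-prefixed string delimits itself: the length is in-band.
    delimited = struct.pack('>I', 3) + b'abc'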
> >>> >
> >>> >
> >>> >             Yes, you are right! Thinking about it again, we cannot
> >>> >             add such an interface to the coder, since the runner
> >>> >             cannot call it. And just one more thought: does it make
> >>> >             sense to add a method such as
> >>> >             "registerSelfContainedCoder(xxx)" or so to let users
> >>> >             register the coders which can be processed in the SDK
> >>> >             Harness? It would be the responsibility of the SDK
> >>> >             harness to ensure that the coder is supported.
> >>> >
> >>> >
> >>> >         Basically, a "please don't add length prefixing to this
> >>> >         coder, assume everyone else can understand it (and errors
> >>> >         will ensue if anyone doesn't)" at the user level? Seems a bit
> >>> >         dangerous. Also, there is no "the SDK"--there may be multiple
> >>> >         other SDKs in general, and of course runner components, some
> >>> >         of which may understand the coder in question and some of
> >>> >         which may not.
> >>> >
> >>> >         I would say that if this becomes a problem, we could look at
> >>> >         the pros and cons of various remedies, this being one
> >>> >         alternative.
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >                 I am hopeful that schemas give us a rich enough way
> >>> >                 to encode the vast majority of types that we will
> >>> >                 want to transmit across language barriers (possibly
> >>> >                 with some widening promotions). For high performance,
> >>> >                 one will want to use formats like Arrow rather than
> >>> >                 one-off coders as well, which also biases us towards
> >>> >                 the schema work. The set of StandardCoders is not
> >>> >                 closed, and nor is the possibility of figuring out a
> >>> >                 way to communicate outside this set for a particular
> >>> >                 pair of languages, but I think it makes sense to
> >>> >                 avoid going that direction unless we have to, due to
> >>> >                 the increased API surface area and complexity it
> >>> >                 imposes on all runners and SDKs.
> >>> >
> >>> >
> >>> >             Great! Could you share some links about the schema work?
> >>> >             It seems very interesting and promising.
> >>> >
> >>> >
> >>> >
> >>> >         https://beam.apache.org/contribute/design-documents/#sql--schema
> >>> >         and of particular relevance https://s.apache.org/beam-schemas
> >>> >
> >>> >
> >>> >
>
