Hi all,

Thanks for your confirmation, Robert! :)

Thanks for sharing the details about the state discussion, Luke! :)

MapState is a bit complex; I think it's better to add a detailed design
doc when we work on MapState support.

I will create the JIRAs and follow up on the subsequent development. If
there are big changes, I will provide detailed design documentation and
bring up the discussion on the mailing list.

Thanks everyone for joining this discussion.

Best,
Jincheng

On Wed, Aug 7, 2019 at 8:19 PM Lukasz Cwik <lc...@google.com> wrote:

> I wanted to add some more details about the state discussion.
>
> BEAM-7000 is about adding support for a gRPC message saying that the SDK
> is now blocked on one of its requests. This would allow for an easy
> optimization on the runner side where it gathers requests and is able to
> batch them knowing that the SDK is only blocked once it sees one of the
> blocked gRPC messages. This would make it easy for the runner to gather up
> clear + append calls and convert them to sets internally.
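>
> As a rough sketch of that coalescing step (the request shapes below are
> hypothetical; the actual BEAM-7000 message has not been designed yet):
>
>     # Runner side: buffer state requests until the SDK signals that it
>     # is blocked, then collapse each key's clear + append run into one
>     # logical "set" write.
>     def coalesce(buffered_requests):
>         ops = {}  # state key -> ('set' or 'append', [values])
>         for req in buffered_requests:
>             if req.kind == 'clear':
>                 ops[req.key] = ('set', [])  # earlier appends are dropped
>             else:  # 'append'
>                 kind, values = ops.get(req.key, ('append', []))
>                 values.append(req.value)
>                 ops[req.key] = (kind, values)
>         return ops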
>
> Also, most of the reason map state doesn't exist yet is that we haven't
> discussed the changes to the gRPC APIs that we would need (things like:
> can you lookup/clear/append to ranges? map or multimap? should we really
> just get rid of bag state in favor of a multimap state? can you enumerate
> keys, or know how many keys there are? ...)
>
> On Wed, Aug 7, 2019 at 9:52 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The list looks good to me. Thanks for summarizing. Feel free to dive
>> into any of these issues yourself :).
>>
>> On Fri, Aug 2, 2019 at 6:24 PM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>> >
>> > Hi all,
>> >
>> >
>> > Thanks a lot for sharing your thoughts!
>> >
>> >
>> > It seems that we have already reached consensus on the following
>> items. Could you please read through them again and double-check that you
>> all agree? If yes, I will start creating JIRA issues for those that don't
>> yet have one.
>> >
>> >
>> > 1. Items that require improvements to Beam:
>> >
>> >
>> > 1) "semi_persist_dir" should be configurable rather than hard-coded.
>> (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
>> >
>> >
>> > 2) A time-based cache threshold should be supported (see the sketch
>> after this list). (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
>> >
>> >
>> > 3) A cross-bundle cache should be supported (see the sketch after this
>> list). (https://issues.apache.org/jira/browse/BEAM-5428)
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>> >
>> >
>> > 4) Allow the log level to be configured (see the sketch after this
>> list). (TODO)
>> >
>> > https://issues.apache.org/jira/browse/BEAM-5468
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
>> >
>> >
>> > 5) Improve the interfaces of classes such as FnDataService,
>> BundleProcessor, ActiveBundle, etc., to change the parameter type from
>> WindowedValue<T> to T. (TODO)
>> >
>> >
>> > 6) Python 3 is already supported in Beam. The warning should be
>> removed. (TODO)
>> >
>> > https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
>> >
>> >
>> > 7) The coder of WindowedValue should be configurable, which would make
>> it possible to use a customized coder such as ValueOnlyWindowedValueCoder.
>> (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
>> >
>> >
>> > 8) The schema work can be used to solve the performance issue caused
>> by the extra length prefix in the encoding. However, it also needs to be
>> supported in Python. (https://github.com/apache/beam/pull/9188)
>> >
>> >
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
>> >
>> >
>> > 9) MapState should be supported in the gRPC protocol. (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
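>> >
>> > For item 2), a minimal sketch of adding a time-based threshold on top
>> of the existing size-based flush (the class and parameter names are
>> illustrative, not the actual Beam API):
>> >
>> >     import time
>> >
>> >     class BufferingOutboundObserver:
>> >         """Flush when either the size or the time threshold is hit."""
>> >
>> >         def __init__(self, flush_bytes=10 << 20, flush_secs=0.1):
>> >             self._flush_bytes = flush_bytes
>> >             self._flush_secs = flush_secs
>> >             self._buffer, self._size = [], 0
>> >             self._last_flush = time.time()
>> >
>> >         def accept(self, encoded_element):
>> >             self._buffer.append(encoded_element)
>> >             self._size += len(encoded_element)
>> >             if (self._size >= self._flush_bytes or
>> >                     time.time() - self._last_flush >= self._flush_secs):
>> >                 self.flush()
>> >
>> >         def flush(self):
>> >             # ... send self._buffer over the data channel here ...
>> >             self._buffer, self._size = [], 0
>> >             self._last_flush = time.time()
>> >
>> > Note that a real implementation would also need a timer, since the
>> check above only runs when a new element arrives, so a quiet stream would
>> otherwise never flush.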
>> >
>> >
>> >
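>> > For item 3), the essence of a cross-bundle cache is to keep decoded
>> state on the SDK worker across bundle boundaries and only fall back to a
>> gRPC read on a miss. A sketch, assuming all reads and writes for a key go
>> through the same object so the cache stays consistent:
>> >
>> >     class CrossBundleStateCache:
>> >         """Keep fetched state across bundles; drop entries on write."""
>> >
>> >         def __init__(self):
>> >             self._cache = {}  # state key -> decoded value
>> >
>> >         def get(self, state_key, fetch_fn):
>> >             if state_key not in self._cache:
>> >                 self._cache[state_key] = fetch_fn()  # gRPC read on miss
>> >             return self._cache[state_key]
>> >
>> >         def clear(self, state_key, clear_fn):
>> >             clear_fn()  # issue the gRPC clear
>> >             self._cache.pop(state_key, None)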
>> >
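>> > And for item 4), a sketch of deriving the harness log level from a
>> pipeline option; the option name here is an assumption, not an existing
>> Beam option:
>> >
>> >     import logging
>> >
>> >     def set_log_level_from_options(options):
>> >         # 'sdk_harness_log_level' is a hypothetical option name.
>> >         level_name = options.get('sdk_harness_log_level', 'INFO')
>> >         logging.getLogger().setLevel(getattr(logging, level_name))
>> >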
>> > 2. Items where we don’t need to do anything for now:
>> >
>> >
>> > 1) The default buffer size is enough for most cases and there is no
>> need to make it configurable for now.
>> >
>> > 2) Do not support ValueState in the gRPC protocol for now unless we
>> have evidence it matters.
>> >
>> >
>> >
>> > If I have misunderstood anything, please feel free to correct me :)
>> >
>> >
>> > --------------------
>> >
>> >
>> > There are also some items that I didn’t bring up earlier which require
>> further discussion:
>> >
>> > 1) The input queue of the input buffer in the Python SDK Harness is
>> unbounded. We should give it a reasonable default size limit (see the
>> sketch after this list).
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
>> >
>> >
>> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
>> example, beam-sdks-java-core (11 MB) is a package for Java SDK users, yet
>> it is pulled in because a few classes from beam-sdks-java-core are used
>> in beam-runners-java-fn-execution, such as:
>> >
>> > PipelineOptions, used in DefaultJobBundleFactory, and FileSystems, used
>> in BeamFileSystemArtifactRetrievalService.
>> >
>> > This means we could perhaps add a new module such as
>> beam-sdks-java-common to hold the classes used by both the runner and the
>> SDK.
>> >
>> >
>> > 3) Allow the StatusServer in the Python SDK Harness to be started
>> according to configuration. Currently the StatusServer is started
>> unconditionally.
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
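>> >
>> > For 1) above, the fix could be as small as giving the receive queue a
>> bound, so that a fast runner gets back-pressure instead of growing the
>> harness's memory without limit. A sketch; the default below is an assumed
>> value that would need tuning:
>> >
>> >     import queue
>> >
>> >     _DEFAULT_MAX_QUEUED_CHUNKS = 100  # assumption, to be tuned
>> >
>> >     # With a bound, put() blocks when the queue is full, so the gRPC
>> >     # reader naturally back-pressures the runner.
>> >     input_elements = queue.Queue(maxsize=_DEFAULT_MAX_QUEUED_CHUNKS)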
>> >
>> >
>> >
>> > Best,
>> >
>> > Jincheng
>> >
>> >
>> > On Fri, Aug 2, 2019 at 4:14 PM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>> >>
>> >> Thanks for sharing the details of the current StandardCoders, Max!
>> >> That's true, Flink may need to define some of its own coders, and I
>> will share the POC in the Flink Python UDFs DISCUSS thread later :)
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> On Wed, Jul 31, 2019 at 2:53 PM Maximilian Michels <m...@apache.org>
>> wrote:
>> >>>
>> >>> Hi Jincheng,
>> >>>
>> >>> Thanks for getting back to us.
>> >>>
>> >>> > For the next major release of Flink, we plan to add Python
>> user-defined function (UDF, UDTF, UDAF) support in Flink, and I have gone
>> over the Beam portability framework and think that it is a perfect fit
>> for our requirements. However, we have also found some improvements
>> needed in Beam:
>> >>>
>> >>> That sounds great! The improvement list contains very reasonable
>> >>> suggestions, some of which are already on our TODO list. I think
>> >>> Thomas and Robert already provided the answers you were looking for.
>> >>>
>> >>> > Open questions:
>> >>> > ---------------------
>> >>> > 1) Which coders should be / can be defined in StandardCoders?
>> >>>
>> >>> The ones which are present now are:
>> >>>
>> >>>           BYTES_CODER
>> >>>           INT64_CODER
>> >>>           STRING_UTF8
>> >>>           ITERABLE_CODER
>> >>>           TIMER_CODER
>> >>>           KV_CODER
>> >>>           LENGTH_PREFIX_CODER
>> >>>           GLOBAL_WINDOW_CODER
>> >>>           INTERVAL_WINDOW_CODER
>> >>>           WINDOWED_VALUE_CODER
>> >>>           DOUBLE_CODER
>> >>>
>> >>> Note that this only matters across SDK borders. If you stay within
>> >>> one SDK, you can use any coder. If a Runner wants to replace a
>> >>> particular coder with its own coder implementation, it can do that.
>> >>> Flink may want to use its own set of coders for the sake of coder
>> >>> migration. Another option, which Robert alluded to, would be to make
>> >>> use of Schemas where possible, since they have been built with
>> >>> migration in mind.
>> >>>
>> >>> Thanks,
>> >>> Max
>> >>>
>> >>> >
>> >>> > Must Have:
>> >>> > ----------------
>> >>> > 1) Currently only BagState is supported in the gRPC protocol, and
>> I think we should support more kinds of state types, such as MapState,
>> ValueState, ReducingState, CombiningState (AggregatingState in Flink),
>> etc. That's because these kinds of state will be used in both
>> user-defined functions and the Flink Python DataStream API.
>> >>> >
>> >>> > 2) There are warnings that Python 3 is not fully supported in Beam
>> (beam/sdks/python/setup.py). We should support Python 3.x in the Beam
>> portability framework, since Python 2 will soon no longer be officially
>> supported.
>> >>> >
>> >>> > 3) The configuration "semi_persist_dir" is not set in
>> EnvironmentFactory on the runner side. The reason I think it's a
>> must-have is that when the environment type is "PROCESS", the default
>> value "/tmp" may become a big problem.
>> >>> >
>> >>> > 4) The buffer size configuration policy should be improved, such
>> as:
>> >>> >    On the runner side, the buffer limit in
>> BeamFnDataBufferingOutboundObserver is size-based. We should also support
>> a time-based limit, especially for the streaming case.
>> >>> >    In the Python SDK Harness, the buffer size is not configurable
>> in GrpcDataService, and the input queue of the input buffer is not size
>> limited.
>> >>> >   The flush threshold of the output buffer in the Python SDK
>> Harness is 10 MB by default (_DEFAULT_FLUSH_THRESHOLD = 10 MB). My
>> suggestion is: make the threshold configurable and support a time-based
>> threshold as well.
>> >>> >
>> >>> > Nice To Have:
>> >>> > -------------------
>> >>> > 1) Improve the interfaces of FnDataService, BundleProcessor,
>> ActiveBundle, etc., to change the parameter type from WindowedValue<T> to
>> T. (We have already discussed this in the previous mails.)
>> >>> >
>> >>> > 2) Refactor the code to avoid pulling in unnecessary dependencies.
>> For example, beam-sdks-java-core (11 MB) is a package for Java SDK users,
>> yet it is pulled in because a few classes from beam-sdks-java-core are
>> used in beam-runners-java-fn-execution, such as PipelineOptions (used in
>> DefaultJobBundleFactory) and FileSystems (used in
>> BeamFileSystemArtifactRetrievalService).
>> >>> > This means we could perhaps add a new module such as
>> beam-sdks-java-common to hold the classes used by both the runner and the
>> SDK.
>> >>> >
>> >>> > 3) The state cache is not shared between bundles, which is
>> performance-critical for streaming jobs.
>> >>> >
>> >>> > 4) The coder of WindowedValue cannot be configured, and most of
>> the time we don't need to serialize and deserialize the timestamp, window
>> and pane properties in Flink. But currently FullWindowedValueCoder is
>> used by default in WireCoders.addWireCoder; I suggest making the coder
>> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>> >>> >
>> >>> > 5) Currently, if a coder is not defined in StandardCoders, it will
>> be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
>> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
>> coders are defined in StandardCoders. This means that for most coders, a
>> length prefix will be added to the serialized bytes, which in my view is
>> unnecessary. My suggestion is that we could add an interface or tag to
>> the coder indicating whether it needs a length prefix or not.
>> >>> >
>> >>> > 6) Set the log level according to PipelineOptions in the Python
>> SDK Harness. Currently the log level is set to INFO by default.
>> >>> >
>> >>> > 7) Allow starting up the StatusServer according to PipelineOptions
>> in the Python SDK Harness. Currently the StatusServer is started by
>> default.
>> >>> >
>> >>> > Although I put 3), 4) and 5) into the "Nice to Have" category as
>> they are performance-related, I still think they are very critical for
>> Python UDF execution performance.
>> >>> >
>> >>> > Open questions:
>> >>> > ---------------------
>> >>> > 1) Which coders should be / can be defined in StandardCoders?
>> >>> >
>> >>> > Currently we are preparing the design of how to support Python
>> UDFs in Flink based on the Beam portability framework, and we will bring
>> up the discussion in the Flink community. We may propose more changes for
>> Beam during that time and may need more support from the Beam community.
>> >>> >
>> >>> > To be honest, I'm not an expert on Beam, so please feel free to
>> correct me if my understanding is wrong. Any feedback is welcome.
>> >>> >
>> >>> > Best,
>> >>> > Jincheng
>> >>>
>> >>> Cheers,
>> >>> Max
>> >>>
>> >>> On 31.07.19 12:16, Robert Bradshaw wrote:
>> >>> > Yep, Python support is under active development,
>> >>> > e.g. https://github.com/apache/beam/pull/9188
>> >>> >
>> >>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun
>> >>> > <sunjincheng...@gmail.com> wrote:
>> >>> >
>> >>> >     Thanks a lot for sharing the link. I took a quick look at the
>> >>> >     design and the implementation in Java and think it could
>> >>> >     address my concern. It seems that it's still not supported in
>> >>> >     the Python SDK Harness. Is there any plan for that?
>> >>> >
>> >>> >     On Tue, Jul 30, 2019 at 12:33 PM Robert Bradshaw
>> >>> >     <rober...@google.com> wrote:
>> >>> >
>> >>> >         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>> >>> >         <sunjincheng...@gmail.com> wrote:
>> >>> >
>> >>> >
>> >>> >                     Is it possible to add an interface such as
>> >>> >                     `isSelfContained()` to the `Coder`? This
>> >>> >                     interface would indicate whether the
>> >>> >                     serialized bytes are self-contained. If it
>> >>> >                     returns true, then there is no need to add a
>> >>> >                     prefixing length. In this way, there is no
>> >>> >                     need to introduce an extra protocol. Please
>> >>> >                     correct me if I missed something :)
>> >>> >
>> >>> >
>> >>> >                 The question is how it is self-contained. E.g.
>> >>> >                 DoubleCoder is self-contained because it always
>> >>> >                 uses exactly 8 bytes, but one needs to know the
>> >>> >                 double coder to leverage this. VarIntCoder is
>> >>> >                 self-contained in a different way, as is
>> >>> >                 StringCoder (which does its own length
>> >>> >                 prefixing).
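>> >>> >
>> >>> >                 To make this concrete, a small Python
>> >>> >                 illustration (for exposition only; the fixed
>> >>> >                 4-byte length prefix below is a simplification
>> >>> >                 of what Beam's coders actually emit):
>> >>> >
>> >>> >                     import struct
>> >>> >
>> >>> >                     # Fixed width: a double is always exactly
>> >>> >                     # 8 bytes, but only a reader that knows it
>> >>> >                     # is a double can exploit that.
>> >>> >                     fixed = struct.pack('>d', 1.5)
>> >>> >
>> >>> >                     # Length-prefixed: the coder writes its own
>> >>> >                     # length up front, so any reader can skip
>> >>> >                     # the value without understanding it.
>> >>> >                     payload = 'hi'.encode('utf-8')
>> >>> >                     prefixed = struct.pack('>I', len(payload)) + payload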
>> >>> >
>> >>> >
>> >>> >             Yes, you are right! Thinking about it again, we
>> >>> >             cannot add such an interface to the coder, because
>> >>> >             the runner cannot call it. And just one more thought:
>> >>> >             does it make sense to add a method such as
>> >>> >             "registerSelfContainedCoder(xxx)" to let users
>> >>> >             register the coders which can be processed in the
>> >>> >             SDK Harness? It would be the responsibility of the
>> >>> >             SDK harness to ensure that the coder is supported.
>> >>> >
>> >>> >
>> >>> >         Basically, a "please don't add length prefixing to this
>> >>> >         coder, assume everyone else can understand it (and errors
>> >>> >         will ensue if anyone doesn't)" at the user level? Seems a
>> >>> >         bit dangerous. Also, there is no single "the SDK"--there
>> >>> >         may be multiple other SDKs in general, and of course
>> >>> >         runner components, some of which may understand the coder
>> >>> >         in question and some of which may not.
>> >>> >
>> >>> >         I would say that if this becomes a problem, we could look
>> >>> >         at the pros and cons of various remedies, this being one
>> >>> >         alternative.
>> >>> >
>> >>> >
>> >>> >
>> >>> >
>> >>> >                 I am hopeful that schemas give us a rich enough
>> >>> >                 way to encode the vast majority of types that we
>> >>> >                 will want to transmit across language barriers
>> >>> >                 (possibly with some widening promotions). For
>> >>> >                 high performance, one will want to use formats
>> >>> >                 like Arrow rather than one-off coders as well,
>> >>> >                 which also biases us towards the schema work.
>> >>> >                 The set of StandardCoders is not closed, nor is
>> >>> >                 the possibility of figuring out a way to
>> >>> >                 communicate outside this set for a particular
>> >>> >                 pair of languages, but I think it makes sense to
>> >>> >                 avoid going that direction unless we have to, due
>> >>> >                 to the increased API surface area and complexity
>> >>> >                 it imposes on all runners and SDKs.
>> >>> >
>> >>> >
>> >>> >             Great! Could you share some links about the schema
>> >>> >             work? It seems very interesting and promising.
>> >>> >
>> >>> >
>> >>> >
>> >>> >         https://beam.apache.org/contribute/design-documents/#sql--schema
>> >>> >         and, of particular relevance, https://s.apache.org/beam-schemas
>> >>> >
>> >>> >
>> >>> >
>>
>
