The list looks good to me. Thanks for summarizing. Feel free to dive
into any of these issues yourself :).

On Fri, Aug 2, 2019 at 6:24 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi all,
>
>
> Thanks a lot for sharing your thoughts!
>
>
> It seems that we have already reached consensus on the following items.
> Could you please read through them again and double-check that you all agree
> with them? If so, I will start creating JIRA issues for those that
> don’t yet have one.
>
>
> 1. Items that require improvements of Beam:
>
>
> 1) The "semi_persist_dir" setting should be configurable. (TODO)
>
> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
>
>
> 2) Time-based cache threshold should be supported. (TODO)
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
>
> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
>
>
> 3) Cross-bundle cache should be supported. 
> (https://issues.apache.org/jira/browse/BEAM-5428)
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>
>
> 4) Allow the log level to be configured. (TODO)
>
> https://issues.apache.org/jira/browse/BEAM-5468
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
>
>
> 5) Improve the interfaces of classes such as FnDataService, BundleProcessor,
> ActiveBundle, etc., changing the parameter type from WindowedValue<T> to T.
> (TODO)
>
>
> 6) Python 3 is already supported in Beam. The warning should be removed. 
> (TODO)
>
> https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
>
>
> 7) The coder of WindowedValue should be configurable, which makes it possible
> to use a custom coder such as ValueOnlyWindowedValueCoder. (TODO)
>
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
>
>
> 8) The schema work can be used to solve the performance issue caused by the
> extra length prefix in the encoding. However, it should also be supported in
> Python. (https://github.com/apache/beam/pull/9188)
>
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
>
>
> 9) MapState should be supported in the gRPC protocol. (TODO)
>
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
>
>
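Regarding item 2) above (a time-based threshold in addition to the size-based one), here is a minimal sketch of what a combined policy could look like. It is not Beam code: the class name FlushingBuffer, its parameters, and the injected clock are all hypothetical and only illustrate the idea.

```python
import time


class FlushingBuffer:
    """Hypothetical buffer that flushes on a size OR an age threshold."""

    def __init__(self, flush_fn, max_bytes=10 << 20, max_age_secs=1.0,
                 clock=time.monotonic):
        self._flush_fn = flush_fn          # called with the buffered bytes
        self._max_bytes = max_bytes        # size-based threshold
        self._max_age_secs = max_age_secs  # time-based threshold
        self._clock = clock                # injectable for testing
        self._chunks = []
        self._size = 0
        self._first_write = None           # time of oldest unflushed write

    def write(self, data):
        if self._first_write is None:
            self._first_write = self._clock()
        self._chunks.append(data)
        self._size += len(data)
        self.maybe_flush()

    def maybe_flush(self):
        """Flush if either threshold is reached; return True if flushed."""
        if not self._chunks:
            return False
        too_big = self._size >= self._max_bytes
        too_old = self._clock() - self._first_write >= self._max_age_secs
        if too_big or too_old:
            self.flush()
            return True
        return False

    def flush(self):
        if self._chunks:
            self._flush_fn(b"".join(self._chunks))
        self._chunks = []
        self._size = 0
        self._first_write = None
```

In this sketch the age check only runs when write() or maybe_flush() is called, so a real implementation would presumably also need a periodic timer calling maybe_flush() to bound latency when the stream goes quiet.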
>
>
> 2. Items where we don’t need to do anything for now:
>
>
> 1) The default buffer size is enough for most cases and there is no need to 
> make it configurable for now.
>
> 2) Do not support ValueState in the gRPC protocol for now unless we have 
> evidence it matters.
>
>
>
> If I have misunderstood anything, please feel free to correct me :)
>
>
> --------------------
>
>
> There are also some items that I didn’t bring up earlier which require 
> further discussion:
>
> 1) The input queue of the input buffer in the Python SDK Harness is not
> size-limited. We should give it a reasonable default size.
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
>
>
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11MB) is a package for Java SDK users, and it is
> pulled in because a few classes in beam-sdks-java-core are used in
> beam-runners-java-fn-execution, such as:
>
> PipelineOptions, used in DefaultJobBundleFactory
> FileSystems, used in BeamFileSystemArtifactRetrievalService
>
> This suggests we could add a new module such as beam-sdks-java-common to hold
> the classes used by both the runner and the SDK.
>
>
> 3) Allow the StatusServer to be started according to configuration in the
> Python SDK Harness. Currently the StatusServer is started by default.
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
>
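For item 1) of this second list (an input queue that is not size-limited), a bounded queue with back-pressure could look roughly like the sketch below. The helper names and the default of 1000 elements are assumptions for illustration, not values taken from data_plane.py.

```python
import queue


def make_input_queue(max_elements=1000):
    """Create a bounded queue; the default size here is an arbitrary example."""
    return queue.Queue(maxsize=max_elements)


def offer(q, element, timeout_secs=None):
    """Try to enqueue an element, blocking while the queue is full.

    Returns True on success, or False if the queue was still full after
    timeout_secs. With timeout_secs=None this blocks until space is free,
    which pushes back on the producer instead of growing memory.
    """
    try:
        q.put(element, timeout=timeout_secs)
        return True
    except queue.Full:
        return False
```

The trade-off is that a full queue now stalls the producer, so the default size would need to be large enough not to hurt throughput.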
>
>
> Best,
>
> Jincheng
>
>
> jincheng sun <sunjincheng...@gmail.com> wrote on Fri, Aug 2, 2019 at 4:14 PM:
>>
>> Thanks for sharing the details of the current StandardCoders, Max!
>> That's true, Flink may need to define some coders of its own, and I will
>> share the POC in the Flink Python UDFs DISCUSS thread later :)
>>
>> Best,
>> Jincheng
>>
>> Maximilian Michels <m...@apache.org> wrote on Wed, Jul 31, 2019 at 2:53 PM:
>>>
>>> Hi Jincheng,
>>>
>>> Thanks for getting back to us.
>>>
>>> > For the next major release of Flink, we plan to add Python user-defined
>>> > function (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam
>>> > portability framework and think that it is perfect for our requirements.
>>> > However, we also found some improvements needed for Beam:
>>>
>>> That sounds great! The improvement list contains very reasonable
>>> suggestions, some of which are already on our TODO list. I think
>>> Thomas and Robert already provided the answers you were looking for.
>>>
>>> > Open questions:
>>> > ---------------------
>>> > 1) Which coders should be / can be defined in StandardCoders?
>>>
>>> The ones which are present now are:
>>>
>>>           BYTES_CODER
>>>           INT64_CODER
>>>           STRING_UTF8
>>>           ITERABLE_CODER
>>>           TIMER_CODER
>>>           KV_CODER
>>>           LENGTH_PREFIX_CODER
>>>           GLOBAL_WINDOW_CODER
>>>           INTERVAL_WINDOW_CODER
>>>           WINDOWED_VALUE_CODER
>>>           DOUBLE_CODER
>>>
>>> Note that this applies only across SDK borders. If you stay within one SDK,
>>> you can use any coder. If a Runner wants to replace a particular coder
>>> with its own coder implementation, it could do that. Flink may want to
>>> use its own set of coders for the sake of coder migration. Another
>>> option, which Robert alluded to, would be to make use of Schema where
>>> possible, which has been built with migration in mind.
>>>
>>> Thanks,
>>> Max
>>>
>>> >
>>> > Must Have:
>>> > ----------------
>>> > 1) Currently only BagState is supported in the gRPC protocol, and I think
>>> > we should support more kinds of state types, such as MapState, ValueState,
>>> > ReducingState, CombiningState (AggregatingState in Flink), etc. That's
>>> > because these kinds of state will be used in both user-defined functions
>>> > and the Flink Python DataStream API.
>>> >
>>> > 2) There are warnings that Python 3 is not fully supported in Beam
>>> > (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
>>> > portability framework because Python 2 will no longer be officially
>>> > supported.
>>> >
>>> > 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory
>>> > on the runner side. I think it is a must-have because, when the
>>> > environment type is "PROCESS", the default value "/tmp" may become a big
>>> > problem.
>>> >
>>> > 4) The buffer size configuration policy should be improved, for example:
>>> >    On the runner side, the buffer limit in
>>> > BeamFnDataBufferingOutboundObserver is size-based. We should also support
>>> > a time-based limit, especially for the streaming case.
>>> >    In the Python SDK Harness, the buffer size is not configurable in
>>> > GrpcDataService. The input queue of the input buffer in the Python SDK
>>> > Harness is not size-limited.
>>> >    The flush threshold of the output buffer in the Python SDK Harness is
>>> > 10 MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is to make
>>> > the threshold configurable and to support a time-based threshold.
>>> >
>>> > Nice To Have:
>>> > -------------------
>>> > 1) Improve the interfaces of FnDataService, BundleProcessor,
>>> > ActiveBundle, etc., changing the parameter type from WindowedValue<T> to
>>> > T. (We have already discussed this in the previous mails.)
>>> >
>>> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
>>> > example, beam-sdks-java-core (11MB) is a package for Java SDK users, and
>>> > it is pulled in because a few classes in beam-sdks-java-core are
>>> > used in beam-runners-java-fn-execution, such as:
>>> > PipelineOptions, used in DefaultJobBundleFactory
>>> > FileSystems, used in BeamFileSystemArtifactRetrievalService
>>> > This suggests we could add a new module such as beam-sdks-java-common to
>>> > hold the classes used by both the runner and the SDK.
>>> >
>>> > 3) The state cache is not shared between bundles, which is
>>> > performance-critical for streaming jobs.
>>> >
>>> > 4) The coder of WindowedValue cannot be configured, and most of the time
>>> > we don't need to serialize and deserialize the timestamp, window and pane
>>> > properties in Flink. But currently FullWindowedValueCoder is used by
>>> > default in WireCoders.addWireCoder. I suggest making the coder
>>> > configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>>> >
>>> > 5) Currently, if a coder is not defined in StandardCoders, it will be
>>> > wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
>>> > LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
>>> > coders are defined in StandardCoders. This means that for most coders, a
>>> > length will be added to the serialized bytes, which is unnecessary in my
>>> > view. My suggestion is that we could add some interfaces or tags for
>>> > the coder which indicate whether the coder needs a length prefix or
>>> > not.
>>> >
>>> > 6) Set the log level according to PipelineOption in the Python SDK
>>> > Harness. Currently the log level is set to INFO by default.
>>> >
>>> > 7) Allow the StatusServer to be started according to PipelineOption in
>>> > the Python SDK Harness. Currently the StatusServer is started by default.
>>> >
>>> > Although I put 3) 4) 5) into the "Nice to Have" as they are performance 
>>> > related, I still think they are very critical for Python UDF execution 
>>> > performance.
>>> >
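As a rough illustration of item 6) above (choosing the log level from an option instead of hard-coding INFO), something like the following would do. The option key "log_level" and the function name are made up for this sketch; they are not existing Beam pipeline options.

```python
import logging

# Accepted level names mapped to the standard logging constants.
_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def configure_log_level(options, logger=None):
    """Set the logger level from a dict of options (hypothetical key).

    Falls back to INFO when the option is absent, which matches the
    current default described above. Returns the numeric level applied.
    """
    if logger is None:
        logger = logging.getLogger()  # root logger
    name = str(options.get("log_level", "INFO")).upper()
    if name not in _LEVELS:
        raise ValueError("unknown log level: %r" % name)
    logger.setLevel(_LEVELS[name])
    return logger.level
```

Rejecting unknown names up front seems safer than silently falling back, since a typo in the option would otherwise go unnoticed.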
>>> > Open questions:
>>> > ---------------------
>>> > 1) Which coders should be / can be defined in StandardCoders?
>>> >
>>> > Currently we are preparing the design of how to support Python UDF in 
>>> > Flink based on the Beam portability framework and we will bring up the 
>>> > discussion in Flink community. We may propose more changes for Beam 
>>> > during that time and may need more support from Beam community.
>>> >
>>> > To be honest, I'm not an expert in Beam, so please feel free to
>>> > correct me if my understanding is wrong. Any feedback is welcome.
>>> >
>>> > Best,
>>> > Jincheng
>>>
>>>
>>>
>>>
>>>
>>>
>>> Cheers,
>>> Max
>>>
>>> On 31.07.19 12:16, Robert Bradshaw wrote:
>>> > Yep, Python support is under active development,
>>> > e.g. https://github.com/apache/beam/pull/9188
>>> >
>>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun <sunjincheng...@gmail.com>
>>> > wrote:
>>> >
>>> >     Thanks a lot for sharing the link. I took a quick look at the design
>>> >     and the implementation in Java and think it could address my
>>> >     concern. It seems that it's still not supported in the Python SDK
>>> >     Harness. Is there any plan for that?
>>> >
>>> >     Robert Bradshaw <rober...@google.com>
>>> >     wrote on Tue, Jul 30, 2019 at 12:33 PM:
>>> >
>>> >         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>>> >         <sunjincheng...@gmail.com> wrote:
>>> >
>>> >
>>> >                     Is it possible to add a method such as
>>> >                     `isSelfContained()` to the `Coder`? This method would
>>> >                     indicate whether the serialized bytes are
>>> >                     self-contained. If it returns true, then there is no
>>> >                     need to add a length prefix.
>>> >                     In this way, there is no need to introduce an extra
>>> >                     protocol. Please correct me if I missed something :)
>>> >
>>> >
>>> >                 The question is how it is self-contained. E.g.
>>> >                 DoubleCoder is self-contained because it always uses
>>> >                 exactly 8 bytes, but one needs to know the double coder
>>> >                 to leverage this. VarInt coder is self-contained in a
>>> >                 different way, as is StringCoder (which does just do
>>> >                 prefixing).
>>> >
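To make the point above concrete, here is a small sketch of the three kinds of "self-contained" mentioned: fixed width, self-delimiting varint, and explicit length prefixing. It is a generic illustration with hypothetical helper names, not Beam's actual coder implementations or wire format, and the varint helpers handle non-negative integers only.

```python
import struct


def encode_varint(n):
    """Base-128 varint: 7 payload bits per byte, high bit = 'more follows'."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(buf, pos=0):
    """Return (value, next_pos); the encoding itself marks where it ends."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_double(x):
    """Fixed width: always 8 bytes, but the reader must know it is a double."""
    return struct.pack(">d", x)


def length_prefix(payload):
    """Wrap opaque bytes so a reader can skip them without understanding them."""
    return encode_varint(len(payload)) + payload


def read_length_prefixed(buf, pos=0):
    """Return (payload, next_pos) for a length-prefixed element."""
    length, pos = decode_varint(buf, pos)
    return buf[pos:pos + length], pos + length
```

A component that does not know the element's coder can still step over a length-prefixed element with read_length_prefixed(), whereas finding the end of the double or the varint requires knowing which encoding was used. That is why a simple boolean on the coder would not be enough for the runner.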
>>> >
>>> >             Yes, you are right! Thinking about it again, we cannot add
>>> >             such an interface to the coder, because the runner cannot
>>> >             call it. And just one more thought: does it make sense to
>>> >             add a method such as "registerSelfContainedCoder(xxx)" or so
>>> >             to let users register the coders which can be processed in
>>> >             the SDK Harness?  It's the responsibility of the SDK harness
>>> >             to ensure that the coder is supported.
>>> >
>>> >
>>> >         Basically, a "please don't add length prefixing to this coder,
>>> >         assume everyone else can understand it (and errors will ensue if
>>> >         anyone doesn't)" at the user level? Seems a bit dangerous. Also,
>>> >         there is not "the SDK"--there may be multiple other SDKs in
>>> >         general, and of course runner components, some of which may
>>> >         understand the coder in question and some of which may not.
>>> >
>>> >         I would say that if this becomes a problem, we could look at the
>>> >         pros and cons of various remedies, this being one alternative.
>>> >
>>> >
>>> >
>>> >
>>> >                 I am hopeful that schemas give us a rich enough way to
>>> >                 encode the vast majority of types that we will want to
>>> >                 transmit across language barriers (possibly with some
>>> >                 widening promotions). For high performance one will want
>>> >                 to use formats like arrow rather than one-off coders as
>>> >                 well, which also biases us towards the schema work. The
>>> >                 set of StandardCoders is not closed, and nor is the
>>> >                 possibility of figuring out a way to communicate outside
>>> >                 this set for a particular pair of languages, but I think
>>> >                 it makes sense to avoid going that direction unless we
>>> >                 have to due to the increased API surface area and
>>> >                 complexity it imposes on all runners and SDKs.
>>> >
>>> >
>>> >             Great! Could you share some links about the schema work? It
>>> >             seems very interesting and promising.
>>> >
>>> >
>>> >         https://beam.apache.org/contribute/design-documents/#sql--schema
>>> >         and, of particular relevance, https://s.apache.org/beam-schemas
>>> >
>>> >
>>> >
