The list looks good to me. Thanks for summarizing. Feel free to dive into any of these issues yourself :).
On Fri, Aug 2, 2019 at 6:24 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi all,
>
> Thanks a lot for sharing your thoughts!
>
> It seems that we have already reached consensus on the following items.
> Could you please read through them again and double-check that you all
> agree with them? If so, I will start creating JIRA issues for the items
> that don't yet have one.
>
> 1. Items that require improvements in Beam:
>
> 1) The "semi_persist_dir" setting should be configurable. (TODO)
> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
>
> 2) A time-based cache threshold should be supported. (TODO)
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
>
> 3) A cross-bundle cache should be supported.
> (https://issues.apache.org/jira/browse/BEAM-5428)
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>
> 4) Allow configuring the log level. (TODO)
> https://issues.apache.org/jira/browse/BEAM-5468
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
>
> 5) Improve the interfaces of classes such as FnDataService,
> BundleProcessor, ActiveBundle, etc. to change the parameter type from
> WindowedValue<T> to T. (TODO)
>
> 6) Python 3 is already supported in Beam, so the warning should be
> removed. (TODO)
> https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
>
> 7) The coder of WindowedValue should be configurable, which would make
> it possible to use a custom coder such as ValueOnlyWindowedValueCoder.
> (TODO)
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
>
> 8) The schema work can be used to solve the performance issue of the
> extra length prefix in the encoding. However, it should also be
> supported in Python. (https://github.com/apache/beam/pull/9188)
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
>
> 9) MapState should be supported in the gRPC protocol. (TODO)
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
>
> 2. Items where we don't need to do anything for now:
>
> 1) The default buffer size is enough for most cases and there is no
> need to make it configurable for now.
>
> 2) Do not support ValueState in the gRPC protocol for now unless we
> have evidence that it matters.
>
> If I have misunderstood anything, please feel free to correct me :)
>
> --------------------
>
> There are also some items that I didn't bring up earlier which require
> further discussion:
>
> 1) The input queue of the input buffer in the Python SDK Harness is
> unbounded. We should give it a reasonable default size limit.
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
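A quick inline note on 1.2) above (and on the buffering items later in the
thread): the output buffer in the Python worker's data plane currently
flushes on size only. Below is a minimal sketch of what a combined size-
and time-based trigger could look like. All names here are hypothetical --
this is the shape of the change, not Beam's actual data_plane API:

    import threading
    import time

    class SizeAndTimeFlushBuffer(object):
        # Hypothetical sketch: flush when either the buffered byte count
        # or the age of the oldest buffered chunk crosses a threshold.
        def __init__(self, flush_fn, size_limit=10 << 20, time_limit_s=0.1):
            self._flush_fn = flush_fn
            self._size_limit = size_limit
            self._time_limit_s = time_limit_s
            self._lock = threading.Lock()
            self._chunks, self._size, self._oldest = [], 0, None
            threading.Thread(target=self._tick, daemon=True).start()

        def write(self, data):
            with self._lock:
                if not self._chunks:
                    self._oldest = time.time()
                self._chunks.append(data)
                self._size += len(data)
                if self._size >= self._size_limit:
                    self._flush_locked()

        def _tick(self):
            # Periodically flush data that has been sitting too long; this
            # is what matters for low-throughput streaming pipelines.
            while True:
                time.sleep(self._time_limit_s)
                with self._lock:
                    if (self._chunks and
                            time.time() - self._oldest >= self._time_limit_s):
                        self._flush_locked()

        def _flush_locked(self):
            payload, self._chunks, self._size = b''.join(self._chunks), [], 0
            self._flush_fn(payload)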
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11MB) is a package for Java SDK users, and
> it is pulled in because a few classes from beam-sdks-java-core are used
> in beam-runners-java-fn-execution, such as PipelineOptions (used in
> DefaultJobBundleFactory) and FileSystems (used in
> BeamFileSystemArtifactRetrievalService). This means we could add a new
> module such as beam-sdks-java-common to hold the classes used by both
> the runner and the SDK.
>
> 3) Allow starting up the StatusServer according to configuration in the
> Python SDK Harness. Currently the StatusServer is started up by default.
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
>
> Best,
> Jincheng
>
> jincheng sun <sunjincheng...@gmail.com> wrote on Fri, Aug 2, 2019 at 4:14 PM:
>>
>> Thanks for sharing the details of the current StandardCoders, Max!
>> That's true, Flink may need to define some coders of its own. I will
>> share the POC in the Flink Python UDFs DISCUSS thread later :)
>>
>> Best,
>> Jincheng
>>
>> Maximilian Michels <m...@apache.org> wrote on Wed, Jul 31, 2019 at 2:53 PM:
>>>
>>> Hi Jincheng,
>>>
>>> Thanks for getting back to us.
>>>
>>> > For the next major release of Flink, we plan to add support for
>>> > Python user-defined functions (UDF, UDTF, UDAF) in Flink. I have
>>> > gone over the Beam portability framework and think that it is a
>>> > perfect fit for our requirements. However, we have also found some
>>> > improvements that Beam needs:
>>>
>>> That sounds great! The improvement list contains very reasonable
>>> suggestions, some of which are already on our TODO list. I think
>>> Thomas and Robert already provided the answers you were looking for.
>>>
>>> > Open questions:
>>> > ---------------------
>>> > 1) Which coders should be / can be defined in StandardCoders?
>>>
>>> The ones which are present now, namely:
>>>
>>> BYTES_CODER
>>> INT64_CODER
>>> STRING_UTF8
>>> ITERABLE_CODER
>>> TIMER_CODER
>>> KV_CODER
>>> LENGTH_PREFIX_CODER
>>> GLOBAL_WINDOW_CODER
>>> INTERVAL_WINDOW_CODER
>>> WINDOWED_VALUE_CODER
>>> DOUBLE_CODER
>>>
>>> Note that this only matters across SDK borders. If you stay within
>>> one SDK, you can use any coder. If a Runner wants to replace a
>>> particular coder with its own coder implementation, it can do that.
>>> Flink may want to use its own set of coders for the sake of coder
>>> migration. Another option Robert alluded to would be to make use of
>>> Schemas where possible, which have been built with migration in mind.
>>>
>>> Thanks,
>>> Max
>>>
>>> >
>>> > Must Have:
>>> > ----------------
>>> > 1) Currently only BagState is supported in the gRPC protocol, and I
>>> > think we should support more kinds of state types, such as
>>> > MapState, ValueState, ReducingState, CombiningState
>>> > (AggregatingState in Flink), etc. These kinds of state will be used
>>> > in both user-defined functions and the Flink Python DataStream API.
>>> >
>>> > 2) There are warnings that Python 3 is not fully supported in Beam
>>> > (beam/sdks/python/setup.py). We should support Python 3.x for the
>>> > Beam portability framework, because Python 2 will no longer be
>>> > officially supported.
>>> >
>>> > 3) The configuration "semi_persist_dir" is not set in
>>> > EnvironmentFactory at the runner side. The reason I consider this a
>>> > must-have is that when the environment type is "PROCESS", the
>>> > default value "/tmp" may become a big problem.
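On the "/tmp" point just above: the fix is mostly plumbing a configured
value through instead of hard-coding the default. A hypothetical
Python-side sketch (the environment variable name is an assumption, not
what boot.go actually passes):

    import os

    # Hypothetical sketch: let the runner hand the harness a
    # semi-persistent directory, falling back to /tmp only when nothing
    # was configured. With the PROCESS environment, an unconfigurable
    # /tmp is exactly the problem case described above.
    def resolve_semi_persist_dir(default='/tmp'):
        return os.environ.get('SEMI_PERSIST_DIR', default)

    staging_dir = os.path.join(resolve_semi_persist_dir(), 'staged')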
>>> >
>>> > 4) The buffer size configuration policy should be improved:
>>> > At the runner side, the buffer limit in
>>> > BeamFnDataBufferingOutboundObserver is size based. We should also
>>> > support a time-based limit, especially for the streaming case.
>>> > In the Python SDK Harness, the buffer size is not configurable in
>>> > GrpcDataService, and the input queue of the input buffer is not
>>> > size limited.
>>> > The flush threshold of the output buffer in the Python SDK Harness
>>> > is 10 MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion:
>>> > make the threshold configurable and also support a time-based
>>> > threshold.
>>> >
>>> > Nice To Have:
>>> > -------------------
>>> > 1) Improve the interfaces of FnDataService, BundleProcessor,
>>> > ActiveBundle, etc. to change the parameter type from
>>> > WindowedValue<T> to T. (We have already discussed this in the
>>> > previous mails.)
>>> >
>>> > 2) Refactor the code to avoid pulling in unnecessary dependencies.
>>> > For example, beam-sdks-java-core (11MB) is a package for Java SDK
>>> > users, and it is pulled in because a few classes from
>>> > beam-sdks-java-core are used in beam-runners-java-fn-execution,
>>> > such as PipelineOptions (used in DefaultJobBundleFactory) and
>>> > FileSystems (used in BeamFileSystemArtifactRetrievalService). This
>>> > means we could add a new module such as beam-sdks-java-common to
>>> > hold the classes used by both the runner and the SDK.
>>> >
>>> > 3) The state cache is not shared between bundles, which is
>>> > performance critical for streaming jobs.
>>> >
>>> > 4) The coder of WindowedValue cannot be configured, and most of the
>>> > time we don't need to serialize and deserialize the timestamp,
>>> > window and pane properties in Flink. Currently
>>> > FullWindowedValueCoder is used by default in
>>> > WireCoders.addWireCoder; I suggest making the coder configurable
>>> > (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>>> >
>>> > 5) Currently, if a coder is not defined in StandardCoders, it will
>>> > be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
>>> > LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a
>>> > few coders are defined in StandardCoders. This means that for most
>>> > coders, a length prefix is added to the serialized bytes, which in
>>> > my view is unnecessary. My suggestion: maybe we can add some
>>> > interface or tag for a coder which indicates whether the coder
>>> > needs a length prefix or not.
>>> >
>>> > 6) Set the log level according to the PipelineOptions in the Python
>>> > SDK Harness. Currently the log level is set to INFO by default.
>>> >
>>> > 7) Allow starting up the StatusServer according to the
>>> > PipelineOptions in the Python SDK Harness. Currently the
>>> > StatusServer is started up by default.
>>> >
>>> > Although I put 3), 4) and 5) into "Nice to Have" because they are
>>> > performance related, I still think they are very critical for
>>> > Python UDF execution performance.
>>> >
>>> > Open questions:
>>> > ---------------------
>>> > 1) Which coders should be / can be defined in StandardCoders?
>>> >
>>> > Currently we are preparing the design of how to support Python UDFs
>>> > in Flink based on the Beam portability framework, and we will bring
>>> > up the discussion in the Flink community. We may propose more
>>> > changes for Beam during that time and may need more support from
>>> > the Beam community.
>>> >
>>> > To be honest, I'm not an expert on Beam, so please feel free to
>>> > correct me if my understanding is wrong. Any feedback is welcome.
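On 6) in the list above (log level from PipelineOptions), the change is
small once the option exists. A hypothetical sketch of the harness side --
the option name below is assumed, not an actual Beam PipelineOption; see
BEAM-5468 for the real design:

    import logging

    # Hypothetical sketch: derive the root logger level in the SDK
    # harness from a pipeline option instead of hard-coding INFO.
    def set_log_level_from_options(options_dict):
        level_name = options_dict.get('sdk_harness_log_level', 'INFO')
        level = getattr(logging, level_name.upper(), logging.INFO)
        logging.getLogger().setLevel(level)

    # Example: silence INFO chatter for a production streaming job.
    set_log_level_from_options({'sdk_harness_log_level': 'WARNING'})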
>>> >
>>> > Best,
>>> > Jincheng
>>>
>>> Cheers,
>>> Max
>>>
>>> On 31.07.19 12:16, Robert Bradshaw wrote:
>>> > Yep, Python support is under active development, e.g.
>>> > https://github.com/apache/beam/pull/9188
>>> >
>>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun
>>> > <sunjincheng...@gmail.com> wrote:
>>> >
>>> >     Thanks a lot for sharing the link. I took a quick look at the
>>> >     design and the implementation in Java and think it could
>>> >     address my concern. It seems that it's still not supported in
>>> >     the Python SDK Harness. Is there any plan for that?
>>> >
>>> >     Robert Bradshaw <rober...@google.com> wrote on Tue, Jul 30,
>>> >     2019 at 12:33 PM:
>>> >
>>> >         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>>> >         <sunjincheng...@gmail.com> wrote:
>>> >
>>> >             Is it possible to add an interface such as
>>> >             `isSelfContained()` to the `Coder`? This interface
>>> >             would indicate whether the serialized bytes are
>>> >             self-contained. If it returns true, then there is no
>>> >             need to add a length prefix. In this way, there is no
>>> >             need to introduce an extra protocol. Please correct me
>>> >             if I missed something :)
>>> >
>>> >         The question is how it is self-contained. E.g. DoubleCoder
>>> >         is self-contained because it always uses exactly 8 bytes,
>>> >         but one needs to know the double coder to leverage this.
>>> >         The VarInt coder is self-contained in a different way, as
>>> >         is StringCoder (which does just do prefixing).
>>> >
>>> >     Yes, you are right! Thinking about it again, we cannot add such
>>> >     an interface to the coder, because the runner cannot call it.
>>> >     Just one more thought: does it make sense to add a method such
>>> >     as "registerSelfContainedCoder(xxx)" or so, to let users
>>> >     register the coders which can be processed in the SDK Harness?
>>> >     It would be the responsibility of the SDK Harness to ensure
>>> >     that the coder is supported.
>>> >
>>> > Basically, a "please don't add length prefixing to this coder,
>>> > assume everyone else can understand it (and errors will ensue if
>>> > anyone doesn't)" at the user level? Seems a bit dangerous. Also,
>>> > there is no single "the SDK" -- there may be multiple other SDKs in
>>> > general, and of course runner components, some of which may
>>> > understand the coder in question and some of which may not.
>>> >
>>> > I would say that if this becomes a problem, we could look at the
>>> > pros and cons of various remedies, this being one alternative.
>>> >
>>> >         I am hopeful that schemas give us a rich enough way to
>>> >         encode the vast majority of types that we will want to
>>> >         transmit across language barriers (possibly with some
>>> >         widening promotions). For high performance one will want to
>>> >         use formats like Arrow rather than one-off coders as well,
>>> >         which also biases us towards the schema work. The set of
>>> >         StandardCoders is not closed, and nor is the possibility of
>>> >         figuring out a way to communicate outside this set for a
>>> >         particular pair of languages, but I think it makes sense to
>>> >         avoid going that direction unless we have to, due to the
>>> >         increased API surface area and complexity it imposes on all
>>> >         runners and SDKs.
>>> >
>>> >     Great! Could you share some links about the schema work? It
>>> >     seems very interesting and promising.
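Since the length prefix comes up repeatedly in this subthread: concretely,
it is a base-128 varint prepended to the element's encoded bytes. A
minimal Python sketch of the wrapping (simplified; the real length-prefix
coder lives in the SDKs):

    def varint(n):
        # Unsigned base-128 varint, the encoding used for length prefixes.
        out = bytearray()
        while True:
            byte = n & 0x7F
            n >>= 7
            if n:
                out.append(byte | 0x80)
            else:
                out.append(byte)
                return bytes(out)

    def length_prefixed(encoded_element):
        # Wrap already-encoded bytes the way a length-prefixed wire
        # coder does.
        return varint(len(encoded_element)) + encoded_element

    # An 8-byte double grows to 9 bytes on the wire -- the per-element
    # overhead discussed above for coders outside StandardCoders.
    assert length_prefixed(b'\x00' * 8) == b'\x08' + b'\x00' * 8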
>>> >
>>> > https://beam.apache.org/contribute/design-documents/#sql--schema
>>> > and of particular relevance https://s.apache.org/beam-schemas
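To close the loop on those schema pointers: on the Python side, the idea
(in progress in https://github.com/apache/beam/pull/9188) is to describe
element types as named, typed fields so runners and SDKs can encode them
field by field instead of as opaque, length-prefixed bytes. A hypothetical
sketch of such a schema-described type:

    import typing

    # Hypothetical example type. With schema support, each field can be
    # encoded with a standard, cross-language encoding rather than the
    # whole element being wrapped in a length-prefixed unknown coder.
    class Order(typing.NamedTuple):
        order_id: int
        amount: float
        currency: str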