Thanks for sharing the details of the current StandardCoders, Max! That's true, Flink may need to define some of its own coders. I will share the POC in the Flink Python UDFs DISCUSS thread later :)
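To make that idea concrete, here is a minimal sketch of what a Flink-defined coder could look like on the Java side, by extending Beam's CustomCoder. FlinkLongCoder is a made-up name for illustration only; a real POC would likely delegate to Flink's own TypeSerializer rather than hand-rolling the encoding:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.beam.sdk.coders.CustomCoder;

    // Hypothetical coder that Flink could register for its own long type.
    // Fixed-width (always 8 bytes), so the encoding is self contained.
    public class FlinkLongCoder extends CustomCoder<Long> {
      @Override
      public void encode(Long value, OutputStream outStream) throws IOException {
        new DataOutputStream(outStream).writeLong(value);
      }

      @Override
      public Long decode(InputStream inStream) throws IOException {
        return new DataInputStream(inStream).readLong();
      }

      @Override
      public void verifyDeterministic() {
        // Big-endian fixed-width encoding is deterministic; nothing to check.
      }
    }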
Best,
Jincheng

Maximilian Michels <m...@apache.org> wrote on Wed, Jul 31, 2019 at 2:53 PM:

> Hi Jincheng,
>
> Thanks for getting back to us.
>
> > For the next major release of Flink, we plan to add Python user defined functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam portability framework and think that it is perfect for our requirements. However, we have also found some improvements needed for Beam:
>
> That sounds great! The improvement list contains very reasonable suggestions, some of which are already on our TODO list. I think Thomas and Robert already provided the answers you were looking for.
>
> > Open questions:
> > ---------------------
> > 1) Which coders should be / can be defined in StandardCoders?
>
> The ones which are present now are:
>
> BYTES_CODER
> INT64_CODER
> STRING_UTF8
> ITERABLE_CODER
> TIMER_CODER
> KV_CODER
> LENGTH_PREFIX_CODER
> GLOBAL_WINDOW_CODER
> INTERVAL_WINDOW_CODER
> WINDOWED_VALUE_CODER
> DOUBLE_CODER
>
> Note that this is just across SDK borders. If you stay within one SDK, you can use any coder. If a Runner wants to replace a particular coder with its own coder implementation, it could do that. Flink may want to use its own set of coders for the sake of coder migration. Another option Robert alluded to would be to make use of Schema where possible, which has been built with migration in mind.
>
> Thanks,
> Max
>
> > Must Have:
> > ----------------
> > 1) Currently only BagState is supported in the gRPC protocol, and I think we should support more kinds of state types, such as MapState, ValueState, ReducingState, CombiningState (AggregatingState in Flink), etc. That's because these kinds of state will be used in both user-defined functions and the Flink Python DataStream API.
> >
> > 2) There are warnings that Python 3 is not fully supported in Beam (beam/sdks/python/setup.py). We should support Python 3.x for the Beam portability framework, since Python 2 will no longer be officially supported.
> >
> > 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at the runner side. The reason I think this is a must-have is that when the environment type is "PROCESS", the default value "/tmp" may become a big problem.
> >
> > 4) The buffer size configuration policy should be improved, such as:
> > At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver is size based. We should also support time based limits, especially for the streaming case.
> > In the Python SDK Harness, the buffer size is not configurable in GrpcDataService, and the input queue of the input buffer in the Python SDK Harness is not size limited.
> > The flush threshold of the output buffer in the Python SDK Harness is 10 MB by default (_DEFAULT_FLUSH_THRESHOLD = 10 MB). My suggestion: make the threshold configurable and also support a time based threshold.
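As an annotation on point 4 above: a combined size/time flush policy could look roughly like the sketch below. BufferingPolicy is a made-up name, not an existing Beam class; it only illustrates the decision logic a data-plane observer could apply per element:

    import java.time.Duration;
    import java.time.Instant;

    // Hypothetical flush policy combining a size limit with a time limit,
    // shown only to illustrate the suggestion; not an existing Beam class.
    public class BufferingPolicy {
      private final long sizeLimitBytes;
      private final Duration timeLimit;
      private long bufferedBytes = 0;
      private Instant lastFlush = Instant.now();

      public BufferingPolicy(long sizeLimitBytes, Duration timeLimit) {
        this.sizeLimitBytes = sizeLimitBytes;
        this.timeLimit = timeLimit;
      }

      // Called after each element is appended to the output buffer.
      public boolean shouldFlush(long elementSizeBytes) {
        bufferedBytes += elementSizeBytes;
        boolean sizeExceeded = bufferedBytes >= sizeLimitBytes;
        boolean timeExceeded =
            Duration.between(lastFlush, Instant.now()).compareTo(timeLimit) >= 0;
        return sizeExceeded || timeExceeded;
      }

      // Called whenever the buffer is actually flushed downstream.
      public void onFlush() {
        bufferedBytes = 0;
        lastFlush = Instant.now();
      }
    }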
> > Nice To Have:
> > -------------------
> > 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle, etc., to change the parameter type from WindowedValue<T> to T. (We have already discussed this in the previous mails.)
> >
> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For example, beam-sdks-java-core (11 MB) is a package for Java SDK users, and it is pulled in because a few classes in beam-sdks-java-core are used in beam-runners-java-fn-execution, such as PipelineOptions used in DefaultJobBundleFactory and FileSystems used in BeamFileSystemArtifactRetrievalService. This means we could add a new module such as beam-sdks-java-common to hold the classes used by both the runner and the SDK.
> >
> > 3) The state cache is not shared between bundles, which is performance critical for streaming jobs.
> >
> > 4) The coder of WindowedValue cannot be configured, and most of the time we don't need to serialize and deserialize the timestamp, window and pane properties in Flink. But currently FullWindowedValueCoder is used by default in WireCoders.addWireCoder. I suggest making the coder configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
> >
> > 5) Currently, if a coder is not defined in StandardCoders, it will be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder -> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few coders are defined in StandardCoders. This means that for most coders, a length will be added to the serialized bytes, which in my view is not necessary. My suggestion is that we could add some interfaces or tags for the coder to indicate whether it needs a length prefix or not (see the sketch after this list).
> >
> > 6) Set the log level according to PipelineOptions in the Python SDK Harness. Currently the log level is set to INFO by default.
> >
> > 7) Allow starting up the StatusServer according to PipelineOptions in the Python SDK Harness. Currently the StatusServer is started up by default.
> >
> > Although I put 3), 4) and 5) into the "Nice To Have" category as they are performance related, I still think they are very critical for Python UDF execution performance.
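As an aside on point 5: the overhead in question can be seen directly with Beam's Java coders. A small sketch, assuming CoderUtils.encodeToByteArray behaves as documented; the expected sizes are noted in comments:

    import org.apache.beam.sdk.coders.DoubleCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class LengthPrefixDemo {
      public static void main(String[] args) throws Exception {
        // DoubleCoder is fixed-width: always exactly 8 bytes.
        byte[] plain = CoderUtils.encodeToByteArray(DoubleCoder.of(), 42.0);

        // Wrapping with LengthPrefixCoder prepends a varint length,
        // adding at least one byte that carries no information here.
        byte[] prefixed = CoderUtils.encodeToByteArray(
            LengthPrefixCoder.of(DoubleCoder.of()), 42.0);

        System.out.println(plain.length);     // 8
        System.out.println(prefixed.length);  // 9
      }
    }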
> > Open questions:
> > ---------------------
> > 1) Which coders should be / can be defined in StandardCoders?
> >
> > Currently we are preparing the design for how to support Python UDFs in Flink based on the Beam portability framework, and we will bring up the discussion in the Flink community. We may propose more changes for Beam during that time and may need more support from the Beam community.
> >
> > To be honest, I'm not an expert on Beam, so please feel free to correct me if my understanding is wrong. Any feedback is welcome.
> >
> > Best,
> > Jincheng
>
> Cheers,
> Max
>
> On 31.07.19 12:16, Robert Bradshaw wrote:
> > Yep, Python support is under active development, e.g. https://github.com/apache/beam/pull/9188
> >
> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > >
> > > Thanks a lot for sharing the link. I took a quick look at the design and the implementation in Java and think it could address my concern. It seems that it's still not supported in the Python SDK Harness. Is there any plan for that?
> > >
> > > Robert Bradshaw <rober...@google.com> wrote on Tue, Jul 30, 2019 at 12:33 PM:
> > > >
> > > > On Tue, Jul 30, 2019 at 11:52 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > >
> > > > > > > Is it possible to add an interface such as `isSelfContained()` to the `Coder`? This interface indicates whether the serialized bytes are self contained. If it returns true, then there is no need to add a prefixing length. In this way, there is no need to introduce an extra protocol. Please correct me if I missed something :)
> > > > > >
> > > > > > The question is how it is self contained. E.g. DoubleCoder is self contained because it always uses exactly 8 bytes, but one needs to know the double coder to leverage this. VarInt coder is self-contained in a different way, as is StringCoder (which does just do prefixing).
> > > > >
> > > > > Yes, you are right! Thinking about it again, we cannot add such an interface to the coder, because the runner cannot call it. And just one more thought: does it make sense to add a method such as "registerSelfContainedCoder(xxx)" or so to let users register the coders which can be processed in the SDK Harness? It would be the responsibility of the SDK harness to ensure that the coder is supported.
> > > >
> > > > Basically, a "please don't add length prefixing to this coder, assume everyone else can understand it (and errors will ensue if anyone doesn't)" at the user level? Seems a bit dangerous. Also, there is no single "the SDK" -- there may be multiple other SDKs in general, and of course runner components, some of which may understand the coder in question and some of which may not.
> > > >
> > > > I would say that if this becomes a problem, we could look at the pros and cons of various remedies, this being one alternative.
> > > >
> > > > > > I am hopeful that schemas give us a rich enough way to encode the vast majority of types that we will want to transmit across language barriers (possibly with some widening promotions). For high performance one will want to use formats like Arrow rather than one-off coders as well, which also biases us towards the schema work. The set of StandardCoders is not closed, and nor is the possibility of figuring out a way to communicate outside this set for a particular pair of languages, but I think it makes sense to avoid going that direction unless we have to, due to the increased API surface area and complexity it imposes on all runners and SDKs.
> > > > >
> > > > > Great! Could you share some links about the schema work? It seems very interesting and promising.
> > > >
> > > > https://beam.apache.org/contribute/design-documents/#sql--schema and of particular relevance https://s.apache.org/beam-schemas
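For readers following the schema links above, a minimal sketch of what defining and using a schema looks like with Beam's Java Schema API; the field names are made up for illustration, and this assumes the Schema and Row builders available in recent Beam releases:

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    public class SchemaDemo {
      public static void main(String[] args) {
        // A language-neutral description of a record type; runners and
        // SDKs can reason about it without a per-type custom coder.
        Schema schema = Schema.builder()
            .addStringField("name")
            .addInt64Field("id")
            .addDoubleField("score")
            .build();

        // Rows carry values conforming to the schema.
        Row row = Row.withSchema(schema)
            .addValues("flink-udf", 1L, 0.9)
            .build();

        System.out.println(row.getString("name"));  // flink-udf
      }
    }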