Thanks for sharing the details of the current StandardCoders, Max!
That's true, Flink may need to define some of its own coders, and I will
share the POC in the Flink Python UDFs DISCUSS thread later :)

Best,
Jincheng

Maximilian Michels <m...@apache.org> wrote on Wed, Jul 31, 2019 at 2:53 PM:

> Hi Jincheng,
>
> Thanks for getting back to us.
>
> > For the next major release of Flink, we plan to add Python user-defined
> function (UDF, UDTF, UDAF) support in Flink. I have gone over the Beam
> portability framework and think that it is perfect for our requirements.
> However, we also found some improvements needed for Beam:
>
> That sounds great! The improvement list contains very reasonable
> suggestions, some of which are already on our TODO list. I think
> Thomas and Robert already provided the answers you were looking for.
>
> > Open questions:
> > ---------------------
> > 1) Which coders should be / can be defined in StandardCoders?
>
> The ones which are present now are:
>
>           BYTES_CODER
>           INT64_CODER
>           STRING_UTF8
>           ITERABLE_CODER
>           TIMER_CODER
>           KV_CODER
>           LENGTH_PREFIX_CODER
>           GLOBAL_WINDOW_CODER
>           INTERVAL_WINDOW_CODER
>           WINDOWED_VALUE_CODER
>           DOUBLE_CODER
>
> Note that this is just across SDK borders. If you stay within one SDK,
> you can use any coder. If a Runner wants to replace a particular coder
> with its own coder implementation, it could do that. Flink may want to
> use its own set of coders for the sake of coder migration. Another
> option Robert alluded to would be to make use of Schema where possible,
> which has been built with migration in mind.
>
> Thanks,
> Max
>
> >
> > Must Have:
> > ----------------
> > 1) Currently only BagState is supported in the gRPC protocol and I think we
> should support more kinds of state types, such as MapState, ValueState,
> ReducingState, CombiningState (AggregatingState in Flink), etc. That's
> because these kinds of state will be used in both user-defined functions and
> the Flink Python DataStream API.
> >
> > 2) There are warnings that Python 3 is not fully supported in Beam
> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
> portability framework because Python 2 will no longer be officially supported.
> >
> > 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory
> at the runner side. I think it's a must-have because when the
> environment type is "PROCESS", the default value "/tmp" may become a big
> problem.
> >
> > 4) The buffer size configuration policy should be improved, such as:
> >    At the runner side, the buffer limit in
> BeamFnDataBufferingOutboundObserver is size based. We should also support
> a time-based limit, especially for the streaming case.
> >    At the Python SDK Harness, the buffer size is not configurable in
> GrpcDataService. The input queue of the input buffer in the Python SDK
> Harness is not size limited.
> >    The flush threshold of the output buffer in the Python SDK Harness is 10
> MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
> threshold configurable and support a time-based threshold.
> >
> > Nice To Have:
> > -------------------
> > 1) Improve the interfaces of FnDataService, BundleProcessor,
> ActiveBundle, etc., to change the parameter type from WindowedValue<T> to T.
> (We have already discussed this in the previous mails.)
> >
> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11MB) is a package for Java SDK users and it
> is pulled in because a few classes in beam-sdks-java-core are used
> in beam-runners-java-fn-execution, such as:
> >    PipelineOptions, used in DefaultJobBundleFactory
> >    FileSystems, used in BeamFileSystemArtifactRetrievalService
> > Maybe we can add a new module such as beam-sdks-java-common to
> hold the classes used by both runner and SDK.
> >
> > 3) State cache is not shared between bundles, which is performance
> critical for streaming jobs.
> >
> > 4) The coder of WindowedValue cannot be configured, and most of the time we
> don't need to serialize and deserialize the timestamp, window and pane
> properties in Flink. But currently FullWindowedValueCoder is used by
> default in WireCoders.addWireCoder. I suggest making the coder
> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
> >
> > 5) Currently if a coder is not defined in StandardCoders, it will be
> wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
> coders are defined in StandardCoders. It means that for most coders, a
> length will be added to the serialized bytes, which I think is not
> necessary. My suggestion is that we add some interfaces or tags for the
> coder which indicate whether the coder needs a length prefix or not.
> >
> > 6) Set log level according to PipelineOption in Python SDK Harness.
> Currently the log level is set to INFO by default.
> >
> > 7) Allow starting the StatusServer according to PipelineOption in Python
> SDK Harness. Currently the StatusServer is started by default.
> >
> > Although I put 3) 4) 5) into the "Nice to Have" as they are performance
> related, I still think they are very critical for Python UDF execution
> performance.
> >
> > Open questions:
> > ---------------------
> > 1) Which coders should be / can be defined in StandardCoders?
> >
> > Currently we are preparing the design of how to support Python UDF in
> Flink based on the Beam portability framework and we will bring up the
> discussion in Flink community. We may propose more changes for Beam during
> that time and may need more support from Beam community.
> >
> > To be honest, I'm not an expert on Beam, so please feel free to
> correct me if my understanding is wrong. Any feedback is welcome.
> >
> > Best,
> > Jincheng
>
>
>
>
>
>
> Cheers,
> Max
>
> On 31.07.19 12:16, Robert Bradshaw wrote:
> > Yep, Python support is under active development,
> > e.g. https://github.com/apache/beam/pull/9188
> >
> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun <sunjincheng...@gmail.com
> > <mailto:sunjincheng...@gmail.com>> wrote:
> >
> >     Thanks a lot for sharing the link. I took a quick look at the design
> >     and the implementation in Java and think it could address my
> >     concern. It seems that it's still not supported in the Python SDK
> >     Harness. Is there any plan on that?
> >
> >     Robert Bradshaw <rober...@google.com <mailto:rober...@google.com>>
> >     wrote on Tue, Jul 30, 2019 at 12:33 PM:
> >
> >         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
> >         <sunjincheng...@gmail.com <mailto:sunjincheng...@gmail.com>>
> wrote:
> >
> >
> >                     Is it possible to add an interface such as
> >                     `isSelfContained()` to the `Coder`? This interface
> >                     indicates
> >                     whether the serialized bytes are self contained. If
> >                     it returns true, then there is no need to add a
> >                     prefixing length.
> >                     In this way, there is no need to introduce an extra
> >                     protocol. Please correct me if I missed something :)
> >
> >
> >                 The question is how it is self contained. E.g.
> >                 DoubleCoder is self contained because it always uses
> >                 exactly 8 bytes, but one needs to know the double coder
> >                 to leverage this. VarInt coder is self-contained in a
> >                 different way, as is StringCoder (which does just do
> >                 prefixing).
> >
> >
> >             Yes, you are right! Thinking about it again, we cannot add
> >             such an interface to the coder, because the runner cannot call
> >             it. And just one more thought: does it make sense to add a
> >             method such as "registerSelfContainedCoder(xxx)" or so to
> >             let users register the coders which can be processed in the
> >             SDK Harness?  It's the responsibility of the SDK harness to
> >             ensure that the coder is supported.
> >
> >
> >         Basically, a "please don't add length prefixing to this coder,
> >         assume everyone else can understand it (and errors will ensue if
> >         anyone doesn't)" at the user level? Seems a bit dangerous. Also,
> >         there is not "the SDK"--there may be multiple other SDKs in
> >         general, and of course runner components, some of which may
> >         understand the coder in question and some of which may not.
> >
> >         I would say that if this becomes a problem, we could look at the
> >         pros and cons of various remedies, this being one alternative.
> >
> >
> >
> >
> >                 I am hopeful that schemas give us a rich enough way to
> >                 encode the vast majority of types that we will want to
> >                 transmit across language barriers (possibly with some
> >                 widening promotions). For high performance one will want
> >                 to use formats like arrow rather than one-off coders as
> >                 well, which also biases us towards the schema work. The
> >                 set of StandardCoders is not closed, and nor is the
> >                 possibility of figuring out a way to communicate outside
> >                 this set for a particular pair of languages, but I think
> >                 it makes sense to avoid going that direction unless we
> >                 have to due to the increased API surface area and
> >                 complexity it imposes on all runners and SDKs.
> >
> >
> >             Great! Could you share some links about the schema work? It
> >             seems very interesting and promising.
> >
> >
> >         https://beam.apache.org/contribute/design-documents/#sql--schema and
> >         of particular relevance https://s.apache.org/beam-schemas
> >
> >
> >
>
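
The length-prefixing question discussed in the thread above can be
illustrated with a small sketch. This is plain Python and does not use any
Beam API. It assumes the length is written as a base-128 varint in front of
each payload, and all function names here (encode_varint, length_prefix,
split_length_prefixed) are made up for illustration. It shows why a
component that does not know a coder still can find element boundaries
when a length prefix is present, and why a fixed-width double needs no
prefix only if the reader already knows it is 8 bytes wide.

```python
import struct


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # more bytes follow
        else:
            out.append(low7)         # last byte
            return bytes(out)


def decode_varint(buf: bytes, pos: int):
    """Return (value, next_position) for the varint starting at pos."""
    result = 0
    shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result, pos
        shift += 7


def length_prefix(payload: bytes) -> bytes:
    """Wrap opaque bytes so a reader can skip them without decoding."""
    return encode_varint(len(payload)) + payload


def split_length_prefixed(stream: bytes):
    """Split a stream into elements using only the length prefixes."""
    pos = 0
    elements = []
    while pos < len(stream):
        size, pos = decode_varint(stream, pos)
        elements.append(stream[pos:pos + size])
        pos += size
    return elements


# Payloads from a hypothetical custom coder that the reader does not know.
opaque = [b"user-blob-1", b"", b"x" * 300]
stream = b"".join(length_prefix(p) for p in opaque)
assert split_length_prefixed(stream) == opaque

# Fixed-width doubles carry no prefix; the reader must know the 8-byte width.
doubles = struct.pack(">d", 1.5) + struct.pack(">d", -2.0)
values = [struct.unpack_from(">d", doubles, i)[0]
          for i in range(0, len(doubles), 8)]
```

The cost being debated is the extra varint per element (one byte for
payloads under 128 bytes, two bytes for the 300-byte payload here), paid
even when both sides could have understood the coder directly.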
