Hi all,

Thanks for your confirmation, Robert! :)
Thanks for sharing the details about the state discussion, Luke! :) MapState is a bit complex, so I think it's better to add a detailed design doc when we work on MapState support. I will create JIRAs and follow up on subsequent developments. If there are big changes, I will provide detailed design documentation and bring up the discussion in the ML.

Thanks everyone for joining this discussion.

Best,
Jincheng

On Wed, Aug 7, 2019 at 8:19 PM Lukasz Cwik <lc...@google.com> wrote:

> I wanted to add some more details about the state discussion.
>
> BEAM-7000 is about adding support for a gRPC message saying that the SDK is now blocked on one of its requests. This would allow for an easy optimization on the runner side where it gathers requests and is able to batch them, knowing that the SDK is only blocked once it sees one of the blocked gRPC messages. This would make it easy for the runner to gather up clear + append calls and convert them to sets internally.
>
> Also, most of the reason map state doesn't exist yet is that we haven't discussed the changes to the gRPC APIs that we would need (things like: can you lookup/clear/append to ranges? map or multimap? should we really just get rid of bag state in favor of a multimap state? can you enumerate keys? know how many keys there are? ...).
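(For illustration, a minimal sketch of the runner-side optimization Luke describes: buffer state requests until the SDK signals it is blocked, and fold a clear followed by appends on the same state key into a single set. The class and the backend set/append_all calls are hypothetical stand-ins, not actual Beam APIs.)

    class StateRequestBatcher:
        """Buffers clear/append state requests until the SDK reports
        that it is blocked waiting on them."""

        def __init__(self, backend):
            self._backend = backend
            # state_key -> (saw_clear, values appended after the clear)
            self._pending = {}

        def on_request(self, op, key, value=None):
            saw_clear, appends = self._pending.get(key, (False, []))
            if op == 'clear':
                # A clear discards any earlier appends for this key.
                saw_clear, appends = True, []
            elif op == 'append':
                appends = appends + [value]
            self._pending[key] = (saw_clear, appends)

        def on_sdk_blocked(self):
            # Flush: "clear then append(s)" becomes one atomic set.
            for key, (saw_clear, appends) in self._pending.items():
                if saw_clear:
                    self._backend.set(key, appends)
                else:
                    self._backend.append_all(key, appends)
            self._pending.clear()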
> On Wed, Aug 7, 2019 at 9:52 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> The list looks good to me. Thanks for summarizing. Feel free to dive into any of these issues yourself :).
>>
>> On Fri, Aug 2, 2019 at 6:24 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Thanks a lot for sharing your thoughts!
>> >
>> > It seems that we have already reached consensus on the following items. Could you please read through them again and double-check whether you all agree with them? If yes, I will start creating JIRA issues for those that don't yet have one.
>> >
>> > 1. Items that require improvements of Beam:
>> >
>> > 1) The value of "semi_persist_dir" should be configurable. (TODO)
>> > https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
>> >
>> > 2) Time-based cache thresholds should be supported (a sketch of combining size- and time-based flushing follows this mail). (TODO)
>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
>> > https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
>> >
>> > 3) Cross-bundle caching should be supported. (https://issues.apache.org/jira/browse/BEAM-5428)
>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>> >
>> > 4) Allow configuring the log level. (TODO)
>> > https://issues.apache.org/jira/browse/BEAM-5468
>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
>> >
>> > 5) Improve the interfaces of classes such as FnDataService, BundleProcessor, ActiveBundle, etc. to change the parameter type from WindowedValue<T> to T. (TODO)
>> >
>> > 6) Python 3 is already supported in Beam. The warning should be removed. (TODO)
>> > https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
>> >
>> > 7) The coder of WindowedValue should be configurable, which makes it possible to use a customized coder such as ValueOnlyWindowedValueCoder. (TODO)
>> > https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
>> >
>> > 8) The schema work can be used to solve the performance issue of the extra length prefixing during encoding. However, it should also be supported in Python. (https://github.com/apache/beam/pull/9188)
>> > https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
>> >
>> > 9) MapState should be supported in the gRPC protocol. (TODO)
>> > https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
>> >
>> > 2. Items where we don't need to do anything for now:
>> >
>> > 1) The default buffer size is enough for most cases and there is no need to make it configurable for now.
>> >
>> > 2) Do not support ValueState in the gRPC protocol for now unless we have evidence that it matters.
>> >
>> > If I have misunderstood anything, please feel free to correct me :)
>> >
>> > --------------------
>> >
>> > There are also some items that I didn't bring up earlier which require further discussion:
>> >
>> > 1) The input queue of the input buffer in the Python SDK Harness is not size limited. We should give it a reasonable default size.
>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
>> >
>> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For example, beam-sdks-java-core (11 MB) is a package for Java SDK users and it is pulled in because a few classes in beam-sdks-java-core are used in beam-runners-java-fn-execution, such as PipelineOptions used in DefaultJobBundleFactory and FileSystems used in BeamFileSystemArtifactRetrievalService. This means maybe we can add a new module such as beam-sdks-java-common to hold the classes used by both the runner and the SDK.
>> >
>> > 3) Allow starting up the StatusServer according to configuration in the Python SDK Harness. Currently the StatusServer is started up by default.
>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
>> >
>> > Best,
>> > Jincheng
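(To make item 2 above concrete, here is a minimal sketch of an outbound buffer that flushes on a size threshold or on a time interval, whichever comes first. It is a hypothetical stand-in for the size-only logic in data_plane.py and BeamFnDataBufferingOutboundObserver; the class name, the flush callback, and the default values are assumptions, not Beam APIs.)

    import threading

    class SizeOrTimeBufferingObserver:
        """Buffers outbound data and flushes when the buffered size
        exceeds a threshold OR when a flush interval elapses."""

        def __init__(self, flush, size_threshold=10 << 20, interval_secs=1.0):
            self._flush = flush  # callback receiving the buffered bytes
            self._size_threshold = size_threshold
            self._interval_secs = interval_secs
            self._buffer = []
            self._size = 0
            self._lock = threading.Lock()
            self._schedule_timer()

        def _schedule_timer(self):
            self._timer = threading.Timer(self._interval_secs, self._on_timer)
            self._timer.daemon = True
            self._timer.start()

        def _on_timer(self):
            with self._lock:
                self._flush_buffer()
            self._schedule_timer()

        def send(self, data):
            with self._lock:
                self._buffer.append(data)
                self._size += len(data)
                if self._size >= self._size_threshold:
                    self._flush_buffer()

        def _flush_buffer(self):
            # Must be called with the lock held.
            if self._buffer:
                self._flush(b''.join(self._buffer))
                self._buffer = []
                self._size = 0

(The time-based path bounds latency for slow streaming sources that may never reach the size threshold.)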
>> >
>> > On Fri, Aug 2, 2019 at 4:14 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>> >>
>> >> Thanks for sharing the details of the current StandardCoders, Max!
>> >> That's true, Flink may need to define some coders of its own. I will share the POC in the Flink Python UDFs DISCUSS thread later :)
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> On Wed, Jul 31, 2019 at 2:53 PM Maximilian Michels <m...@apache.org> wrote:
>> >>>
>> >>> Hi Jincheng,
>> >>>
>> >>> Thanks for getting back to us.
>> >>>
>> >>> > For the next major release of Flink, we plan to add Python user-defined function (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam portability framework and think that it is perfect for our requirements. However, we have also found some improvements needed for Beam:
>> >>>
>> >>> That sounds great! The improvement list contains very reasonable suggestions, some of which are already on our TODO list. I think Thomas and Robert already provided the answers you were looking for.
>> >>>
>> >>> > Open questions:
>> >>> > ---------------------
>> >>> > 1) Which coders should be / can be defined in StandardCoders?
>> >>>
>> >>> The ones which are present now. Those are:
>> >>>
>> >>> BYTES_CODER
>> >>> INT64_CODER
>> >>> STRING_UTF8
>> >>> ITERABLE_CODER
>> >>> TIMER_CODER
>> >>> KV_CODER
>> >>> LENGTH_PREFIX_CODER
>> >>> GLOBAL_WINDOW_CODER
>> >>> INTERVAL_WINDOW_CODER
>> >>> WINDOWED_VALUE_CODER
>> >>> DOUBLE_CODER
>> >>>
>> >>> Note that this is just across SDK borders. If you stay within one SDK, you can use any coder. If a Runner wants to replace a particular coder with its own coder implementation, it can do that. Flink may want to use its own set of coders for the sake of coder migration. Another option, which Robert alluded to, would be to make use of Schema where possible, which has been built with migration in mind.
>> >>>
>> >>> Thanks,
>> >>> Max
>> >>>
>> >>> > Must Have:
>> >>> > ----------------
>> >>> > 1) Currently only BagState is supported in the gRPC protocol and I think we should support more kinds of state types, such as MapState, ValueState, ReducingState, CombiningState (AggregatingState in Flink), etc. That's because these kinds of state will be used in both user-defined functions and the Flink Python DataStream API.
>> >>> >
>> >>> > 2) There are warnings that Python 3 is not fully supported in Beam (beam/sdks/python/setup.py). We should support Python 3.x for the Beam portability framework, since Python 2 will no longer be officially supported.
>> >>> >
>> >>> > 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory on the runner side. The reason I think this is a must-have is that when the environment type is "PROCESS", the default value "/tmp" may become a big problem.
>> >>> >
>> >>> > 4) The buffer size configuration policy should be improved:
>> >>> > On the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver is size based. We should also support time-based limits, especially for the streaming case.
>> >>> > In the Python SDK Harness, the buffer size is not configurable in GrpcDataService, and the input queue of the input buffer is not size limited.
>> >>> > The flush threshold of the output buffer in the Python SDK Harness is 10 MB by default (_DEFAULT_FLUSH_THRESHOLD = 10 MB). My suggestion is: make the threshold configurable and support a time-based threshold.
>> >>> >
>> >>> > Nice To Have:
>> >>> > -------------------
>> >>> > 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle, etc. to change the parameter type from WindowedValue<T> to T. (We have already discussed this in the previous mails.)
>> >>> >
>> >>> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For example, beam-sdks-java-core (11 MB) is a package for Java SDK users and it is pulled in because a few classes in beam-sdks-java-core are used in beam-runners-java-fn-execution, such as PipelineOptions used in DefaultJobBundleFactory and FileSystems used in BeamFileSystemArtifactRetrievalService. This means maybe we can add a new module such as beam-sdks-java-common to hold the classes used by both the runner and the SDK.
>> >>> >
>> >>> > 3) The state cache is not shared between bundles, which is performance critical for streaming jobs.
>> >>> >
>> >>> > 4) The coder of WindowedValue cannot be configured, and most of the time we don't need to serialize and deserialize the timestamp, window and pane properties in Flink. But currently FullWindowedValueCoder is used by default in WireCoders.addWireCoder. I suggest making the coder configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>> >>> >
>> >>> > 5) Currently, if a coder is not defined in StandardCoders, it will be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder -> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few coders are defined in StandardCoders. It means that for most coders a length will be added to the serialized bytes, which in my opinion is not necessary. My suggestion is that maybe we can add some interfaces or tags for a coder to indicate whether it needs a length prefix or not.
>> >>> >
>> >>> > 6) Set the log level according to PipelineOptions in the Python SDK Harness. Currently the log level is set to INFO by default.
>> >>> >
>> >>> > 7) Allow starting up the StatusServer according to PipelineOptions in the Python SDK Harness. Currently the StatusServer is started up by default.
>> >>> >
>> >>> > Although I put 3) 4) 5) into the "Nice To Have" category as they are performance related, I still think they are very critical for Python UDF execution performance.
>> >>> >
>> >>> > Open questions:
>> >>> > ---------------------
>> >>> > 1) Which coders should be / can be defined in StandardCoders?
>> >>> >
>> >>> > Currently we are preparing the design of how to support Python UDFs in Flink based on the Beam portability framework and we will bring up the discussion in the Flink community. We may propose more changes for Beam during that time and may need more support from the Beam community.
>> >>> >
>> >>> > To be honest, I'm not an expert on Beam, so please feel free to correct me if my understanding is wrong. Any feedback is welcome.
>> >>> >
>> >>> > Best,
>> >>> > Jincheng
>> >>>
>> >>> Cheers,
>> >>> Max
>> >>>
>> >>> On 31.07.19 12:16, Robert Bradshaw wrote:
>> >>> > Yep, Python support is under active development, e.g. https://github.com/apache/beam/pull/9188
>> >>> >
>> >>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>> >>> >
>> >>> > Thanks a lot for sharing the link. I took a quick look at the design and the implementation in Java and think it could address my concern. It seems that it's still not supported in the Python SDK Harness. Is there any plan for that?
>> >>> >
>> >>> > On Tue, Jul 30, 2019 at 12:33 PM Robert Bradshaw <rober...@google.com> wrote:
>> >>> >
>> >>> > On Tue, Jul 30, 2019 at 11:52 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>> >>> >
>> >>> > Is it possible to add an interface such as `isSelfContained()` to the `Coder`? This interface indicates whether the serialized bytes are self-contained. If it returns true, then there is no need to add a prefixing length. In this way, there is no need to introduce an extra protocol. Please correct me if I missed something :)
>> >>> >
>> >>> > The question is how it is self-contained. E.g. DoubleCoder is self-contained because it always uses exactly 8 bytes, but one needs to know the double coder to leverage this. VarInt coder is self-contained in a different way, as is StringCoder (which does just do prefixing).
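(A minimal sketch of the three flavors of "self-contained" encoding Robert distinguishes, and of the wrapping that item 5 above objects to. These helpers are illustrative only, not Beam's coder API.)

    import struct

    def encode_double(value):
        # Fixed width: always exactly 8 bytes, so no length prefix is
        # needed, but the reader must know it is decoding a double.
        return struct.pack('>d', value)

    def encode_varint(value):
        # Self-delimiting (assumes value >= 0): the high bit of each byte
        # says whether more bytes follow, so the decoder knows where the
        # value ends without any outside help.
        out = bytearray()
        while True:
            bits = value & 0x7F
            value >>= 7
            if value:
                out.append(bits | 0x80)
            else:
                out.append(bits)
                return bytes(out)

    def length_prefix(encoded_bytes):
        # What wrapping an unknown coder effectively adds: a varint
        # length, then the payload. Any reader can at least skip the
        # value, even if it cannot decode it.
        return encode_varint(len(encoded_bytes)) + encoded_bytes

(The cost discussed in item 5 is that the last helper is applied to every coder outside StandardCoders, even when both sides already know the exact byte layout.)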
>> >>> >
>> >>> > Yes, you are right! Thinking about it again, we cannot add such an interface to the coder, because the runner cannot call it. And just one more thought: does it make sense to add a method such as "registerSelfContainedCoder(xxx)" or so to let users register the coders which can be processed in the SDK Harness? It's the responsibility of the SDK harness to ensure that the coder is supported.
>> >>> >
>> >>> > Basically, a "please don't add length prefixing to this coder, assume everyone else can understand it (and errors will ensue if anyone doesn't)" at the user level? Seems a bit dangerous. Also, there is no "the SDK" -- there may be multiple other SDKs in general, and of course runner components, some of which may understand the coder in question and some of which may not.
>> >>> >
>> >>> > I would say that if this becomes a problem, we could look at the pros and cons of various remedies, this being one alternative.
>> >>> >
>> >>> > I am hopeful that schemas give us a rich enough way to encode the vast majority of types that we will want to transmit across language barriers (possibly with some widening promotions). For high performance one will want to use formats like Arrow rather than one-off coders as well, which also biases us towards the schema work. The set of StandardCoders is not closed, and nor is the possibility of figuring out a way to communicate outside this set for a particular pair of languages, but I think it makes sense to avoid going that direction unless we have to, due to the increased API surface area and complexity it imposes on all runners and SDKs.
>> >>> >
>> >>> > Great! Could you share some links about the schema work? It seems very interesting and promising.
>> >>> >
>> >>> > https://beam.apache.org/contribute/design-documents/#sql--schema and of particular relevance https://s.apache.org/beam-schemas
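(For reference, a sketch of what the schema work referenced above eventually looks like from the Python side. Whether and when RowCoder is available depends on the Beam release, which is an assumption here; check the docs for your version.)

    # Registering a schema coder for a NamedTuple type lets values cross
    # SDK/runner borders via the portable schema representation instead
    # of an opaque, length-prefixed coder.
    import typing
    import apache_beam as beam
    from apache_beam import coders

    class Purchase(typing.NamedTuple):
        item_name: str
        price: float

    coders.registry.register_coder(Purchase, coders.RowCoder)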