Hi Reuven,

I think you have offered a viable option for other communities that want to take advantage of Beam's existing achievements. Thank you very much!
I think the Flink community can choose either to copy Beam's code or to depend directly on Beam's class library. The Flink community has also started a discussion; more information can be found here:
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>

The purpose of turning `WindowedValue<T>` into `T` is to make Beam's interface design more versatile, so that other open source projects have the opportunity to take advantage of Beam's existing achievements. Of course, just changing `WindowedValue<T>` to `T` is not enough for the interfaces to be shared with other projects as a class library; more work is needed. If Beam can provide a class library in the future, contributors from other communities will also be willing to contribute to the Beam community. This will benefit both the communities that want to take advantage of Beam's existing achievements and the Beam community itself. Thanks also to Thomas, who has put a lot of effort into this.

Thanks again for your valuable suggestion, and any feedback is welcome!

Best,
Jincheng

On Tue, Apr 23, 2019 at 1:00 AM Reuven Lax <re...@google.com> wrote:

> One concern here: these interfaces are intended for use within the Beam project. Beam may decide to make specific changes to them to support needed functionality in Beam. If they are being reused by other projects, then those changes risk breaking those other projects in unexpected ways. I don't think we can guarantee that we won't do that. If this is useful in Flink, it would be safer to copy the code IMO rather than to directly depend on it.
>
> On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>
>> Hi Kenn,
>>
>> Thanks for your reply, and for explaining the design of WindowedValue so clearly!
>>
>> At present, the definitions of `FnDataService` and `BeamFnDataClient` in the Data Plane are very clear and universal, for example send(...)/receive(...). If they are only used within the Beam project, they are already very good, because `WindowedValue` is a very basic data structure in Beam and both the Runner and the SDK harness define it.
>>
>> The reason I want to change the interface parameter from `WindowedValue<T>` to `T` is that I want to turn the `Data Plane` interface into a class library that can be used by other projects (such as Apache Flink), so that other projects can have their own `FnDataService` implementation. However, the definition of `WindowedValue` does not apply to all projects. Apache Flink, for example, has its own similar definition: Flink streaming has StreamRecord. If we change `WindowedValue<T>` to `T`, then other projects' implementations do not need to wrap values in WindowedValue, and the interface becomes more concise. Furthermore, in some cases only a plain `T` is needed, as with the Apache Flink DataSet operators.
>>
>> So I agree with your understanding: I don't expect a `WindowedValueXXX<T>` in the FnDataService interface; I hope to use just a `T`.
>>
>> Do you see any problem if we change the interface parameter from `WindowedValue<T>` to `T`?
>>
>> Thanks,
>> Jincheng
>>
>> On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> WindowedValue has always been an interface, not a concrete representation:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52>.
>>> It is an abstract class because we started in Java 7 where you could not have default methods, and just due to legacy style concerns.
>>> It is not just discussed, but implemented, that there are WindowedValue implementations with fewer allocations.
>>>
>>> At the coder level, it was also always intended to have multiple encodings. We already do have separate encodings based on whether there is 1 window or multiple windows. The coder for a particular kind of WindowedValue should decide this. Before the Fn API none of this had to be standardized, because the runner could just choose whatever it wants. Now we have to standardize any encodings that runners and harnesses both need to know. There should be many, and adding more should be just a matter of standardization, no new design.
>>>
>>> None of this should be user-facing or in the runner API / pipeline graph - that is critical to making it flexible on the backend between the runner & SDK harness.
>>>
>>> If I understand it, from our offline discussion, you are interested in the case where you issue a ProcessBundleRequest to the SDK harness and none of the primitives in the subgraph will ever observe the metadata. So you want to not even have a tiny "WindowedValueWithNoMetadata". Is that accurate?
>>>
>>> Kenn
>>>
>>> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>
>>>> Thank you! And have a nice weekend!
>>>>
>>>> On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I have added you as a contributor.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> Thanks for your affirmation and for providing more context. :)
>>>>>>
>>>>>> Would you please give me contributor permission? My JIRA ID is sunjincheng121.
>>>>>>
>>>>>> I would like to create/assign tickets for this work.
>>>>>>
>>>>>> Thanks,
>>>>>> Jincheng
>>>>>>
>>>>>> On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Since I don't think this is a contentious change.
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes, using T makes sense.
>>>>>>>>
>>>>>>>> The WindowedValue was meant to be a context object in the SDK harness that propagates various information about the current element. We have discussed in the past about:
>>>>>>>> * making optimizations which would pass around less of the context information if we know that the DoFns don't need it (for example, all the values share the same window).
>>>>>>>> * versioning the encoding separately from the WindowedValue context object (see recent discussion about element timestamp precision [1])
>>>>>>>> * the runner may want its own representation of a context object that makes sense for it which isn't a WindowedValue necessarily.
>>>>>>>>
>>>>>>>> Feel free to cut a JIRA about this and start working on a change towards this.
>>>>>>>>
>>>>>>>> 1:
>>>>>>>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>>>>>>>
>>>>>>>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Beam devs,
>>>>>>>>>
>>>>>>>>> I read some of the docs about `Communicating over the Fn API` in Beam. I feel that Beam has a very good design for the Control Plane/Data Plane/State Plane/Logging services, as described in the <How to send and receive data> document. When the Runner and the SDK Harness communicate, the Data Plane API uses WindowedValue (an immutable triple of value, timestamp, and windows) as the contract object between them.
>>>>>>>>> I see the interface definitions for sending and receiving data in the code as follows:
>>>>>>>>>
>>>>>>>>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>>>>>>>>
>>>>>>>>> public interface FnDataService {
>>>>>>>>>>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>>>>>>>>>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>>>>>>>>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>>>>>>>>
>>>>>>>>> public interface BeamFnDataClient {
>>>>>>>>>>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>>>>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>>>>>>>>>>       FnDataReceiver<WindowedValue<T>> receiver);
>>>>>>>>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>>>>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use `WindowedValue` as the data structure shared by the Runner and the SDK Harness. Control Plane/Data Plane/State Plane/Logging is a high-level abstraction, and services such as Control Plane and Logging are common requirements for all multi-language platforms. For example, the Flink community is also discussing how to support Python UDFs, including how to deal with the Docker environment, data transfer, state access, logging, etc.
>>>>>>>>> If Beam can further abstract these service interfaces, i.e., make the interface definitions compatible with multiple engines, and eventually provide them to other projects as class libraries, it will definitely help other platforms that want to support multiple languages. So could Beam further abstract the interface definitions of FnDataService and BeamFnDataClient? As a modest starting point to invite better ideas, take the FnDataService#receive interface as an example and turn `WindowedValue<T>` into `T`, so that other platforms can extend it as they need:
>>>>>>>>>
>>>>>>>>> <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>>>>>>>>     Coder<T> coder, FnDataReceiver<T> listener);
>>>>>>>>>
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> Feel free to correct me if I have misunderstood anything. Any feedback is welcome!
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Jincheng
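
To make the `WindowedValue<T>` to `T` proposal from the original message concrete, here is a minimal, self-contained sketch. It is only an illustration under stated assumptions: `SimpleCoder`, `SimpleReceiver`, `SimpleDataService`, `TimestampedRecord`, and `InMemoryDataService` are hypothetical stand-ins invented for this example, not the real Beam or Flink types, and endpoints, gRPC transport, and bundle lifecycle are left out entirely. It shows one `receive` method serving both elements with no metadata and elements in an engine-specific wrapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only. None of these types
// are the real Beam (or Flink) interfaces; transport and endpoints are omitted.
final class GenericDataPlaneSketch {

  /** Stand-in for a coder: encodes a T to a String and decodes it back. */
  interface SimpleCoder<T> {
    String encode(T value);
    T decode(String encoded);
  }

  /** Stand-in for a receiver of decoded elements. */
  interface SimpleReceiver<T> {
    void accept(T value);
  }

  /** Data service typed on a plain T, so the caller picks the element wrapper. */
  interface SimpleDataService {
    <T> void receive(List<String> encodedInput, SimpleCoder<T> coder, SimpleReceiver<T> listener);
  }

  /** Engine-specific wrapper, loosely analogous to WindowedValue or StreamRecord. */
  static final class TimestampedRecord {
    final long timestamp;
    final String value;

    TimestampedRecord(long timestamp, String value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  /** Coder for elements that carry no metadata at all. */
  static final SimpleCoder<String> PLAIN = new SimpleCoder<String>() {
    @Override public String encode(String value) { return value; }
    @Override public String decode(String encoded) { return encoded; }
  };

  /** Coder for the wrapper type, encoded as "<timestamp>:<value>". */
  static final SimpleCoder<TimestampedRecord> TIMESTAMPED = new SimpleCoder<TimestampedRecord>() {
    @Override public String encode(TimestampedRecord r) { return r.timestamp + ":" + r.value; }
    @Override public TimestampedRecord decode(String encoded) {
      int sep = encoded.indexOf(':');
      long timestamp = Long.parseLong(encoded.substring(0, sep));
      return new TimestampedRecord(timestamp, encoded.substring(sep + 1));
    }
  };

  /** In-memory implementation: decodes each input and hands it to the listener. */
  static final class InMemoryDataService implements SimpleDataService {
    @Override
    public <T> void receive(List<String> encodedInput, SimpleCoder<T> coder, SimpleReceiver<T> listener) {
      for (String encoded : encodedInput) {
        listener.accept(coder.decode(encoded));
      }
    }
  }

  /** Convenience helper: runs receive(...) and collects the decoded elements. */
  static <T> List<T> receiveAll(List<String> encodedInput, SimpleCoder<T> coder) {
    List<T> out = new ArrayList<>();
    new InMemoryDataService().receive(encodedInput, coder, out::add);
    return out;
  }

  public static void main(String[] args) {
    // Same service, two element types: plain values and wrapped values.
    List<String> plain = receiveAll(Arrays.asList("a", "b"), PLAIN);
    String wire = TIMESTAMPED.encode(new TimestampedRecord(42L, "a"));
    List<TimestampedRecord> stamped = receiveAll(Arrays.asList(wire), TIMESTAMPED);
    System.out.println(plain + " / " + stamped.get(0).timestamp + ":" + stamped.get(0).value);
  }
}
```

In this sketch the service never mentions the wrapper type; only the coder and the listener do. That is the property the proposal is after: a project that needs windowing metadata supplies a coder for its own wrapper, and a project that needs none passes a coder for the bare value.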