Makes sense to me. I don't see any problem.

Kenn
On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> Hi Kenn,
>
> Thanks for your reply and for explaining the design of WindowedValue so
> clearly!
>
> At present, the definitions of `FnDataService` and `BeamFnDataClient` in
> the Data Plane are very clear and universal, e.g. send(...)/receive(...).
> If they are only used within Beam, they are already very good, because
> `WindowedValue` is a very basic data structure in Beam and both the Runner
> and the SDK harness define it.
>
> The reason I want to change the interface parameter from
> `WindowedValue<T>` to T is that I want to turn the `Data Plane` interface
> into a class library that other projects (such as Apache Flink) can use,
> so that those projects can have their own `FnDataService`
> implementations. However, the definition of `WindowedValue` does not fit
> every project. For example, Apache Flink has its own structure similar to
> WindowedValue: StreamRecord in Flink streaming. If we change
> `WindowedValue<T>` to T, other projects' implementations will not need to
> wrap their elements in WindowedValue, and the interface will become more
> concise. Furthermore, some operators, such as Apache Flink DataSet
> operators, only need a plain T.
>
> So yes, your understanding is correct: I don't want any
> `WindowedValueXXX<T>` in the FnDataService interface; I would like to use
> just a `T`.
>
> Do you see any problem with changing the interface parameter from
> `WindowedValue<T>` to T?
>
> Thanks,
> Jincheng
>
> On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> WindowedValue has always been an interface, not a concrete
>> representation:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
>> It is an abstract class because we started on Java 7, which did not
>> have default methods, and also due to legacy style concerns.
>> WindowedValue implementations with fewer allocations have not just been
>> discussed; they are already implemented.
>>
>> At the coder level, it was also always intended to have multiple
>> encodings. We already have separate encodings based on whether there is
>> one window or multiple windows. The coder for a particular kind of
>> WindowedValue should decide this. Before the Fn API none of this had to be
>> standardized, because the runner could just choose whatever it wanted. Now
>> we have to standardize any encodings that both runners and harnesses need
>> to know. There should be many, and adding more should just be a matter of
>> standardization, not new design.
>>
>> None of this should be user-facing or in the Runner API / pipeline graph;
>> that is critical to keeping the backend between the runner and the SDK
>> harness flexible.
>>
>> If I understand our offline discussion correctly, you are interested in
>> the case where you issue a ProcessBundleRequest to the SDK harness and
>> none of the primitives in the subgraph will ever observe the metadata. So
>> you don't want even a tiny "WindowedValueWithNoMetadata". Is that
>> accurate?
>>
>> Kenn
>>
>> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>>
>>> Thank you! And have a nice weekend!
>>>
>>> On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I have added you as a contributor.
>>>>
>>>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> Thanks for confirming and for providing more context. :)
>>>>>
>>>>> Would you please grant me contributor permission? My JIRA ID is
>>>>> sunjincheng121.
>>>>>
>>>>> I would like to create/assign tickets for this work.
>>>>>
>>>>> Thanks,
>>>>> Jincheng
>>>>>
>>>>> On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Since I don't think this is a contentious change.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Yes, using T makes sense.
>>>>>>>
>>>>>>> The WindowedValue was meant to be a context object in the SDK
>>>>>>> harness that propagates various information about the current
>>>>>>> element. In the past we have discussed:
>>>>>>> * optimizations that would pass around less of the context
>>>>>>>   information when we know that the DoFns don't need it (for
>>>>>>>   example, when all the values share the same window).
>>>>>>> * versioning the encoding separately from the WindowedValue context
>>>>>>>   object (see the recent discussion about element timestamp
>>>>>>>   precision [1]).
>>>>>>> * that the runner may want its own representation of a context
>>>>>>>   object that makes sense for it, which isn't necessarily a
>>>>>>>   WindowedValue.
>>>>>>>
>>>>>>> Feel free to cut a JIRA for this and start working on a change.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <sunjincheng...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Beam devs,
>>>>>>>>
>>>>>>>> I read some of the docs about `Communicating over the Fn API` in
>>>>>>>> Beam. I feel that Beam has a very good design for the Control
>>>>>>>> Plane/Data Plane/State Plane/Logging services, as described in the
>>>>>>>> <How to send and receive data> document. When communicating between
>>>>>>>> the Runner and the SDK Harness, the Data Plane API uses
>>>>>>>> WindowedValue (an immutable triple of value, timestamp, and windows)
>>>>>>>> as the contract object between the Runner and the SDK Harness.
>>>>>>>> I see the interface definitions for sending and receiving data in
>>>>>>>> the code as follows:
>>>>>>>>
>>>>>>>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>>>>>>>
>>>>>>>>   public interface FnDataService {
>>>>>>>>     <T> InboundDataClient receive(
>>>>>>>>         LogicalEndpoint inputLocation,
>>>>>>>>         Coder<WindowedValue<T>> coder,
>>>>>>>>         FnDataReceiver<WindowedValue<T>> listener);
>>>>>>>>
>>>>>>>>     <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>>         LogicalEndpoint outputLocation,
>>>>>>>>         Coder<WindowedValue<T>> coder);
>>>>>>>>   }
>>>>>>>>
>>>>>>>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>>>>>>>
>>>>>>>>   public interface BeamFnDataClient {
>>>>>>>>     <T> InboundDataClient receive(
>>>>>>>>         ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>>         LogicalEndpoint inputLocation,
>>>>>>>>         Coder<WindowedValue<T>> coder,
>>>>>>>>         FnDataReceiver<WindowedValue<T>> receiver);
>>>>>>>>
>>>>>>>>     <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>>         Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>>         LogicalEndpoint outputLocation,
>>>>>>>>         Coder<WindowedValue<T>> coder);
>>>>>>>>   }
>>>>>>>>
>>>>>>>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
>>>>>>>> use `WindowedValue` as the data structure that both the Runner and
>>>>>>>> the SDK Harness understand. Control Plane/Data Plane/State
>>>>>>>> Plane/Logging are high-level abstractions, and some of them, such as
>>>>>>>> the Control Plane and Logging, are common requirements for all
>>>>>>>> multi-language platforms. For example, the Flink community is also
>>>>>>>> discussing how to support Python UDFs, as well as how to handle the
>>>>>>>> Docker environment, data transfer, state access, logging, etc.
>>>>>>>>
>>>>>>>> If Beam could abstract these service interfaces further, i.e. make
>>>>>>>> the interface definitions compatible with multiple engines and
>>>>>>>> eventually provide them to other projects as class libraries, it
>>>>>>>> would definitely help other platforms that want to support multiple
>>>>>>>> languages. So could Beam further abstract the interface definitions
>>>>>>>> of FnDataService and BeamFnDataClient? As a starting point for
>>>>>>>> discussion (throwing out a minnow to catch a whale), take the
>>>>>>>> FnDataService#receive interface as an example and turn
>>>>>>>> `WindowedValue<T>` into `T`, so that other platforms can extend it
>>>>>>>> freely:
>>>>>>>>
>>>>>>>>   <T> InboundDataClient receive(
>>>>>>>>       LogicalEndpoint inputLocation,
>>>>>>>>       Coder<T> coder,
>>>>>>>>       FnDataReceiver<T> listener);
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Feel free to correct me if I have misunderstood anything. Any
>>>>>>>> feedback is welcome!
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Jincheng
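A minimal, self-contained Java sketch of the proposed shape, under simplified assumptions: the data service is generic in its element type T, and the transport is an in-memory loopback instead of gRPC. A Flink DataSet-style caller passes plain strings, while a Beam-style caller passes a wrapper that carries a timestamp; the service itself never knows about windows or timestamps. All names here (`DataPlaneSketch`, `ElementCoder`, `GenericDataService`, `InMemoryDataService`, `Timestamped`, `timestampedCoder`, `STRING_CODER`) are hypothetical stand-ins, not Beam's `FnDataService`, `Coder`, `FnDataReceiver`, or `WindowedValue`, and endpoints are plain strings rather than `LogicalEndpoint`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch only: none of these types are Beam's real classes.
final class DataPlaneSketch {
  /** Stand-in for a coder: turns one element into bytes and back. */
  interface ElementCoder<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
  }

  /** The proposed shape: elements are a bare T rather than WindowedValue<T>. */
  interface GenericDataService {
    <T> void receive(String endpoint, ElementCoder<T> coder, Consumer<T> listener);
    <T> Consumer<T> send(String endpoint, ElementCoder<T> coder);
  }

  /** Loopback transport: bytes sent to an endpoint are decoded for that endpoint's listener. */
  static final class InMemoryDataService implements GenericDataService {
    private final Map<String, Consumer<byte[]>> inbound = new HashMap<>();

    @Override
    public <T> void receive(String endpoint, ElementCoder<T> coder, Consumer<T> listener) {
      inbound.put(endpoint, bytes -> listener.accept(coder.decode(bytes)));
    }

    @Override
    public <T> Consumer<T> send(String endpoint, ElementCoder<T> coder) {
      return element -> {
        Consumer<byte[]> target = inbound.get(endpoint);
        if (target == null) throw new IllegalStateException("no receiver for " + endpoint);
        target.accept(coder.encode(element));
      };
    }
  }

  /** Coder for plain UTF-8 strings: the "just T" case. */
  static final ElementCoder<String> STRING_CODER = new ElementCoder<String>() {
    @Override
    public byte[] encode(String value) {
      return value.getBytes(StandardCharsets.UTF_8);
    }
    @Override
    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  };

  /** Simplified stand-in for WindowedValue: a value plus a timestamp, no windows or pane. */
  static final class Timestamped<V> {
    final V value;
    final long timestampMillis;
    Timestamped(V value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** To the service the wrapper is just another T: an 8-byte timestamp, then the value bytes. */
  static <V> ElementCoder<Timestamped<V>> timestampedCoder(ElementCoder<V> valueCoder) {
    return new ElementCoder<Timestamped<V>>() {
      @Override
      public byte[] encode(Timestamped<V> element) {
        byte[] valueBytes = valueCoder.encode(element.value);
        return ByteBuffer.allocate(Long.BYTES + valueBytes.length)
            .putLong(element.timestampMillis).put(valueBytes).array();
      }
      @Override
      public Timestamped<V> decode(byte[] bytes) {
        long timestampMillis = ByteBuffer.wrap(bytes).getLong();
        V value = valueCoder.decode(Arrays.copyOfRange(bytes, Long.BYTES, bytes.length));
        return new Timestamped<>(value, timestampMillis);
      }
    };
  }

  /** Sends a plain element and a timestamped element through the same service. */
  static List<String> demo() {
    GenericDataService service = new InMemoryDataService();
    List<String> seen = new ArrayList<>();

    // Flink DataSet-style caller: T is String, with no wrapper at all.
    service.receive("plain", STRING_CODER, s -> seen.add("plain:" + s));
    service.send("plain", STRING_CODER).accept("a");

    // Beam-style caller: T is a wrapper that also carries a timestamp.
    ElementCoder<Timestamped<String>> coder = timestampedCoder(STRING_CODER);
    service.receive("windowed", coder,
        t -> seen.add("windowed:" + t.value + "@" + t.timestampMillis));
    service.send("windowed", coder).accept(new Timestamped<>("b", 42L));
    return seen;
  }
}
```

With this shape, a Beam call site would simply pick T = WindowedValue<V> together with its existing coder, so the current behavior becomes one choice rather than the only one; the loopback transport and string endpoints are placeholders for the real gRPC data channel and `LogicalEndpoint`.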