WindowedValue has always been an interface, not a concrete representation: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52

It is an abstract class only because we started on Java 7, where interfaces could not have default methods, and because of legacy style concerns. It is not just discussed but already implemented that there are WindowedValue implementations with fewer allocations.

At the coder level, it was also always intended to have multiple encodings. We already have separate encodings depending on whether there is one window or multiple windows. The coder for a particular kind of WindowedValue should decide this. Before the Fn API none of this had to be standardized, because the runner could choose whatever it wanted. Now we have to standardize any encodings that both runners and harnesses need to know. There should be many, and adding more should be just a matter of standardization, with no new design.
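
To make the point about multiple representations concrete, here is a minimal sketch in Java. All names in it (ElementContext, FullContext, ValueOnlyContext, Window, WindowedValueSketch) are hypothetical and are not Beam's actual classes, and the default timestamp is an assumption made only for the sketch. It shows one interface with a general implementation and a compact one that allocates less:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a window type; not Beam's BoundedWindow.
final class Window {
  static final Window GLOBAL = new Window("global");
  final String name;

  Window(String name) {
    this.name = name;
  }
}

// Hypothetical interface: consumers see accessors only, never the representation.
interface ElementContext<T> {
  T value();

  long timestampMillis();

  Collection<Window> windows();
}

// General representation: stores value, timestamp, and windows explicitly.
final class FullContext<T> implements ElementContext<T> {
  private final T value;
  private final long timestampMillis;
  private final List<Window> windows;

  FullContext(T value, long timestampMillis, List<Window> windows) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.windows = windows;
  }

  @Override public T value() { return value; }
  @Override public long timestampMillis() { return timestampMillis; }
  @Override public Collection<Window> windows() { return windows; }
}

// Compact representation: stores only the value. The timestamp and the
// single global window are shared constants (assumed defaults for this sketch).
final class ValueOnlyContext<T> implements ElementContext<T> {
  static final long DEFAULT_TIMESTAMP_MILLIS = Long.MIN_VALUE;
  private static final List<Window> GLOBAL_ONLY =
      Collections.singletonList(Window.GLOBAL);
  private final T value;

  ValueOnlyContext(T value) {
    this.value = value;
  }

  @Override public T value() { return value; }
  @Override public long timestampMillis() { return DEFAULT_TIMESTAMP_MILLIS; }
  @Override public Collection<Window> windows() { return GLOBAL_ONLY; }
}

final class WindowedValueSketch {
  // Written against the interface, so either representation can be passed in.
  static String describe(ElementContext<?> e) {
    return e.value() + " in " + e.windows().size() + " window(s)";
  }

  public static void main(String[] args) {
    System.out.println(describe(new ValueOnlyContext<>("a")));
    System.out.println(describe(
        new FullContext<>("b", 42L, Arrays.asList(Window.GLOBAL, new Window("w1")))));
  }
}
```

Code that only calls the accessors cannot tell the two apart, which is what leaves room for cheaper representations and, separately, for different encodings.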

None of this should be user-facing or in the runner API / pipeline graph. That is critical to keeping it flexible on the backend between the runner and the SDK harness.

If I understand correctly from our offline discussion, you are interested in the case where you issue a ProcessBundleRequest to the SDK harness and none of the primitives in the subgraph will ever observe the metadata. So you want to avoid even a tiny "WindowedValueWithNoMetadata". Is that accurate?

Kenn

On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com> wrote:

> Thank you! And have a nice weekend!
>
> Lukasz Cwik <lc...@google.com> wrote on Sat, Apr 20, 2019 at 1:14 AM:
>
>> I have added you as a contributor.
>>
>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks for your affirmation and for providing more context. :)
>>>
>>> Would you please give me contributor permission? My JIRA ID is sunjincheng121.
>>>
>>> I would like to create/assign tickets for this work.
>>>
>>> Thanks,
>>> Jincheng
>>>
>>> Lukasz Cwik <lc...@google.com> wrote on Sat, Apr 20, 2019 at 12:26 AM:
>>>
>>>> Since I don't think this is a contentious change.
>>>>
>>>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Yes, using T makes sense.
>>>>>
>>>>> The WindowedValue was meant to be a context object in the SDK harness that propagates various information about the current element. We have discussed in the past:
>>>>> * making optimizations which would pass around less of the context information if we know that the DoFns don't need it (for example, all the values share the same window).
>>>>> * versioning the encoding separately from the WindowedValue context object (see recent discussion about element timestamp precision [1])
>>>>> * the runner may want its own representation of a context object that makes sense for it, which isn't necessarily a WindowedValue.
>>>>>
>>>>> Feel free to cut a JIRA about this and start working on a change towards this.
>>>>>
>>>>> 1: https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>>>>
>>>>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>
>>>>>> Hi Beam devs,
>>>>>>
>>>>>> I read some of the docs about `Communicating over the Fn API` in Beam. I feel that Beam has a very good design for the Control Plane/Data Plane/State Plane/Logging services, and it is described in the <How to send and receive data> document. When communicating between the Runner and the SDK Harness, the Data Plane API uses WindowedValue (an immutable triple of value, timestamp, and windows) as the contract object between the Runner and the SDK Harness. I see the interface definitions for sending and receiving data in the code as follows:
>>>>>>
>>>>>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>>>>>
>>>>>> public interface FnDataService {
>>>>>>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>>>>>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>>>>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>>>>> }
>>>>>>
>>>>>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>>>>>
>>>>>> public interface BeamFnDataClient {
>>>>>>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>>>>>>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>>>>>>       FnDataReceiver<WindowedValue<T>> receiver);
>>>>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>>>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>>>>> }
>>>>>>
>>>>>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use `WindowedValue` as the data structure that both the Runner and the SDK Harness understand. Control Plane/Data Plane/State Plane/Logging is a high-level abstraction; Control Plane and Logging, for example, are common requirements for all multi-language platforms. The Flink community, for instance, is also discussing how to support Python UDFs, as well as how to deal with the docker environment, how to transfer data, how to access state, how to do logging, etc. If Beam can further abstract these service interfaces, i.e., make the interface definitions compatible with multiple engines, and finally provide them to other projects in the form of class libraries, it will definitely help other platforms that want to support multiple languages. So could Beam further abstract the interface definitions of FnDataService and BeamFnDataClient? As a modest starting point for discussion, take the FnDataService#receive interface as an example, and turn `WindowedValue<T>` into `T` so that other platforms can extend it arbitrarily, as follows:
>>>>>>
>>>>>> <T> InboundDataClient receive(LogicalEndpoint inputLocation, Coder<T> coder, FnDataReceiver<T> listener);
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Feel free to correct me if there is any incorrect understanding. And any feedback is welcome!
>>>>>>
>>>>>> Regards,
>>>>>> Jincheng
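
For reference, the shape of the proposal in the original message (a data channel that is generic over `T` and no longer tied to `WindowedValue<T>`) can be sketched roughly as below. This is only an illustration under simplifying assumptions: Decoder, Receiver, DataChannel, and Stamped are hypothetical names, strings stand in for the encoded bytes, and none of this is the real FnDataService or Coder API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a coder's decode side; not Beam's Coder.
interface Decoder<T> {
  T decode(String encoded);
}

// Hypothetical stand-in for FnDataReceiver.
interface Receiver<T> {
  void accept(T element);
}

// The channel is generic over T: it never inspects the element, so T may or
// may not carry timestamp/window metadata.
final class DataChannel {
  static <T> void receive(
      List<String> encodedElements, Decoder<T> decoder, Receiver<T> listener) {
    for (String encoded : encodedElements) {
      listener.accept(decoder.decode(encoded));
    }
  }
}

// One possible T that carries metadata (hypothetical; only a timestamp here).
final class Stamped<V> {
  final V value;
  final long timestampMillis;

  Stamped(V value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

final class GenericReceiveSketch {
  public static void main(String[] args) {
    // T = Integer: plain values, no metadata at all.
    List<Integer> plain = new ArrayList<>();
    DataChannel.<Integer>receive(
        Arrays.asList("1", "2"), s -> Integer.parseInt(s), x -> plain.add(x));
    System.out.println(plain);

    // T = Stamped<String>: the same channel, with metadata decoded by the caller's decoder.
    Decoder<Stamped<String>> stampedDecoder = s -> {
      int sep = s.indexOf(':');
      return new Stamped<>(s.substring(sep + 1), Long.parseLong(s.substring(0, sep)));
    };
    List<Stamped<String>> stamped = new ArrayList<>();
    DataChannel.<Stamped<String>>receive(
        Arrays.asList("10:a", "20:b"), stampedDecoder, x -> stamped.add(x));
    System.out.println(stamped.get(1).value + "@" + stamped.get(1).timestampMillis);
  }
}
```

In this sketch the choice of whether metadata travels with an element is made entirely by the decoder the caller supplies, which matches the idea that the coder for a particular kind of value decides the encoding.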