Makes sense to me. I don't see any problem.

Kenn

On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <sunjincheng...@gmail.com>
wrote:

> Hi Kenn,
>
> Thanks for your reply and for explaining the design of WindowedValue so clearly!
>
> At present, the definitions of `FnDataService` and `BeamFnDataClient` in
> the Data Plane are very clear and general, e.g. send(...)/receive(...).
> If they were only used within Beam, they would already be very good. This
> works within Beam because `WindowedValue` is a very basic data structure
> there, and both the Runner and the SDK harness define the WindowedValue
> data structure.
>
> The reason I want to change the interface parameter from
> `WindowedValue<T>` to T is that I want to turn the `Data Plane`
> interface into a class library that other projects (such as
> Apache Flink) can use, so that they can have their own `FnDataService`
> implementations. However, the definition of `WindowedValue` does not fit
> every project. Apache Flink, for example, has its own counterpart of
> WindowedValue: in Flink streaming, StreamRecord plays that role. If we
> change `WindowedValue<T>` to T, other projects' implementations do not
> need to wrap their data in WindowedValue, and the interface becomes more
> concise. Furthermore, some cases only need a plain T, such as Apache Flink
> DataSet operators.
>
> So I agree with your understanding: I don't expect a `WindowedValueXXX<T>`
> in the FnDataService interface; I hope to just use a `T`.
>
> Do you see any problem if we change the interface parameter from
> `WindowedValue<T>` to T?
>
> Thanks,
> Jincheng
>
> On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> WindowedValue has always been an interface, not a concrete
>> representation:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
>> It is an abstract class because we started in Java 7, where you could not
>> have default methods, and just due to legacy style concerns. It is not just
>> discussed but already implemented: there are WindowedValue implementations
>> with fewer allocations.
>> At the coder level, it was also always intended to have multiple
>> encodings. We already have separate encodings based on whether there is
>> one window or multiple windows. The coder for a particular kind of
>> WindowedValue should decide this. Before the Fn API none of this had to be
>> standardized, because the runner could just choose whatever it wanted. Now
>> we have to standardize any encodings that both runners and harnesses need
>> to know. There should be many, and adding more should just be a matter of
>> standardization, not new design.
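A minimal Java sketch of the two ideas in this paragraph: a WindowedValue-like abstract type with a cheaper implementation that stores only the value, and a coder that picks its encoding based on whether there is one window or several. All names here (`SketchWindowedValue`, `ValueInDefaultWindow`, `FullWindowedValue`, `WindowCountAwareStringCoder`) and the wire format are hypothetical illustrations, not Beam's actual classes or its standardized encodings; windows are modeled as plain `long` ids and values as `String`s to keep the sketch self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for a WindowedValue-style context object.
// Windows are modeled as long ids; this is NOT Beam's real class hierarchy.
abstract class SketchWindowedValue<T> {
  abstract T getValue();
  abstract long getTimestampMillis();
  abstract List<Long> getWindows();
}

// Low-allocation case: every element sits in one implicit default window with a
// default timestamp, so each instance stores nothing but the value.
final class ValueInDefaultWindow<T> extends SketchWindowedValue<T> {
  static final long DEFAULT_WINDOW = 0L; // hypothetical window id
  private static final List<Long> WINDOWS = Collections.singletonList(DEFAULT_WINDOW);
  private final T value;

  ValueInDefaultWindow(T value) { this.value = value; }
  @Override T getValue() { return value; }
  @Override long getTimestampMillis() { return Long.MIN_VALUE; }
  @Override List<Long> getWindows() { return WINDOWS; } // shared, no per-element list
}

// General case: explicit timestamp and any number of windows.
final class FullWindowedValue<T> extends SketchWindowedValue<T> {
  private final T value;
  private final long timestampMillis;
  private final List<Long> windows;

  FullWindowedValue(T value, long timestampMillis, List<Long> windows) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
  }
  @Override T getValue() { return value; }
  @Override long getTimestampMillis() { return timestampMillis; }
  @Override List<Long> getWindows() { return windows; }
}

// Hypothetical wire format (not Beam's standard one): a tag byte tells the
// decoder whether a single window id or a counted list of window ids follows.
final class WindowCountAwareStringCoder {
  private static final byte ONE_WINDOW = 0;
  private static final byte MANY_WINDOWS = 1;

  byte[] encode(SketchWindowedValue<String> wv) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(wv.getTimestampMillis());
      List<Long> windows = wv.getWindows();
      if (windows.size() == 1) {
        out.writeByte(ONE_WINDOW); // common case: no window count on the wire
        out.writeLong(windows.get(0));
      } else {
        out.writeByte(MANY_WINDOWS);
        out.writeInt(windows.size());
        for (long w : windows) {
          out.writeLong(w);
        }
      }
      out.writeUTF(wv.getValue());
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  SketchWindowedValue<String> decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      long timestampMillis = in.readLong();
      List<Long> windows = new ArrayList<>();
      if (in.readByte() == ONE_WINDOW) {
        windows.add(in.readLong());
      } else {
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
          windows.add(in.readLong());
        }
      }
      return new FullWindowedValue<>(in.readUTF(), timestampMillis, windows);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this layout the single-window form skips the 4-byte window count; a standardized variant would have to pin down such a format so that runners and harnesses agree on it.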
>>
>> None of this should be user-facing or in the runner API / pipeline graph
>> - that is critical to making it flexible on the backend between the runner
>> & SDK harness.
>>
>> If I understand correctly from our offline discussion, you are interested in
>> the case where you issue a ProcessBundleRequest to the SDK harness and none
>> of the primitives in the subgraph will ever observe the metadata. So you
>> don't want even a tiny "WindowedValueWithNoMetadata". Is that accurate?
>>
>> Kenn
>>
>> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>>
>>> Thank you! And have a nice weekend!
>>>
>>>
>>> On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I have added you as a contributor.
>>>>
>>>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> Thanks for confirming this and for providing more context. :)
>>>>>
>>>>> Could you please give me contributor permission? My JIRA ID is
>>>>> sunjincheng121.
>>>>>
>>>>> I would like to create/assign tickets for this work.
>>>>>
>>>>> Thanks,
>>>>> Jincheng
>>>>>
>>>>> On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I don't think this is a contentious change.
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Yes, using T makes sense.
>>>>>>>
>>>>>>> The WindowedValue was meant to be a context object in the SDK
>>>>>>> harness that propagates various information about the current element.
>>>>>>> In the past we have discussed:
>>>>>>> * making optimizations that would pass around less of the context
>>>>>>> information if we know that the DoFns don't need it (for example, when
>>>>>>> all the values share the same window).
>>>>>>> * versioning the encoding separately from the WindowedValue context
>>>>>>> object (see recent discussion about element timestamp precision [1])
>>>>>>> * the runner may want its own representation of a context object
>>>>>>> that makes sense for it, which isn't necessarily a WindowedValue.
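To illustrate the second bullet, here is a minimal sketch of versioning an encoding separately from the context object it encodes. It assumes, purely for illustration, a move from millisecond to microsecond timestamps; `ElementContext`, `VersionedContextCodec`, and the version-byte layout are hypothetical and are not taken from Beam or from the thread cited in [1].

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical context object; its in-memory timestamp is always microseconds.
final class ElementContext {
  final String value;
  final long timestampMicros;

  ElementContext(String value, long timestampMicros) {
    this.value = value;
    this.timestampMicros = timestampMicros;
  }
}

// Hypothetical versioned encoding: a leading version byte selects the wire
// format, so the encoding can change without changing ElementContext.
final class VersionedContextCodec {
  static final byte V1_MILLIS = 1; // older format: millisecond timestamps
  static final byte V2_MICROS = 2; // newer format: microsecond timestamps

  static byte[] encode(ElementContext ctx, byte version) {
    long wireTimestamp;
    if (version == V1_MILLIS) {
      // Drops sub-millisecond precision.
      wireTimestamp = Math.floorDiv(ctx.timestampMicros, 1000L);
    } else if (version == V2_MICROS) {
      wireTimestamp = ctx.timestampMicros;
    } else {
      throw new IllegalArgumentException("unknown version " + version);
    }
    byte[] payload = ctx.value.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(1 + 8 + 4 + payload.length)
        .put(version)
        .putLong(wireTimestamp)
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  static ElementContext decode(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    byte version = buf.get();
    long wireTimestamp = buf.getLong();
    long micros;
    if (version == V1_MILLIS) {
      micros = Math.multiplyExact(wireTimestamp, 1000L);
    } else if (version == V2_MICROS) {
      micros = wireTimestamp;
    } else {
      throw new IllegalArgumentException("unknown version " + version);
    }
    byte[] payload = new byte[buf.getInt()];
    buf.get(payload);
    return new ElementContext(new String(payload, StandardCharsets.UTF_8), micros);
  }
}
```

Round-tripping a timestamp of 1,234,567 microseconds through version 1 yields 1,234,000, while version 2 preserves it exactly; the in-memory `ElementContext` never changes.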
>>>>>>>
>>>>>>> Feel free to cut a JIRA about this and start working on a change
>>>>>>> towards this.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <
>>>>>>> sunjincheng...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Beam devs,
>>>>>>>>
>>>>>>>> I read some of the docs about `Communicating over the Fn API` in
>>>>>>>> Beam. I feel that Beam has a very good design for Control Plane/Data
>>>>>>>> Plane/State Plane/Logging services, and it is described in <How to 
>>>>>>>> send and
>>>>>>>> receive data> document. When communicating between Runner and SDK 
>>>>>>>> Harness,
>>>>>>>> the DataPlane API will be WindowedValue(An immutable triple of value,
>>>>>>>> timestamp, and windows.) As a contract object between Runner and SDK
>>>>>>>> Harness. I see the interface definitions for sending and receiving 
>>>>>>>> data in
>>>>>>>> the code as follows:
>>>>>>>>
>>>>>>>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>>>>>>>
>>>>>>>> public interface FnDataService {
>>>>>>>>>   <T> InboundDataClient receive(
>>>>>>>>>       LogicalEndpoint inputLocation,
>>>>>>>>>       Coder<WindowedValue<T>> coder,
>>>>>>>>>       FnDataReceiver<WindowedValue<T>> listener);
>>>>>>>>>
>>>>>>>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>>>>>>>
>>>>>>>> public interface BeamFnDataClient {
>>>>>>>>>   <T> InboundDataClient receive(
>>>>>>>>>       ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>>>       LogicalEndpoint inputLocation,
>>>>>>>>>       Coder<WindowedValue<T>> coder,
>>>>>>>>>       FnDataReceiver<WindowedValue<T>> receiver);
>>>>>>>>>
>>>>>>>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>>>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>>>       LogicalEndpoint outputLocation,
>>>>>>>>>       Coder<WindowedValue<T>> coder);
>>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Both `Coder<WindowedValue<T>>` and
>>>>>>>> `FnDataReceiver<WindowedValue<T>>` use `WindowedValue` as the data
>>>>>>>> structure that both the Runner and the SDK Harness understand.
>>>>>>>> Control Plane/Data Plane/State Plane/Logging are high-level
>>>>>>>> abstractions, and some of them, such as the Control Plane and Logging,
>>>>>>>> are common requirements for all multi-language platforms. For example,
>>>>>>>> the Flink community is also discussing how to support Python UDFs, as
>>>>>>>> well as how to handle the Docker environment, data transfer, state
>>>>>>>> access, logging, etc.
>>>>>>>> If Beam could further abstract these service interfaces, i.e., make the
>>>>>>>> interface definitions compatible with multiple engines and provide them
>>>>>>>> to other projects as class libraries, it would definitely help other
>>>>>>>> platforms that want to support multiple languages. So could Beam
>>>>>>>> further abstract the interface definitions of FnDataService and
>>>>>>>> BeamFnDataClient? As a modest starting point, take the
>>>>>>>> FnDataService#receive interface as an example and turn
>>>>>>>> `WindowedValue<T>` into `T` so that other platforms can extend it
>>>>>>>> arbitrarily, as follows:
>>>>>>>>
>>>>>>>> <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>>>>>>> Coder<T> coder, FnDataReceiver<T> listener);
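A minimal sketch of why a plain `T` is enough: if the data service only moves bytes through a caller-supplied coder, Beam can still instantiate `T` as its own windowed type, while an engine that needs no metadata uses a bare value. `ElementCoder`, `ElementReceiver`, `GenericDataService`, `LoopbackDataService`, and `Windowed` are hypothetical simplifications of `Coder`, `FnDataReceiver`, `FnDataService`, and `WindowedValue`; endpoints are plain strings instead of `LogicalEndpoint`, and nothing goes over gRPC.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Beam's Coder and FnDataReceiver.
interface ElementCoder<T> {
  byte[] encode(T value);
  T decode(byte[] bytes);
}

interface ElementReceiver<T> {
  void accept(T element);
}

// Hypothetical data-plane interface in the proposed shape: only a plain T,
// no WindowedValue in the signature. String endpoints replace LogicalEndpoint.
interface GenericDataService {
  <T> void receive(String endpoint, ElementCoder<T> coder, ElementReceiver<T> listener);
  <T> ElementReceiver<T> send(String endpoint, ElementCoder<T> coder);
}

// In-memory loopback: sent elements are encoded to bytes and decoded again on
// the receiving side; the service itself never inspects T.
final class LoopbackDataService implements GenericDataService {
  private final Map<String, ElementReceiver<byte[]>> inbound = new HashMap<>();

  @Override
  public <T> void receive(String endpoint, ElementCoder<T> coder, ElementReceiver<T> listener) {
    inbound.put(endpoint, bytes -> listener.accept(coder.decode(bytes)));
  }

  @Override
  public <T> ElementReceiver<T> send(String endpoint, ElementCoder<T> coder) {
    return element -> {
      ElementReceiver<byte[]> target = inbound.get(endpoint);
      if (target == null) {
        throw new IllegalStateException("no receiver registered for " + endpoint);
      }
      target.accept(coder.encode(element));
    };
  }
}

// An engine that needs no metadata (e.g. a batch operator) uses a plain coder.
final class StringCoder implements ElementCoder<String> {
  @Override public byte[] encode(String value) { return value.getBytes(StandardCharsets.UTF_8); }
  @Override public String decode(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
}

// A Beam-like engine picks T = its own wrapper type (hypothetical here).
final class Windowed<V> {
  final V value;
  final long timestamp;

  Windowed(V value, long timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }
}

final class WindowedStringCoder implements ElementCoder<Windowed<String>> {
  @Override
  public byte[] encode(Windowed<String> w) {
    byte[] payload = w.value.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(8 + payload.length).putLong(w.timestamp).put(payload).array();
  }

  @Override
  public Windowed<String> decode(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    long timestamp = buf.getLong();
    byte[] payload = new byte[buf.remaining()];
    buf.get(payload);
    return new Windowed<>(new String(payload, StandardCharsets.UTF_8), timestamp);
  }
}
```

The design choice is that metadata becomes a property of the coder rather than of the service signature: the same `LoopbackDataService` carries both `String` and `Windowed<String>` elements unchanged.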
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Feel free to correct me if I have misunderstood anything. Any
>>>>>>>> feedback is welcome!
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Jincheng
>>>>>>>>
>>>>>>>
