If the CombineGlobally is being returned by the expansion service, the
expansion service is on the hook for ensuring that intermediate
PCollections/PTransforms/... are constructed correctly.

I thought this question was about what to do if you want to take the output
of an XLang pipeline and process it through some generic transform that
doesn't care about the types and treats it like an opaque blob (like the
Count transform) and how to make that work when you don't know the output
properties. I don't think anyone has shared a design doc for this problem
that covered the different approaches.
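A minimal sketch of the "opaque blob" idea above: a Count-like operation that only sees encoded elements as bytes, so it never needs the producing SDK's coder or its properties (such as is_deterministic). This is an illustration of the concept, not Beam's actual Count implementation.

```python
# Hypothetical sketch: a type-agnostic Count that treats encoded elements
# as opaque byte blobs produced by some unknown coder in another SDK.
def count_opaque(encoded_elements):
    # Counting needs no knowledge of the element type or coder properties,
    # only the number of payloads that flow through.
    return sum(1 for _ in encoded_elements)

blobs = [b"\x00\x01", b"\xff", b"\x00\x01"]  # opaque payloads
assert count_opaque(blobs) == 3
```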

On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> I think you are hitting GroupByKey [1] that is internal to the Java
> CombineGlobally implementation that takes a KV with a Void type (with
> VoidCoder) [2] as input.
>
> ExternalCoder was added to Python SDK to represent coders within external
> transforms that are not standard coders (in this case the VoidCoder). This
> is needed to perform the "pipeline proto -> Python object graph -> Dataflow
> job request" conversion.
>
> It seems that today a runner is unable to perform this particular validation
> (and maybe others?) for pipeline segments received through a
> cross-language transform expansion, with or without the ExternalCoder. Note
> that a runner is not involved during cross-language transform expansion, so
> pipeline submission is the only place where a runner would get a chance
> to perform this kind of validation for cross-language transforms.
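To make the submission-time check concrete, here is a rough sketch of a runner walking the coder URNs in a received pipeline and rejecting a GBK key coder it cannot prove deterministic. The set contents and function name are illustrative assumptions, not real Beam APIs.

```python
# Hypothetical sketch: submission-time validation of GBK key coders.
# Since the runner is not involved during cross-language expansion,
# this check can only happen when the pipeline proto is submitted.
KNOWN_DETERMINISTIC = {
    "beam:coder:bytes:v1",
    "beam:coder:string_utf8:v1",
    "beam:coder:varint:v1",
}

def validate_gbk_key_coder(coder_urn):
    # An external/unknown coder cannot be proven deterministic from the
    # proto alone, so the conservative choice is to reject it.
    if coder_urn not in KNOWN_DETERMINISTIC:
        raise ValueError(
            "cannot verify that coder %r is deterministic" % coder_urn)

validate_gbk_key_coder("beam:coder:bytes:v1")  # accepted
try:
    validate_gbk_key_coder("beam:coder:external:v1")
except ValueError:
    pass  # unknown external coder rejected at submission time
```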
>
> [1]
> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>
> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>
>> Combine globally is a case where you don't need to know what the key or
>> value is and could treat them as bytes, allowing you to build and
>> execute this pipeline (assuming you ignored properties such as
>> is_deterministic).
>>
>> Regardless, I still think it makes sense to provide criteria for what your
>> output shape must be during xlang pipeline expansion, which is yet to be
>> defined, to support such a case. Your suggested solution of adding
>> properties to coders is one possible approach, but I think we have to take
>> a step back and consider xlang as a whole since there are still several
>> yet-to-be-solved issues within it.
>>
>>
>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:
>>
>>> I have a PR that makes GBK a primitive in which the
>>> test_combine_globally
>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>> is failing on the DataflowRunner. In particular, the DataflowRunner
>>> iterates over the transforms in its run_pipeline method. I moved the
>>> method that verifies that coders used as inputs to GBKs are deterministic
>>> into this run_pipeline; previously, this check happened during
>>> apply_GroupByKey.
>>>
>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it into a
>>>> downstream transform that enforces is_deterministic. My understanding of
>>>> ExternalCoder (admittedly just based on a quick look at commit history) is
>>>> that it's a shim added so the Python SDK can handle coders that are
>>>> internal to cross-language transforms.
>>>> I think that if the Python SDK is trying to introspect an ExternalCoder
>>>> instance then something is wrong.
>>>>
>>>> Brian
>>>>
>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> I see. The problem is that you are trying to know certain properties
>>>>> of the coder in order to use it in a downstream transform, like
>>>>> GroupByKey, which enforces that the coder is deterministic.
>>>>>
>>>>> In all the scenarios so far that I have seen we have required both
>>>>> SDKs to understand the coder, how are you having a cross language pipeline
>>>>> where the downstream SDK doesn't understand the coder and works?
>>>>>
>>>>> Also, an alternative strategy would be to tell the expansion service
>>>>> that it needs to choose a coder that is deterministic for the output.
>>>>> This would require building the pipeline and, before submission to the
>>>>> job server, performing the expansion while telling it all the
>>>>> limitations that the SDK has imposed.
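The alternative described above might look roughly like this: the SDK attaches the constraints it will impose (for example, that output coders must be deterministic) to the expansion request so the expansion service can pick coders accordingly. The request shape, field names, and URN here are invented for illustration and do not reflect the actual expansion protocol.

```python
# Hypothetical sketch: an expansion request that carries the downstream
# SDK's constraints so the expansion service can choose suitable coders.
def build_expansion_request(transform_urn, output_constraints):
    return {
        "transform": transform_urn,
        # Limitations the submitting SDK imposes on the expanded outputs.
        "constraints": dict(output_constraints),
    }

req = build_expansion_request(
    "beam:transforms:my_external:v1",  # hypothetical URN
    {"output_coder_is_deterministic": True},
)
assert req["constraints"]["output_coder_is_deterministic"] is True
```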
>>>>>
>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Should there be more metadata in the Coder Proto? For example, adding
>>>>>> an "is_deterministic" boolean field. This will allow for a
>>>>>> language-agnostic way for SDKs to infer properties about a coder received
>>>>>> from the expansion service.
>>>>>>
>>>>>> My motivation for this is that I recently ran into a problem in which
>>>>>> an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>> non-deterministic. The reason is that the Coder proto doesn't have an
>>>>>> "is_deterministic" field, and when the coder fails to be recreated in
>>>>>> Python, the ExternalCoder defaults to False.
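The failure mode Sam describes can be sketched as follows: when a coder from the expansion service cannot be recreated locally, it gets wrapped in an ExternalCoder-like shim, and without metadata in the Coder proto the shim has to assume the conservative answer. The class and field names are illustrative, not the actual Beam implementation.

```python
# Hypothetical sketch of the fallback: a coder that could not be recreated
# in Python is held as an opaque payload, and with no "is_deterministic"
# metadata in the proto, the shim defaults to False (non-deterministic).
class ExternalCoderSketch:
    def __init__(self, proto_payload, is_deterministic=False):
        self.proto_payload = proto_payload
        # Conservative default in the absence of proto metadata.
        self.is_deterministic = is_deterministic

coder = ExternalCoderSketch(b"<opaque VoidCoder payload>")
assert coder.is_deterministic is False
```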
>>>>>>
>>>>>> Regards,
>>>>>> Sam
>>>>>>
>>>>>>
