I think you are hitting the GroupByKey [1] that is internal to the Java
CombineGlobally implementation, which takes as input a KV whose key is of
type Void (encoded with VoidCoder) [2].

ExternalCoder was added to the Python SDK to represent coders within external
transforms that are not standard coders (in this case, the VoidCoder). This
is needed to perform the "pipeline proto -> Python object graph -> Dataflow
job request" conversion.
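As a rough illustration of that conversion, here is a minimal Python sketch. It is a hypothetical, simplified model, not the actual Python SDK code: `CoderProto`, `BytesCoder`, `KvCoder`, `from_proto`, this `ExternalCoder` class and the `hypothetical:java:void` URN are all made up for illustration; only the two standard coder URNs are real. It shows how an unrecognized coder can survive the proto -> Python object graph -> proto round trip unchanged.

```python
from dataclasses import dataclass
from typing import Tuple

# Standard Beam coder URNs; everything else in this sketch is hypothetical.
BYTES_URN = "beam:coder:bytes:v1"
KV_URN = "beam:coder:kv:v1"


@dataclass(frozen=True)
class CoderProto:
    """Simplified stand-in for the Coder proto: a URN plus component coders."""
    urn: str
    components: Tuple["CoderProto", ...] = ()


class BytesCoder:
    def to_proto(self) -> CoderProto:
        return CoderProto(BYTES_URN)


class KvCoder:
    def __init__(self, key_coder, value_coder):
        self.key_coder = key_coder
        self.value_coder = value_coder

    def to_proto(self) -> CoderProto:
        return CoderProto(KV_URN, (self.key_coder.to_proto(),
                                   self.value_coder.to_proto()))


class ExternalCoder:
    """Shim for a coder Python cannot recreate (e.g. Java's VoidCoder).

    In this sketch it only carries the original proto, so the coder can be
    written back into the job request unchanged.
    """
    def __init__(self, proto: CoderProto):
        self.proto = proto

    def to_proto(self) -> CoderProto:
        return self.proto


def from_proto(proto: CoderProto):
    """Pipeline proto -> Python object graph, wrapping unknown URNs."""
    if proto.urn == BYTES_URN:
        return BytesCoder()
    if proto.urn == KV_URN:
        key_proto, value_proto = proto.components
        return KvCoder(from_proto(key_proto), from_proto(value_proto))
    return ExternalCoder(proto)


# A KV whose key coder is unknown to Python (made-up URN for illustration).
void_proto = CoderProto("hypothetical:java:void")
kv_proto = CoderProto(KV_URN, (void_proto, CoderProto(BYTES_URN)))

coder = from_proto(kv_proto)        # the key becomes an ExternalCoder
round_tripped = coder.to_proto()    # Python object graph -> proto again
print(round_tripped == kv_proto)    # True
```

In this model the shim only works as long as nothing on the Python side needs to look inside the wrapped coder.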

It seems that today a runner is unable to perform this particular validation
(and maybe others?) for pipeline segments received through a cross-language
transform expansion, with or without the ExternalCoder. Note that a runner is
not involved during cross-language transform expansion, so pipeline
submission is the only point at which a runner gets a chance to perform this
kind of validation for cross-language transforms.
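The validation in question can be sketched as a pass over the submitted pipeline that checks GroupByKey input coders. This is again a hypothetical model, not Beam code: the classes, the transform name, and the `is_deterministic` field (modeled on the proposal further down this thread; the real Coder proto has no such field) are assumptions, and the sketch checks only the key coder of the KV input, since that is what grouping depends on. Without the extra metadata, an opaque key coder falls back to False, as the thread describes for ExternalCoder.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

BYTES_URN = "beam:coder:bytes:v1"  # standard Beam coder URNs
KV_URN = "beam:coder:kv:v1"


@dataclass(frozen=True)
class CoderProto:
    """Hypothetical stand-in for the Coder proto."""
    urn: str
    components: Tuple["CoderProto", ...] = ()
    # Hypothetical field modeled on the proposal in this thread.
    is_deterministic: Optional[bool] = None


@dataclass
class Transform:
    """Hypothetical, flattened view of one transform in the submitted pipeline."""
    name: str
    kind: str
    input_coder: CoderProto


def coder_is_deterministic(proto: CoderProto) -> bool:
    if proto.urn == BYTES_URN:
        return True
    if proto.urn == KV_URN:
        return all(coder_is_deterministic(c) for c in proto.components)
    # A coder this SDK cannot recreate: trust the advertised property if
    # present, otherwise fall back to False.
    return proto.is_deterministic is True


def validate_at_submission(transforms: List[Transform]) -> List[str]:
    """Return names of GroupByKeys whose key coder is not known to be deterministic."""
    failures = []
    for t in transforms:
        if t.kind == "GroupByKey":
            key_coder = t.input_coder.components[0]
            if not coder_is_deterministic(key_coder):
                failures.append(t.name)
    return failures


value = CoderProto(BYTES_URN)
# Key coder as received from expansion, with and without the extra metadata.
opaque_key = CoderProto("hypothetical:java:void")
advertised_key = CoderProto("hypothetical:java:void", is_deterministic=True)

without_metadata = [Transform("CombineGlobally/GroupByKey", "GroupByKey",
                              CoderProto(KV_URN, (opaque_key, value)))]
with_metadata = [Transform("CombineGlobally/GroupByKey", "GroupByKey",
                           CoderProto(KV_URN, (advertised_key, value)))]

print(validate_at_submission(without_metadata))  # ['CombineGlobally/GroupByKey']
print(validate_at_submission(with_metadata))     # []
```

The transform name `CombineGlobally/GroupByKey` is illustrative only.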

[1] https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172

On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:

> Combine globally is a case where you don't need to know what the key or
> value is: you could treat them as bytes, which would allow you to build and
> execute this pipeline (assuming you ignored properties such as
> is_deterministic).
>
> Regardless, I still think it makes sense to define, as part of xlang
> pipeline expansion, criteria on what your output shape must be; such
> criteria have yet to be defined to support a case like this. Your suggested
> solution of adding properties to coders is one possible solution, but I
> think we have to take a step back and consider xlang as a whole, since there
> are still several unsolved issues within it.
>
>
> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:
>
>> I have a PR that makes GBK a primitive, and with it the test_combine_globally
>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>> test is failing on the DataflowRunner. In particular, the DataflowRunner
>> traverses the transforms in the run_pipeline method. In that PR, I moved
>> the method that verifies that GBK input coders are deterministic into
>> run_pipeline; previously, this check ran in apply_GroupByKey.
>>
>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Yes, I'm unclear on how a PCollection with ExternalCoder made it into a
>>> downstream transform that enforces is_deterministic. My understanding of
>>> ExternalCoder (admittedly based only on a quick look at the commit history)
>>> is that it's a shim added so the Python SDK can handle coders that are
>>> internal to cross-language transforms.
>>> I think that if the Python SDK is trying to introspect an ExternalCoder
>>> instance, then something is wrong.
>>>
>>> Brian
>>>
>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> I see. The problem is that you need to know certain properties of the
>>>> coder in order to use it in a downstream transform that requires it to be
>>>> deterministic, like GroupByKey.
>>>>
>>>> In all the scenarios I have seen so far, we have required both SDKs to
>>>> understand the coder. How do you have a cross-language pipeline that works
>>>> even though the downstream SDK doesn't understand the coder?
>>>>
>>>> Also, an alternative strategy would be to tell the expansion service
>>>> that it needs to choose a deterministic coder for the output. This would
>>>> require building the pipeline and then, before submission to the job
>>>> server, performing the expansion while telling the expansion service all
>>>> the limitations that the SDK imposes on it.
>>>>
>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Should there be more metadata in the Coder proto? For example, we could
>>>>> add an "is_deterministic" boolean field. This would give SDKs a
>>>>> language-agnostic way to infer properties of a coder received from the
>>>>> expansion service.
>>>>>
>>>>> My motivation for this is that I recently ran into a problem in which
>>>>> an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>> non-deterministic. The reason is that the Coder proto doesn't have an
>>>>> "is_deterministic" field, and when the coder cannot be recreated in
>>>>> Python, the ExternalCoder defaults to False.
>>>>>
>>>>> Regards,
>>>>> Sam
>>>>>
>>>>>
