+Robert Bradshaw <rober...@google.com>, who is the reviewer on
https://github.com/apache/beam/pull/11503. How does this sound to you: skip
the "is input deterministic" check for GBKs embedded in x-lang transforms?
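
A minimal sketch of what that could look like, to make the proposal concrete.
Everything below is a hypothetical stand-in written for illustration, not the
Beam API: Transform, check_gbk_inputs, and the is_external flag are assumed
names, and the Coder classes only mimic the behavior described in this thread
(an ExternalCoder that does not override is_deterministic and so reports
False).

```python
from dataclasses import dataclass, field
from typing import List, Optional


class Coder:
    """Hypothetical stand-in for an SDK coder with a conservative default."""

    def is_deterministic(self) -> bool:
        return False


class BytesCoder(Coder):
    """Hypothetical coder that the SDK understands and knows is deterministic."""

    def is_deterministic(self) -> bool:
        return True


class ExternalCoder(Coder):
    """Hypothetical stand-in for a coder the SDK could not reconstruct.

    It does not override is_deterministic, so it inherits the False default.
    """


@dataclass
class Transform:
    """Hypothetical node in a transform tree (not a Beam class)."""

    name: str
    is_group_by_key: bool = False
    input_coder: Optional[Coder] = None
    # Assumption: True for a transform returned by an expansion service.
    is_external: bool = False
    parts: List["Transform"] = field(default_factory=list)


def check_gbk_inputs(transform: Transform, inside_external: bool = False) -> None:
    """Walk the tree and verify GBK input coders, skipping x-lang subtrees."""
    inside_external = inside_external or transform.is_external
    if transform.is_group_by_key and not inside_external:
        if transform.input_coder is None or not transform.input_coder.is_deterministic():
            raise ValueError(
                "GroupByKey %r requires a deterministic input coder" % transform.name
            )
    for part in transform.parts:
        check_gbk_inputs(part, inside_external)


# A GBK embedded in an x-lang transform is not checked by the SDK.
xlang_combine = Transform(
    "CombineGlobally",
    is_external=True,
    parts=[Transform("GBK", is_group_by_key=True, input_coder=ExternalCoder())],
)
check_gbk_inputs(xlang_combine)  # does not raise
```

With this shape, a GBK written directly in the Python pipeline is still
validated as before, while the expansion service stays responsible for the
GBKs it returns.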

On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sro...@google.com> wrote:

> Thanks for your comments. Here's a little more on the problem I'm working
> on: I have a PR to make GBK a primitive
> <https://github.com/apache/beam/pull/11503> and the aforementioned
> test_combine_globally was failing a check in the run_pipeline method of the
> DataflowRunner.
> Specifically, when the DataflowRunner visits each transform, it checks
> whether the GBK has a deterministic input coder. This check fails when the
> GBK is expanded from the expansion service because the resulting
> ExternalCoder doesn't override the is_deterministic method.
>
> This wasn't being hit before because this deterministic input check only
> occurred during the apply_GroupByKey method. However, I moved it to when
> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
> stage.
>
>
> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>
>> If the CombineGlobally is being returned by the expansion service, the
>> expansion service is on the hook for ensuring that intermediate
>> PCollections/PTransforms/... are constructed correctly.
>>
> Okay, this was kind of my hunch. If the DataflowRunner is making sure that
> the input coder to a GBK is deterministic, then we should skip the check if
> we receive an x-lang transform (seen in the Python SDK as a
> RunnerAPITransformHolder).
>
>
>>
>> I thought this question was about what to do if you want to take the
>> output of an XLang pipeline and process it through some generic transform
>> that doesn't care about the types and treats it like an opaque blob (like
>> the Count transform) and how to make that work when you don't know the
>> output properties. I don't think anyone has shared a design doc for this
>> problem that covered the different approaches.
>>
> Aside from the DataflowRunner GBK problem, I was also curious if there was
> any need for metadata around the Coder proto and why there currently is no
> metadata. If there was more metadata, like an is_deterministic field, then
> the GBK deterministic input check could also work.
>
>
>
>>
>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>> CombineGlobally implementation that takes a KV with a Void type (with
>>> VoidCoder) [2] as input.
>>>
>>> ExternalCoder was added to Python SDK to represent coders within
>>> external transforms that are not standard coders (in this case the
>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>> graph -> Dataflow job request" conversion.
>>>
>>> It seems that today a runner is unable to perform this particular
>>> validation (and maybe others?) for pipeline segments received through a
>>> cross-language transform expansion, with or without the ExternalCoder. Note
>>> that a runner is not involved during cross-language transform expansion, so
>>> pipeline submission is the only location where a runner would get a chance
>>> to perform this kind of validation for cross-language transforms.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>
>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Combine globally is a case where you don't need to know what the key or
>>>> value is and could treat them as bytes, which would allow you to build
>>>> and execute this pipeline (assuming you ignored properties such as
>>>> is_deterministic).
>>>>
>>>> Regardless, I still think it makes sense to provide criteria on what
>>>> your output shape must be during xlang pipeline expansion; those criteria
>>>> are yet to be defined to support such a case. Your suggested solution of
>>>> adding properties to coders is one possible solution, but I think we have
>>>> to take a step back and consider xlang as a whole since there are still
>>>> several unsolved issues within it.
>>>>
>>>>
>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:
>>>>
>>>>> I have a PR that makes GBK a primitive in which the
>>>>> test_combine_globally
>>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>>> over the transforms in the run_pipeline method. I moved the check that
>>>>> verifies that the input coders to GBKs are deterministic into
>>>>> run_pipeline. Previously, it ran during apply_GroupByKey.
>>>>>
>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it into
>>>>>> a downstream transform that enforces is_deterministic. My understanding
>>>>>> of ExternalCoder (admittedly just based on a quick look at commit
>>>>>> history) is
>>>>>> that it's a shim added so the Python SDK can handle coders that are
>>>>>> internal to cross-language transforms.
>>>>>> I think that if the Python SDK is trying to introspect an
>>>>>> ExternalCoder instance then something is wrong.
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> I see. The problem is that you are trying to know certain properties
>>>>>>> of the coder to use in a downstream transform which enforces that it is
>>>>>>> deterministic like GroupByKey.
>>>>>>>
>>>>>>> In all the scenarios that I have seen so far, we have required both
>>>>>>> SDKs to understand the coder. How do you have a cross-language
>>>>>>> pipeline where the downstream SDK doesn't understand the coder and it
>>>>>>> still works?
>>>>>>>
>>>>>>> Also, an alternative strategy would be to tell the expansion service
>>>>>>> that you need to choose a coder that is deterministic on the output.
>>>>>>> This would require building the pipeline and then, before submission
>>>>>>> to the job server, performing the expansion and telling it all the
>>>>>>> limitations that the SDK has imposed on it.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Should there be more metadata in the Coder proto? For example,
>>>>>>>> adding an "is_deterministic" boolean field. This would give SDKs a
>>>>>>>> language-agnostic way to infer properties about a coder received
>>>>>>>> from the expansion service.
>>>>>>>>
>>>>>>>> My motivation for this is that I recently ran into a problem in
>>>>>>>> which an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>>>> non-deterministic. The reason is that the Coder proto doesn't have an
>>>>>>>> "is_deterministic" field, so when the coder fails to be recreated in
>>>>>>>> Python, the ExternalCoder defaults to False.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Sam
>>>>>>>>
>>>>>>>>
