On Wed, May 20, 2020 at 11:09 AM Sam Rohde <sro...@google.com> wrote:

> +Robert Bradshaw <rober...@google.com> who is the reviewer on
> https://github.com/apache/beam/pull/11503. How does that sound to you?
> Skip the "is input deterministic" check for GBKs embedded in x-lang
> transforms?
> On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sro...@google.com> wrote:
>> Thanks for your comments, here's a little more to the problem I'm working
>> on: I have a PR to make GBK a primitive
>> <https://github.com/apache/beam/pull/11503> and the aforementioned
>> test_combine_globally was failing a check in the run_pipeline method of the
>> DataflowRunner.
>> Specifically, when the DataflowRunner visits each transform, it checks
>> whether the GBK has a deterministic input coder. This check fails when the
>> GBK is expanded from the expansion service because the resulting
>> ExternalCoder doesn't override the is_deterministic method.
>> This wasn't being hit before because this deterministic input check only
>> occurred during the apply_GroupByKey method. However, I moved it to when
>> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
>> stage.
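
The failure described above can be illustrated with a minimal sketch. The classes below are simplified stand-ins written for this example, not the actual Beam classes, and check_gbk_key_coder is a hypothetical helper. The only assumption carried over from the thread is that the base coder reports non-deterministic by default and that ExternalCoder does not override it.

```python
class Coder:
    """Stand-in for an SDK coder base class (not the real Beam class)."""

    def is_deterministic(self):
        # Conservative default: non-deterministic unless a subclass says otherwise.
        return False


class VarIntCoder(Coder):
    """A coder the SDK understands and knows to be deterministic."""

    def is_deterministic(self):
        return True


class ExternalCoder(Coder):
    """Opaque holder for a coder the SDK could not recreate.

    It does not override is_deterministic, so it inherits False.
    """

    def __init__(self, urn):
        self.urn = urn


def check_gbk_key_coder(key_coder):
    """Hypothetical runner-side check applied to the input of a GBK."""
    if not key_coder.is_deterministic():
        raise ValueError(
            "GroupByKey requires a deterministic key coder, got %s"
            % type(key_coder).__name__)
```

With these stand-ins, check_gbk_key_coder(VarIntCoder()) passes, while passing an ExternalCoder raises, even if the coder it wraps is deterministic in the SDK that produced it.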
>> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>>> If the CombineGlobally is being returned by the expansion service, the
>>> expansion service is on the hook for ensuring that intermediate
>>> PCollections/PTransforms/... are constructed correctly.
>> Okay, this was kind of my hunch. If the DataflowRunner is making sure
>> that the input coder to a GBK is deterministic, then we should skip the
>> check if we receive an x-lang transform (seen in the Python SDK as a
>> RunnerAPITransformHolder).
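
A rough sketch of that idea, assuming a simplified transform record rather than the real Python SDK objects: the Transform dataclass, its is_external flag (standing in for "this is a RunnerAPITransformHolder"), and nondeterministic_gbks are all hypothetical names used only for illustration.

```python
from dataclasses import dataclass

GBK_URN = "beam:transform:group_by_key:v1"


@dataclass
class Transform:
    """Simplified stand-in for a transform seen by the runner."""
    name: str
    urn: str
    key_coder_is_deterministic: bool
    # Hypothetical flag: True when the transform came back from an
    # expansion service instead of being built by the local SDK.
    is_external: bool = False


def nondeterministic_gbks(transforms):
    """Return names of locally built GBKs whose key coder fails the check."""
    failures = []
    for t in transforms:
        if t.urn != GBK_URN:
            continue  # only GBKs carry the deterministic-key requirement
        if t.is_external:
            continue  # the expansion service is responsible for these
        if not t.key_coder_is_deterministic:
            failures.append(t.name)
    return failures
```

Under this sketch, a GBK embedded in an x-lang transform is never reported, so validating it is left entirely to the expansion service that produced it.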
>>> I thought this question was about what to do if you want to take the
>>> output of an XLang pipeline and process it through some generic transform
>>> that doesn't care about the types and treats it like an opaque blob (like
>>> the Count transform) and how to make that work when you don't know the
>>> output properties. I don't think anyone has shared a design doc for this
>>> problem that covered the different approaches.
>> Aside from the DataflowRunner GBK problem, I was also curious if there
>> was any need for metadata around the Coder proto and why there currently is
>> no metadata. If there was more metadata, like an is_deterministic field,
>> then the GBK deterministic input check could also work.
It doesn't exist because there was no reason for those properties to be
exposed since it was all at pipeline construction time and all these
details could be held within the SDK. Once the pipeline is converted to
proto, the contract for using the beam:transform:group_by_key:v1 transform
is that the key encoding is deterministic and it was up to SDKs to perform
this validation. Since pipeline construction has now spilled over to
include transmitting parts of the pipeline in proto form because of how
XLang expansion works, it might be necessary to expose more of these
properties but this is yet to be designed.
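
As a purely illustrative sketch of what exposing such a property could look like (no such field exists today, and the design is open): CoderSpec, its optional is_deterministic field, and key_coder_is_deterministic are hypothetical names. The fallback set lists two standard coder URNs that an SDK already understands.

```python
from dataclasses import dataclass
from typing import Optional

# Standard coders the receiving SDK understands and knows to be deterministic.
KNOWN_DETERMINISTIC_URNS = frozenset({
    "beam:coder:varint:v1",
    "beam:coder:bytes:v1",
})


@dataclass(frozen=True)
class CoderSpec:
    """Stand-in for a coder as transmitted in proto form."""
    urn: str
    # Hypothetical metadata: None means the producing SDK did not say.
    is_deterministic: Optional[bool] = None


def key_coder_is_deterministic(spec, known=KNOWN_DETERMINISTIC_URNS):
    """Prefer declared metadata; otherwise fall back to what the SDK knows."""
    if spec.is_deterministic is not None:
        return spec.is_deterministic
    return spec.urn in known
```

In this sketch an unknown coder with no metadata is still treated as non-deterministic, which matches the current conservative behavior. Only an explicit declaration from the producing SDK changes the outcome.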

>>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>>> CombineGlobally implementation that takes a KV with a Void type (with
>>>> VoidCoder) [2] as input.
>>>> ExternalCoder was added to Python SDK to represent coders within
>>>> external transforms that are not standard coders (in this case the
>>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>>> graph -> Dataflow job request" conversion.
>>>> Seems like today, a runner is unable to perform this particular
>>>> validation (and maybe others?) for pipeline segments received through a
>>>> cross-language transform expansion with or without the ExternalCoder. Note
>>>> that a runner is not involved during cross-language transform expansion, so
>>>> pipeline submission is the only location where a runner would get a chance
>>>> to perform this kind of validation for cross-language transforms.
>>>> [1]
>>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>>> [2]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>> Combine globally is a case where you don't need to know what the key
>>>>> or value is and could treat them as bytes, allowing you to build and
>>>>> execute this pipeline (assuming you ignored properties such as
>>>>> is_deterministic).
>>>>> Regardless, I still think it makes sense to provide criteria on what
>>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>>> be defined to support such a case. Your suggested solution of adding
>>>>> properties to coders is one possible solution but I think we have to take
>>>>> a step back and consider xlang as a whole since there are still several
>>>>> yet to be solved issues within it.
>>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:
>>>>>> I have a PR that makes GBK a primitive in which the
>>>>>> test_combine_globally
>>>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>>>> over the transforms in the run_pipeline method. I moved the check that
>>>>>> verifies that input coders to GBKs are deterministic into run_pipeline.
>>>>>> Previously, it ran during apply_GroupByKey.
>>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com>
>>>>>> wrote:
>>>>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it into
>>>>>>> a downstream transform that enforces is_deterministic. My understanding
>>>>>>> of ExternalCoder (admittedly just based on a quick look at commit
>>>>>>> history) is that it's a shim added so the Python SDK can handle coders
>>>>>>> that are internal to cross-language transforms.
>>>>>>> I think that if the Python SDK is trying to introspect an
>>>>>>> ExternalCoder instance then something is wrong.
>>>>>>> Brian
>>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>> I see. The problem is that you are trying to know certain
>>>>>>>> properties of the coder to use in a downstream transform which enforces
>>>>>>>> that it is deterministic like GroupByKey.
>>>>>>>> In all the scenarios that I have seen so far, we have required both
>>>>>>>> SDKs to understand the coder. How do you have a cross-language
>>>>>>>> pipeline that works when the downstream SDK doesn't understand the
>>>>>>>> coder?
>>>>>>>> Also, an alternative strategy would be to tell the expansion
>>>>>>>> service that you need to choose a coder that is deterministic on the
>>>>>>>> output. This would require building the pipeline and, before
>>>>>>>> submission to the job server, performing the expansion while telling
>>>>>>>> it all the limitations that the SDK has imposed on it.
>>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com>
>>>>>>>> wrote:
>>>>>>>>> Hi all,
>>>>>>>>> Should there be more metadata in the Coder Proto? For example,
>>>>>>>>> adding an "is_deterministic" boolean field. This will allow for a
>>>>>>>>> language-agnostic way for SDKs to infer properties about a coder
>>>>>>>>> received from the expansion service.
>>>>>>>>> My motivation for this is that I recently ran into a problem in
>>>>>>>>> which an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>>>>> non-deterministic. The reason is that the Coder proto doesn't have an
>>>>>>>>> "is_deterministic" field, and when the coder fails to be recreated in
>>>>>>>>> Python, the ExternalCoder defaults to False.
>>>>>>>>> Regards,
>>>>>>>>> Sam
