Thought I'd mention a valuable enhancement that has been proposed a couple
of times: when inferring a coder, choose a deterministic coder when one is
needed. Our current behavior of picking a coder first and then crashing if
we picked the wrong one is suboptimal, for no real reason.
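
To make the idea concrete, here is a minimal sketch in plain Python. It
does not use Beam's real coder registry: CandidateCoder and infer_coder
are hypothetical names invented for illustration, and the candidate list
is assumed to be ordered by preference. The point is only that the
determinism requirement is passed into inference, rather than checked
after a coder has already been picked.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class CandidateCoder:
    """Hypothetical stand-in for a coder; not a Beam class."""
    name: str
    deterministic: bool


def infer_coder(candidates: Sequence[CandidateCoder],
                require_deterministic: bool = False) -> CandidateCoder:
    """Return the most preferred candidate that satisfies the requirement.

    Candidates are assumed to be ordered from most to least preferred.
    """
    if not candidates:
        raise ValueError("no candidate coders")
    if not require_deterministic:
        return candidates[0]
    for coder in candidates:
        if coder.deterministic:
            return coder
    # Fail only when no suitable coder exists at all, not merely because
    # the first choice happened to be non-deterministic.
    raise ValueError("no deterministic coder available")


# Hypothetical candidates for one element type.
candidates = [
    CandidateCoder("FastNonDeterministicCoder", deterministic=False),
    CandidateCoder("SlowerDeterministicCoder", deterministic=True),
]
```

With this shape, a GBK input would call infer_coder(candidates,
require_deterministic=True) and get the second candidate instead of an
error.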

xlang does throw this feature into doubt. I don't see an obvious solution.

Kenn

On Wed, May 20, 2020 at 12:10 PM Robert Bradshaw <rober...@google.com>
wrote:

> On Wed, May 20, 2020 at 11:09 AM Sam Rohde <sro...@google.com> wrote:
>
>> +Robert Bradshaw <rober...@google.com> who is the reviewer on
>> https://github.com/apache/beam/pull/11503. How does that sound to you?
>> Skip the "is input deterministic" check for GBKs embedded in x-lang
>> transforms?
>>
>
> Yes, I think this is the right solution in this case. Longer-term, we may
> want to handle cases like
>
> [java produces KVs]
> [python performs GBK]
> [java consumes GBK results]
>
> where properties like this may need to be exposed, but this may also be
> ruled out by rejecting "unknown" coders at the boundaries (rather than ones
> that are entirely internal).
>
>
>> On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sro...@google.com> wrote:
>>
>>> Thanks for your comments, here's a little more to the problem I'm
>>> working on: I have a PR to make GBK a primitive
>>> <https://github.com/apache/beam/pull/11503> and the aforementioned
>>> test_combine_globally was failing a check in the run_pipeline method of the
>>> DataflowRunner.
>>> Specifically, when the DataflowRunner visits each transform, it checks
>>> whether the GBK has a deterministic input coder. This check fails when the
>>> GBK is expanded from the expansion service because the resulting
>>> ExternalCoder doesn't override the is_deterministic method.
>>>
>>> This wasn't being hit before because this deterministic input check only
>>> occurred during the apply_GroupByKey method. However, I moved it to when
>>> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
>>> stage.
>>>
>>>
>>> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> If the CombineGlobally is being returned by the expansion service, the
>>>> expansion service is on the hook for ensuring that intermediate
>>>> PCollections/PTransforms/... are constructed correctly.
>>>>
>>> Okay, this was kind of my hunch. If the DataflowRunner is making sure
>>> that the input coder to a GBK is deterministic, then we should skip the
>>> check if we receive an x-lang transform (seen in the Python SDK as a
>>> RunnerAPITransformHolder).
>>>
>>>
>>>>
>>>> I thought this question was about what to do if you want to take the
>>>> output of an XLang pipeline and process it through some generic transform
>>>> that doesn't care about the types and treats it like an opaque blob (like
>>>> the Count transform) and how to make that work when you don't know the
>>>> output properties. I don't think anyone has shared a design doc for this
>>>> problem that covered the different approaches.
>>>>
>>> Aside from the DataflowRunner GBK problem, I was also curious if there
>>> was any need for metadata around the Coder proto and why there currently is
>>> no metadata. If there was more metadata, like an is_deterministic field,
>>> then the GBK deterministic input check could also work.
>>>
>>>
>>>
>>>>
>>>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>>>> CombineGlobally implementation that takes a KV with a Void type (with
>>>>> VoidCoder) [2] as input.
>>>>>
>>>>> ExternalCoder was added to Python SDK to represent coders within
>>>>> external transforms that are not standard coders (in this case the
>>>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>>>> graph -> Dataflow job request" conversion.
>>>>>
>>>>> Seems like today, a runner is unable to perform this particular
>>>>> validation (and maybe others?) for pipeline segments received through a
>>>>> cross-language transform expansion with or without the ExternalCoder. Note
>>>>> that a runner is not involved during cross-language transform expansion, so
>>>>> pipeline submission is the only location where a runner would get a chance
>>>>> to perform this kind of validation for cross-language transforms.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>>>
>>>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Combine globally is a case where you don't need to know what the key
>>>>>> or value is and could treat them as bytes, allowing you to build and
>>>>>> execute this pipeline (assuming you ignored properties such as
>>>>>> is_deterministic).
>>>>>>
>>>>>> Regardless, I still think it makes sense to provide criteria on what
>>>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>>>> be defined to support such a case. Your suggested solution of adding
>>>>>> properties to coders is one possible solution but I think we have to take a
>>>>>> step back and consider xlang as a whole since there are still several yet
>>>>>> to be solved issues within it.
>>>>>>
>>>>>>
>>>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:
>>>>>>
>>>>>>> I have a PR that makes GBK a primitive in which the
>>>>>>> test_combine_globally
>>>>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>>>>> over the transform in the run_pipeline method. I moved the method that
>>>>>>> verifies that the input coders to GBKs are deterministic into this
>>>>>>> run_pipeline. Previously, this check ran during apply_GroupByKey.
>>>>>>>
>>>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it
>>>>>>>> into a downstream transform that enforces is_deterministic. My
>>>>>>>> understanding of ExternalCoder (admittedly just based on a quick look at
>>>>>>>> commit history) is that it's a shim added so the Python SDK can handle
>>>>>>>> coders that are internal to cross-language transforms.
>>>>>>>> I think that if the Python SDK is trying to introspect an
>>>>>>>> ExternalCoder instance then something is wrong.
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I see. The problem is that you are trying to know certain
>>>>>>>>> properties of the coder to use in a downstream transform which enforces
>>>>>>>>> that it is deterministic like GroupByKey.
>>>>>>>>>
>>>>>>>>> In all the scenarios so far that I have seen we have required both
>>>>>>>>> SDKs to understand the coder. How do you have a cross-language pipeline
>>>>>>>>> that works when the downstream SDK doesn't understand the coder?
>>>>>>>>>
>>>>>>>>> Also, an alternative strategy would be to tell the expansion
>>>>>>>>> service that you need to choose a coder that is deterministic on the
>>>>>>>>> output. This would require building the pipeline and, before submission
>>>>>>>>> to the job server, performing the expansion while telling it all the
>>>>>>>>> limitations that the SDK has imposed on it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Should there be more metadata in the Coder Proto? For example,
>>>>>>>>>> adding an "is_deterministic" boolean field. This will allow for a
>>>>>>>>>> language-agnostic way for SDKs to infer properties about a coder received
>>>>>>>>>> from the expansion service.
>>>>>>>>>>
>>>>>>>>>> My motivation for this is that I recently ran into a problem in
>>>>>>>>>> which an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>>>>>> non-deterministic. The reason is that the Coder proto doesn't have an
>>>>>>>>>> "is_deterministic" field, and when the coder fails to be recreated in
>>>>>>>>>> Python, the ExternalCoder defaults to False.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Sam
>>>>>>>>>>
>>>>>>>>>>
