Thanks for your comments, here's a little more on the problem I'm working on: I have a PR to make GBK a primitive <https://github.com/apache/beam/pull/11503>, and the aforementioned test_combine_globally was failing a check in the run_pipeline method of the DataflowRunner. Specifically, when the DataflowRunner visits each transform, it checks whether the GBK has a deterministic input coder. This check fails when the GBK is expanded from the expansion service, because the resulting ExternalCoder doesn't override the is_deterministic method.
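To make that concrete, here's a rough, self-contained sketch of the failure mode (the class and function names are just illustrative, not the actual DataflowRunner or ExternalCoder code): a coder that doesn't override is_deterministic inherits the base Coder's conservative default of False, so a determinism check on a GBK input rejects it.

```python
# Illustrative sketch only -- not the real DataflowRunner/ExternalCoder code.
from apache_beam.coders import coders


class ExternalCoderSketch(coders.Coder):
  """Stand-in for ExternalCoder: wraps a coder proto the SDK can't recreate.

  Note: no is_deterministic() override, so the base Coder's default of
  False applies.
  """

  def __init__(self, coder_proto):
    self._coder_proto = coder_proto


def check_gbk_input_coder(coder, transform_label):
  # Roughly what the determinism check amounts to: reject any GBK whose
  # input coder can't be shown to be deterministic.
  if not coder.is_deterministic():
    raise ValueError(
        'Unable to deterministically encode the input of %s; coder: %s' %
        (transform_label, coder))


try:
  check_gbk_input_coder(
      ExternalCoderSketch(coder_proto=None), 'CombineGlobally/GroupByKey')
except ValueError as e:
  print(e)  # Always raised here, since is_deterministic() returns False.
```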
This wasn't being hit before because the deterministic-input check only happened in the apply_GroupByKey method. However, I moved it to when the DataflowRunner is creating a V1B3 pipeline during the run_pipeline stage.

On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:

> If the CombineGlobally is being returned by the expansion service, the expansion service is on the hook for ensuring that intermediate PCollections/PTransforms/... are constructed correctly.

Okay, this was kind of my hunch. If the DataflowRunner is making sure that the input coder to a GBK is deterministic, then we should skip the check when we receive an x-lang transform (seen in the Python SDK as a RunnerAPITransformHolder); there's a rough sketch of what I mean below.

> I thought this question was about what to do if you want to take the output of an XLang pipeline and process it through some generic transform that doesn't care about the types and treats it like an opaque blob (like the Count transform), and how to make that work when you don't know the output properties. I don't think anyone has shared a design doc for this problem that covered the different approaches.

Aside from the DataflowRunner GBK problem, I was also curious whether there is any need for metadata around the Coder proto and why there currently is none. If there were more metadata, such as an is_deterministic field, then the GBK deterministic-input check could also work (sketched at the very bottom of this mail).

> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <chamik...@google.com> wrote:

>> I think you are hitting the GroupByKey [1] that is internal to the Java CombineGlobally implementation, which takes a KV with a Void type (with VoidCoder) [2] as input.

>> ExternalCoder was added to the Python SDK to represent coders within external transforms that are not standard coders (in this case the VoidCoder). This is needed to perform the "pipeline proto -> Python object graph -> Dataflow job request" conversion.

>> Seems like today, a runner is unable to perform this particular validation (and maybe others?) for pipeline segments received through a cross-language transform expansion, with or without the ExternalCoder. Note that a runner is not involved during cross-language transform expansion, so pipeline submission is the only place where a runner would get a chance to perform this kind of validation for cross-language transforms.

>> [1] https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172

>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:

>>> Combine globally is a case where you don't need to know what the key or value is and could treat them as bytes, allowing you to build and execute this pipeline (assuming you ignored properties such as is_deterministic).

>>> Regardless, I still think it makes sense to provide criteria on what your output shape must be during xlang pipeline expansion, which is yet to be defined, to support such a case. Your suggested solution of adding properties to coders is one possible solution, but I think we have to take a step back and consider xlang as a whole since there are still several yet-to-be-solved issues within it.
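Picking up the "skip the check for externally expanded transforms" idea from my inline reply above, here is roughly what I have in mind. This is a simplified sketch, not the actual run_pipeline code; in particular, detecting the external transform by its class name and deriving the coder from the element type are just stand-ins for whatever the runner really does, and how an externally expanded GBK actually shows up in the Python object graph is exactly the fuzzy part.

```python
# Simplified sketch of the "skip externally expanded transforms" idea;
# not the actual DataflowRunner.run_pipeline code.
import apache_beam as beam
from apache_beam.coders.typecoders import registry
from apache_beam.pipeline import PipelineVisitor


class GbkInputCoderVisitor(PipelineVisitor):
  """Checks that every GroupByKey input coder is deterministic, skipping
  transforms that came back from the expansion service."""

  def visit_transform(self, transform_node):
    # Externally expanded transforms appear in the Python SDK as a
    # RunnerAPITransformHolder; the SDK can't introspect their coders, so
    # trust the expansion service and skip the check. (Detecting them by
    # class name here is just a stand-in.)
    if type(transform_node.transform).__name__ == 'RunnerAPITransformHolder':
      return
    if not isinstance(transform_node.transform, beam.GroupByKey):
      return
    for pcoll in transform_node.inputs:
      coder = registry.get_coder(pcoll.element_type)
      if not coder.is_deterministic():
        raise ValueError(
            'Unable to deterministically encode the input of %s; coder: %s' %
            (transform_node.full_label, coder))


# Roughly: the runner would do pipeline.visit(GbkInputCoderVisitor())
# while building the V1B3 job in run_pipeline.
```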
>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:

>>>> I have a PR that makes GBK a primitive in which the test_combine_globally <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162> is failing on the DataflowRunner. In particular, the DataflowRunner runs over the transform in the run_pipeline method. I moved the method that verifies that coders used as inputs to GBKs are deterministic into this run_pipeline; previously it ran during apply_GroupByKey.

>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com> wrote:

>>>>> Yes, I'm unclear on how a PCollection with ExternalCoder made it into a downstream transform that enforces is_deterministic. My understanding of ExternalCoder (admittedly just based on a quick look at commit history) is that it's a shim added so the Python SDK can handle coders that are internal to cross-language transforms. I think that if the Python SDK is trying to introspect an ExternalCoder instance, then something is wrong.

>>>>> Brian

>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:

>>>>>> I see. The problem is that you are trying to know certain properties of the coder to use in a downstream transform which enforces that it is deterministic, like GroupByKey.

>>>>>> In all the scenarios I have seen so far, we have required both SDKs to understand the coder. How do you have a cross-language pipeline where the downstream SDK doesn't understand the coder and it still works?

>>>>>> Also, an alternative strategy would be to tell the expansion service that you need it to choose a coder that is deterministic on the output. This would require building the pipeline and, before submission to the job server, performing the expansion while telling it about all the limitations that the SDK has imposed.

>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com> wrote:

>>>>>>> Hi all,

>>>>>>> Should there be more metadata in the Coder proto? For example, adding an "is_deterministic" boolean field. This would allow a language-agnostic way for SDKs to infer properties about a coder received from the expansion service.

>>>>>>> My motivation for this is that I recently ran into a problem in which an "ExternalCoder" in the Python SDK was erroneously marked as non-deterministic. The reason is that the Coder proto doesn't have an "is_deterministic" field, and when the coder fails to be recreated in Python, the ExternalCoder defaults to False.

>>>>>>> Regards,
>>>>>>> Sam
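And to spell out the Coder-proto-metadata idea from my original mail: there is no is_deterministic field on the Coder proto today, so the field read below is purely hypothetical, but with such a field ExternalCoder could answer the question from the metadata instead of inheriting the default of False.

```python
# Hypothetical sketch: what ExternalCoder could do if the Coder proto carried
# an is_deterministic field. That field does not exist today -- it is the
# proposal being discussed.
from apache_beam.coders import coders


class ExternalCoderWithMetadata(coders.Coder):
  """Wraps a coder proto the Python SDK could not recreate, but preserves
  language-agnostic metadata from the (hypothetical) proto field."""

  def __init__(self, coder_proto):
    self._coder_proto = coder_proto

  def is_deterministic(self):
    # Instead of inheriting Coder's default of False, answer from the
    # metadata the expansion service put on the proto.
    return self._coder_proto.is_deterministic  # hypothetical proto field
```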