Thought I'd mention a valuable enhancement that has been proposed a couple of times: when inferring a coder, choose a deterministic coder when one is needed. Our current behavior of picking a coder first and then crashing if we picked the wrong one is suboptimal, for no real reason.
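The enhancement described above (honor a determinism requirement while inferring a coder, rather than inferring first and failing later) can be sketched roughly as follows. This is a minimal illustration under stated assumptions: `Coder`, `GeneralPurposeCoder`, `Utf8StringCoder`, `REGISTRY` and `infer_coder` are hypothetical stand-ins, not Beam's actual coder registry API.

```python
from typing import Dict, List, Type


class Coder:
    """Hypothetical minimal coder interface; not Beam's actual Coder class."""

    def is_deterministic(self) -> bool:
        # Conservative default: assume equal values may encode differently.
        return False


class GeneralPurposeCoder(Coder):
    """Hypothetical stand-in for a pickling-style coder that handles any value."""


class Utf8StringCoder(Coder):
    """Hypothetical stand-in for a coder whose encoding of a str never varies."""

    def is_deterministic(self) -> bool:
        return True


# Hypothetical registry: candidate coders per type, in preference order.
REGISTRY: Dict[type, List[Type[Coder]]] = {
    str: [GeneralPurposeCoder, Utf8StringCoder],
    dict: [GeneralPurposeCoder],
}


def infer_coder(type_hint: type, require_deterministic: bool = False) -> Coder:
    """Pick a coder for type_hint, applying the determinism requirement
    during inference instead of checking (and crashing) afterwards."""
    candidates = [cls() for cls in REGISTRY.get(type_hint, [])]
    if not candidates:
        raise ValueError(f"no coder registered for {type_hint!r}")
    if not require_deterministic:
        return candidates[0]
    for coder in candidates:
        if coder.is_deterministic():
            return coder
    raise ValueError(f"no deterministic coder registered for {type_hint!r}")


# A GBK-style consumer states its requirement at inference time.
key_coder = infer_coder(str, require_deterministic=True)
print(type(key_coder).__name__)  # Utf8StringCoder
```

Without the flag, the first candidate (the non-deterministic one) would be chosen, which reproduces the "pick first, then crash" behavior; with it, a deterministic alternative is found when one exists, and a clear error is raised only when none does.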
xlang does throw this feature into doubt. I don't see an obvious solution.

Kenn

On Wed, May 20, 2020 at 12:10 PM Robert Bradshaw <rober...@google.com> wrote:

> On Wed, May 20, 2020 at 11:09 AM Sam Rohde <sro...@google.com> wrote:
>
>> +Robert Bradshaw <rober...@google.com> who is the reviewer on https://github.com/apache/beam/pull/11503. How does that sound to you? Skip the "is input deterministic" check for GBKs embedded in x-lang transforms?
>>
>
> Yes, I think this is the right situation in this case. Longer-term, we may want to handle cases like
>
> [java produces KVs]
> [python performs GBK]
> [java consumes GBK results]
>
> where properties like this may need to be exposed, but this may also be ruled out by rejecting "unknown" coders at the boundaries (rather than ones that are entirely internal).
>
>
>> On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sro...@google.com> wrote:
>>
>>> Thanks for your comments. Here's a little more context on the problem I'm working on: I have a PR to make GBK a primitive <https://github.com/apache/beam/pull/11503>, and the aforementioned test_combine_globally was failing a check in the run_pipeline method of the DataflowRunner.
>>> Specifically, when the DataflowRunner visits each transform, it checks whether the GBK has a deterministic input coder. This fails when the GBK is expanded from the expansion service, because the resulting ExternalCoder doesn't override the is_deterministic method.
>>>
>>> This wasn't being hit before because the deterministic input check only occurred in the apply_GroupByKey method. However, I moved it to when the DataflowRunner is creating a V1B3 pipeline during the run_pipeline stage.
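The approach agreed on here (walk the transforms at submission time, require deterministic GBK input coders, but skip GBKs that sit inside a subgraph returned by the expansion service) can be sketched as a small tree walk. This is a hypothetical model, not DataflowRunner code: `TransformNode`, its `is_external` flag (standing in for what the Python SDK sees as a RunnerAPITransformHolder), `OpaqueExternalCoder` and `non_deterministic_gbks` are all illustrative names.

```python
from dataclasses import dataclass, field
from typing import List, Optional


class Coder:
    """Hypothetical minimal coder; the base default is non-deterministic."""

    def is_deterministic(self) -> bool:
        return False


class DeterministicCoder(Coder):
    def is_deterministic(self) -> bool:
        return True


class OpaqueExternalCoder(Coder):
    """Hypothetical stand-in for a coder the SDK could not reconstruct.
    Like the ExternalCoder described in the thread, it does not override
    is_deterministic, so it inherits False."""


@dataclass
class TransformNode:
    label: str
    is_gbk: bool = False
    input_coder: Optional[Coder] = None
    # Hypothetical flag marking a subgraph returned by an expansion service.
    is_external: bool = False
    parts: List["TransformNode"] = field(default_factory=list)


def non_deterministic_gbks(node: TransformNode) -> List[str]:
    """Return labels of GBKs whose input coder is not deterministic,
    skipping any subgraph that came from cross-language expansion."""
    if node.is_external:
        # The expansion service is on the hook for its own intermediate GBKs.
        return []
    bad = []
    if node.is_gbk and (node.input_coder is None
                        or not node.input_coder.is_deterministic()):
        bad.append(node.label)
    for part in node.parts:
        bad.extend(non_deterministic_gbks(part))
    return bad


pipeline = TransformNode("root", parts=[
    TransformNode("LocalGBK", is_gbk=True, input_coder=DeterministicCoder()),
    TransformNode("ExternalCombine", is_external=True, parts=[
        TransformNode("ExternalCombine/GBK", is_gbk=True,
                      input_coder=OpaqueExternalCoder()),
    ]),
])
print(non_deterministic_gbks(pipeline))  # []
```

If the external subgraph were not marked as such, the same walk would flag "ExternalCombine/GBK" purely because the opaque coder falls back to False, which is the failure reported in this thread.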
>>>
>>> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> If the CombineGlobally is being returned by the expansion service, the expansion service is on the hook for ensuring that intermediate PCollections/PTransforms/... are constructed correctly.
>>>>
>>> Okay, this was kind of my hunch. If the DataflowRunner is making sure that the input coder to a GBK is deterministic, then we should skip the check if we receive an x-lang transform (seen in the Python SDK as a RunnerAPITransformHolder).
>>>
>>>>
>>>> I thought this question was about what to do if you want to take the output of an XLang pipeline and process it through some generic transform that doesn't care about the types and treats it like an opaque blob (like the Count transform), and how to make that work when you don't know the output properties. I don't think anyone has shared a design doc for this problem that covered the different approaches.
>>>>
>>> Aside from the DataflowRunner GBK problem, I was also curious whether there was any need for metadata around the Coder proto, and why there currently is no metadata. If there were more metadata, like an is_deterministic field, then the GBK deterministic input check could also work.
>>>
>>>>
>>>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>
>>>>> I think you are hitting the GroupByKey [1] that is internal to the Java CombineGlobally implementation, which takes a KV with a Void type (with VoidCoder) [2] as input.
>>>>>
>>>>> ExternalCoder was added to the Python SDK to represent coders within external transforms that are not standard coders (in this case the VoidCoder). This is needed to perform the "pipeline proto -> Python object graph -> Dataflow job request" conversion.
>>>>>
>>>>> It seems that today a runner is unable to perform this particular validation (and maybe others?) for pipeline segments received through a cross-language transform expansion, with or without the ExternalCoder. Note that a runner is not involved during cross-language transform expansion, so pipeline submission is the only point where a runner would get a chance to perform this kind of validation for cross-language transforms.
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>>>> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>>>
>>>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Combine globally is a case where you don't need to know what the key or value is and could treat them as bytes, which would allow you to build and execute this pipeline (assuming you ignored properties such as is_deterministic).
>>>>>>
>>>>>> Regardless, I still think it makes sense to provide criteria, during xlang pipeline expansion, for what your output shape must be; those criteria have yet to be defined to support such a case. Your suggested solution of adding properties to coders is one possible solution, but I think we have to take a step back and consider xlang as a whole, since there are still several unsolved issues within it.
>>>>>>
>>>>>>
>>>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sro...@google.com> wrote:
>>>>>>
>>>>>>> I have a PR that makes GBK a primitive, in which the test_combine_globally <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162> is failing on the DataflowRunner. In particular, the DataflowRunner walks over the transforms in the run_pipeline method. I moved the method that verifies that GBK input coders are deterministic into this run_pipeline step. Previously, this check happened in apply_GroupByKey.
>>>>>>>
>>>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes, I'm unclear on how a PCollection with ExternalCoder made it into a downstream transform that enforces is_deterministic. My understanding of ExternalCoder (admittedly just based on a quick look at commit history) is that it's a shim added so the Python SDK can handle coders that are internal to cross-language transforms.
>>>>>>>> I think that if the Python SDK is trying to introspect an ExternalCoder instance, then something is wrong.
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I see. The problem is that you are trying to know certain properties of the coder in order to use it in a downstream transform, like GroupByKey, that enforces that the coder is deterministic.
>>>>>>>>>
>>>>>>>>> In all the scenarios I have seen so far, we have required both SDKs to understand the coder. How do you have a cross-language pipeline where the downstream SDK doesn't understand the coder and it still works?
>>>>>>>>>
>>>>>>>>> Also, an alternative strategy would be to tell the expansion service that it needs to choose a deterministic coder for the output. This would require building the pipeline and, before submission to the job server, performing the expansion while telling the expansion service all the limitations that the SDK has imposed on it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sro...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Should there be more metadata in the Coder proto? For example, adding an "is_deterministic" boolean field. This would give SDKs a language-agnostic way to infer properties of a coder received from the expansion service.
>>>>>>>>>>
>>>>>>>>>> My motivation for this is that I recently ran into a problem in which an "ExternalCoder" in the Python SDK was erroneously marked as non-deterministic. The reason is that the Coder proto doesn't have an "is_deterministic" field, and when the coder fails to be recreated in Python, ExternalCoder's is_deterministic defaults to False.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Sam
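Sam's proposal (carry an "is_deterministic" flag alongside the coder so a receiving SDK does not have to fall back to False) might look roughly like this on the receiving side. It is a sketch under explicit assumptions: `CoderSpec` is a hypothetical stand-in for the Coder proto message, its `is_deterministic` field is the proposed addition and does not exist in the real proto, and `ExternalCoderSketch` and the URN string are illustrative only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CoderSpec:
    """Hypothetical stand-in for a Coder proto message. The is_deterministic
    field is the proposal from this thread, not an existing proto field."""
    urn: str
    is_deterministic: Optional[bool] = None  # None models "field not set"


class ExternalCoderSketch:
    """Hypothetical wrapper for a coder the receiving SDK cannot rebuild."""

    def __init__(self, spec: CoderSpec):
        self._spec = spec

    def is_deterministic(self) -> bool:
        # Trust the producing SDK's metadata when it is present; otherwise
        # keep the conservative False default described in the thread.
        return self._spec.is_deterministic is True


tagged = ExternalCoderSketch(CoderSpec("example:coder:opaque:v1", True))
untagged = ExternalCoderSketch(CoderSpec("example:coder:opaque:v1"))
print(tagged.is_deterministic(), untagged.is_deterministic())  # True False
```

Keeping the field optional means coders produced by SDKs that never set it behave exactly as today, so the change could be adopted incrementally; whether a runner should trust a flag it cannot verify is one of the broader xlang questions Luke raises.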