moradology opened a new issue, #30675: URL: https://github.com/apache/beam/issues/30675
OK, so I'm very interested in making the batch subset of Apache Beam play nicely with EMR Serverless. Unfortunately, this is difficult (perhaps even impossible) with the portable runner, since there is, as far as I can tell, an assumption that the Spark master UI be available to take work from Beam's job server. To that end, I've begun adapting roughly the strategy found in the Dask runner in the Python SDK: building up PySpark RDDs that are submitted directly via whatever `SparkSession` PySpark finds at runtime. So far, so good. I even have a (partial) implementation of support for side inputs!

Here, though, I'm running into some difficulty and would love feedback on whatever it is I might be missing. As runner authors will surely be aware, it is necessary to distinguish between `AsIter` and `AsSingleton` instances of `AsSideInput`. Fair enough, but by the time I'm traversing `AppliedPTransform` instances to evaluate them, that information appears to be gone, perhaps lost in some of the serialization/deserialization that occurs during transform application. Here's what I'm seeing when I print out some context about a given `AppliedPTransform` [at this point in the runner](https://github.com/moradology/beam-pyspark-runner/blob/real_traversal_infrastructure/beam_pyspark_runner/pyspark_runner.py#L125) (so far, I've only run some visitors over the AST to collect context that I use later in planning out execution):

```
'write test/Write/WriteImpl/WriteBundles': {
    'input_producer_labels': ['write test/Write/WriteImpl/WindowInto(WindowIntoFn)'],
    'input_producers': [AppliedPTransform(write test/Write/WriteImpl/WindowInto(WindowIntoFn), WindowInto)],
    'inputs': (<PCollection[write test/Write/WriteImpl/WindowInto(WindowIntoFn).None] at 0x123cbfd90>,),
    'outputs': dict_values([<PCollection[write test/Write/WriteImpl/WriteBundles.None] at 0x123c6a2d0>]),
    'parent': 'write test/Write/WriteImpl',
    'side_inputs': (<apache_beam.pvalue._UnpickledSideInput object at 0x123cadad0>,),
    'type': 'ParDo',
    'xform_side_inputs': [<apache_beam.pvalue._UnpickledSideInput object at 0x123cadad0>]}}
```

Note that I have an `_UnpickledSideInput`. This type does not include the `AsIter`/`AsSingleton` context that appears to be absolutely necessary to decide how the results of a side input should be passed to a consumer (the whole list, or else just its head). What am I missing here? If I drop a debugger into Beam's source at `core.ParDo`, I can see this information; it just appears to be lost later on. Example:

`<ParDo(PTransform) label=[ParDo(_WriteBundleDoFn)] side_inputs=[AsSingleton(PCollection[write test/Write/WriteImpl/InitializeWrite.None])] at 0x12d7c8410>`
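
For what it's worth, digging around `apache_beam/pvalue.py` suggests the distinction may not need to be recovered at all: `_UnpickledSideInput` wraps a `SideInputData` whose `view_fn` already encodes how the materialized iterable should be presented to the consumer (roughly the identity for `AsIter`, head extraction for `AsSingleton`). Below is a minimal sketch under that assumption; `materialize_side_input` and its arguments are hypothetical names, and `_side_input_data()` is a private accessor, so this may break across SDK versions:

```python
from apache_beam.portability import common_urns


def materialize_side_input(side_input, elements):
    """Hypothetical helper: map the collected elements of a side-input
    PCollection (e.g. from rdd.collect() in a PySpark-backed runner) to
    the value the consuming DoFn should receive.

    side_input is any apache_beam.pvalue.AsSideInput, including the
    _UnpickledSideInput instances seen on an AppliedPTransform.
    """
    # Private accessor returning SideInputData(access_pattern,
    # window_mapping_fn, view_fn).
    data = side_input._side_input_data()
    if data.access_pattern == common_urns.side_inputs.ITERABLE.urn:
        # view_fn maps the raw iterable to the consumer-facing value:
        # for AsIter it is (roughly) the identity, while for AsSingleton
        # it extracts the lone element (or declared default), so the
        # AsIter/AsSingleton distinction never has to be reconstructed.
        return data.view_fn(elements)
    raise NotImplementedError(
        'Unsupported side input access pattern: %s' % data.access_pattern)
```

If that holds up, a runner could apply this per window (using the accompanying `window_mapping_fn`) rather than trying to recover the original `AsSideInput` subclass.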
