moradology opened a new issue, #30675:
URL: https://github.com/apache/beam/issues/30675

   OK, so I'm very interested in making the batch subset of Apache Beam play nicely with EMR Serverless. Unfortunately, this is difficult to pull off with the portable runner - perhaps even impossible - because, so far as I can tell, there is an assumption that the Spark master is reachable so that Beam's job server can submit work to it. To that end, I've begun roughly adapting the strategy found in the Dask runner in the Python SDK: build up PySpark RDDs and submit them directly via whatever `SparkSession` PySpark finds at runtime. So far, so good. I even have a (partial) implementation of support for side inputs!
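
   To make the approach concrete, the idea is roughly the following (a minimal sketch; the helper names are mine for illustration, not the runner's actual API):
   ```python
   from pyspark.sql import SparkSession


   def _active_session():
       # Reuse whatever session the environment (e.g. EMR Serverless via
       # spark-submit) has already created; fall back to building one locally.
       return SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()


   def _create_as_rdd(elements):
       # A Create transform becomes a parallelized RDD; downstream transforms
       # then map/flatMap over it.
       return _active_session().sparkContext.parallelize(elements)
   ```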
   
   Unfortunately, here I am running into some difficulties and would love feedback on whatever it is that I might be missing. As runner authors will surely be aware, it is necessary to distinguish between `AsIter` and `AsSingleton` `AsSideInput` instances. Fair enough, but by the time I am traversing `AppliedPTransform` instances for evaluation, that information appears to be gone - perhaps lost in some of the serialization/deserialization that occurs during transform application!
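
   Roughly, the distinction at pipeline-construction time looks like this (standard Beam usage, shown here just to pin down which behavior I mean):
   ```python
   import apache_beam as beam
   from apache_beam.pvalue import AsIter, AsSingleton

   with beam.Pipeline() as p:
       main = p | "main" >> beam.Create([1, 2, 3])
       side = p | "side" >> beam.Create([10])

       # AsSingleton: the DoFn receives the single element of the side PCollection.
       _ = main | "plus" >> beam.Map(lambda x, s: x + s, s=AsSingleton(side))

       # AsIter: the DoFn receives the whole side PCollection as an iterable.
       _ = main | "plus_sum" >> beam.Map(lambda x, it: x + sum(it), it=AsIter(side))
   ```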
   
   Here's what I'm seeing when I print out some context about a given `AppliedPTransform` [at this point in the runner](https://github.com/moradology/beam-pyspark-runner/blob/real_traversal_infrastructure/beam_pyspark_runner/pyspark_runner.py#L125) (so far, I've only run some visitors over the AST to collect context that I use later when planning out execution; a sketch of those visitors follows the output below):
   ```
   'write test/Write/WriteImpl/WriteBundles': {
       'input_producer_labels': ['write test/Write/WriteImpl/WindowInto(WindowIntoFn)'],
       'input_producers': [AppliedPTransform(write test/Write/WriteImpl/WindowInto(WindowIntoFn), WindowInto)],
       'inputs': (<PCollection[write test/Write/WriteImpl/WindowInto(WindowIntoFn).None] at 0x123cbfd90>,),
       'outputs': dict_values([<PCollection[write test/Write/WriteImpl/WriteBundles.None] at 0x123c6a2d0>]),
       'parent': 'write test/Write/WriteImpl',
       'side_inputs': (<apache_beam.pvalue._UnpickledSideInput object at 0x123cadad0>,),
       'type': 'ParDo',
       'xform_side_inputs': [<apache_beam.pvalue._UnpickledSideInput object at 0x123cadad0>]}}
   ```
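
   For context, the visitors that collect this information look something like the following (a stripped-down sketch; the actual runner gathers more fields, as the output above shows):
   ```python
   from apache_beam.pipeline import PipelineVisitor


   class ContextCollector(PipelineVisitor):
       """Collects per-transform context keyed by the transform's full label."""

       def __init__(self):
           self.context = {}

       def visit_transform(self, applied_ptransform):
           self.context[applied_ptransform.full_label] = {
               'inputs': applied_ptransform.inputs,
               'outputs': applied_ptransform.outputs.values(),
               'side_inputs': applied_ptransform.side_inputs,
               'type': type(applied_ptransform.transform).__name__,
           }


   # Applied with pipeline.visit(ContextCollector()) before planning execution.
   ```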
   
   Note that the side inputs show up as `_UnpickledSideInput` instances. That type does not appear to carry the `AsIter`/`AsSingleton` context that seems absolutely necessary to decide how the results of a side input should be passed to a consumer (the whole list or else just its head).
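
   Concretely, this is the kind of dispatch I'd like to be able to write (hypothetical; it is what the lost information would enable, not working code against `_UnpickledSideInput`):
   ```python
   from apache_beam import pvalue


   def materialize_side_input(side_input, computed_values):
       # computed_values: the fully computed contents of the side-input PCollection.
       if isinstance(side_input, pvalue.AsSingleton):
           # Pass only the head of the collection to the consumer.
           return next(iter(computed_values))
       if isinstance(side_input, pvalue.AsIter):
           # Pass the whole collection to the consumer.
           return computed_values
       raise NotImplementedError(f"Unsupported side input view: {type(side_input)}")
   ```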
   
   What am I missing here? If I drop a debugger into Beam's source for `core.ParDo`, I can see this information; it just appears to be lost later on. For example: `<ParDo(PTransform) label=[ParDo(_WriteBundleDoFn)] side_inputs=[AsSingleton(PCollection[write test/Write/WriteImpl/InitializeWrite.None])] at 0x12d7c8410>`

