cisaacstern commented on PR #27618:
URL: https://github.com/apache/beam/pull/27618#issuecomment-1868141165

   > and all the tests here are now passing for me locally 🎉
   
   Today I realized that the side input tests that existed in this PR were not 
actually testing side inputs (they were testing arg/kwarg passing, which, IIUC, 
is not the same thing). So 
https://github.com/apache/beam/pull/27618/commits/3c0fbc68d2734945b5a560557251dd2c290fc325
 makes the side input tests actually test side inputs.
   
   And these tests do **_not_** yet pass. 😄 
   
   (To be clear, the upstream `dask` work mentioned in the previous comment is 
still _totally essential_, insofar as it unlocks the ability to do _any real 
testing at all_. So the discovery of these test failures is actually a ✨ 
*positive development* ✨ in a way, because it reflects progress here: we can 
finally test for real, and therefore find out what works and what doesn't!)
   
   A fair amount of debugging led to the skeleton `SideInputMap` draft 
in 
https://github.com/apache/beam/pull/27618/commits/8dc89718568ab917ca658ff8db4affcc1de828c1,
 which is not yet functional but is hopefully at least a basis for further 
consideration.
   
   On a separate note, during debugging I found myself changing the runner as 
follows, to allow for synchronous execution:
   
   ```python
   dask_options = options.view_as(DaskOptions).get_all_options(
           drop_default=True)
   # Comment out the client so we don't start a
   # thread/process pool during synchronous testing.
   # client = ddist.Client(**dask_options)
   
   pipeline.replace_all(dask_overrides())
   
   dask_visitor = self.to_dask_bag_visitor()
   pipeline.visit(dask_visitor)
   opt_graph = dask.optimize(*list(dask_visitor.bags.values()))
   # execute synchronously for easier debugging
   opt_graph[-1].compute(scheduler="sync")
   # futures = client.compute(opt_graph)
   # return DaskRunnerResult(client, futures)
   ```
   
   It may be possible to make that configuration settable via 
`PipelineOptions`, and perhaps also to make it the default setup for 
`TestPipeline(runner=DaskRunner)` (though we wouldn't want to miss out on all 
of the multiprocessing bugs that we catch by using a thread/process pool... 
I'm curious to look into how this is handled in DirectRunner testing).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
