[ 
https://issues.apache.org/jira/browse/BEAM-12984?focusedWorklogId=704637&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-704637
 ]

ASF GitHub Bot logged work on BEAM-12984:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jan/22 16:37
            Start Date: 06/Jan/22 16:37
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #15623:
URL: https://github.com/apache/beam/pull/15623#discussion_r779683142



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
##########
@@ -129,6 +129,41 @@ def test_fragment_does_not_prune_teststream(self):
     # resulting graph is invalid and the following call will raise an exception.
     fragment.to_runner_api()
 
+  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
+  def test_pipeline_composites(self, cell):
+    """Tests that composites are supported.
+    """
+    with cell:  # Cell 1
+      p = beam.Pipeline(ir.InteractiveRunner())
+      ib.watch({'p': p})
+
+    with cell:  # Cell 2
+      # pylint: disable=range-builtin-not-iterating
+      init = p | 'Init' >> beam.Create(range(5))
+
+    with cell:  # Cell 3
+      # Have a composite within a composite to test that all transforms under a
+      # composite are added.
+
+      @beam.ptransform_fn
+      def Bar(pcoll):
+        return pcoll | beam.Map(lambda n: n)
+
+      @beam.ptransform_fn
+      def Foo(pcoll):
+        p1 = pcoll | beam.Map(lambda n: n)
+        p2 = pcoll | beam.Map(str)
+        bar = pcoll | Bar()
+        return {'pc1': p1, 'pc2': p2, 'bar': bar}
+
+      res = init | Foo()
+
+    ib.watch(locals())
+    pc = res['pc1']
+
+    result = pf.PipelineFragment([pc]).run()
+    self.assertEqual([0, 1, 2, 3, 4], list(result.get(pc)))

Review comment:
       Awesome, thanks for the quick turnaround!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 704637)
    Time Spent: 2h 40m  (was: 2.5h)

> InteractiveRunner cannot collect PCollections from composites 
> --------------------------------------------------------------
>
>                 Key: BEAM-12984
>                 URL: https://issues.apache.org/jira/browse/BEAM-12984
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-py-interactive
>            Reporter: Sam Rohde
>            Assignee: Sam Rohde
>            Priority: P2
>             Fix For: 2.34.0
>
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The following code raises an exception complaining that some PCollection has
> no producer.
> ```
> import apache_beam as beam
> import apache_beam.runners.interactive.interactive_beam as ib
> import apache_beam.runners.interactive.interactive_runner as ir
>  
> @beam.ptransform_fn
> def Foo(pcoll):
>   p1 = pcoll | 'ident' >> beam.Map(lambda n: n)
>   p2 = pcoll | 'to str' >> beam.Map(str)
>   return {'pc1': p1, 'pc2': p2}
>  
> p = beam.Pipeline(ir.InteractiveRunner())
> res = p | 'my create' >> beam.Create([1]) | 'my foo' >> Foo()
> ib.collect(res['pc1'])
> ```



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
