TheNeuralBit commented on a change in pull request #15623:
URL: https://github.com/apache/beam/pull/15623#discussion_r779683142



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
##########
@@ -129,6 +129,41 @@ def test_fragment_does_not_prune_teststream(self):
     # resulting graph is invalid and the following call will raise an 
exception.
     fragment.to_runner_api()
 
+  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
+  def test_pipeline_composites(self, cell):
+    """Tests that composites are supported.
+    """
+    with cell:  # Cell 1
+      p = beam.Pipeline(ir.InteractiveRunner())
+      ib.watch({'p': p})
+
+    with cell:  # Cell 2
+      # pylint: disable=range-builtin-not-iterating
+      init = p | 'Init' >> beam.Create(range(5))
+
+    with cell:  # Cell 3
+      # Have a composite within a composite to test that all transforms under a
+      # composite are added.
+
+      @beam.ptransform_fn
+      def Bar(pcoll):
+        return pcoll | beam.Map(lambda n: n)
+
+      @beam.ptransform_fn
+      def Foo(pcoll):
+        p1 = pcoll | beam.Map(lambda n: n)
+        p2 = pcoll | beam.Map(str)
+        bar = pcoll | Bar()
+        return {'pc1': p1, 'pc2': p2, 'bar': bar}
+
+      res = init | Foo()
+
+    ib.watch(locals())
+    pc = res['pc1']
+
+    result = pf.PipelineFragment([pc]).run()
+    self.assertEqual([0, 1, 2, 3, 4], list(result.get(pc)))

Review comment:
       Awesome, thanks for the quick turnaround!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to