[ 
https://issues.apache.org/jira/browse/BEAM-1996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986341#comment-15986341
 ] 

Vilhelm von Ehrenheim commented on BEAM-1996:
---------------------------------------------

The problem seems to be related to calling the same PTransform (here utils.Join, 
but I get the same problem for all custom PTransforms) in different tests (i.e. 
different test pipelines).

If you restart the nosetest worker between tests like so:
{noformat}
nosetests --process-restartworker --processes=1
{noformat}
You will not get this error as long as you don't have several test pipelines 
inside the same test file.
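The hypothesis above (reusing a transform across test pipelines) can be illustrated with a stdlib-only toy model. It is not Beam's implementation: it assumes, hypothetically, that a transform object remembers the pipeline it was first applied in and checks every input against it, which is one way state leaking between tests in one process could produce this exact message. `Pipeline`, `PCollection` and `Transform` here are invented classes.

```python
# Toy model only: invented classes, not Beam's API or implementation.


class Pipeline:
    def __init__(self, name):
        self.name = name


class PCollection:
    def __init__(self, pipeline):
        # Every value belongs to exactly one pipeline.
        self.pipeline = pipeline


class Transform:
    def __init__(self):
        # Hypothetical state: the pipeline this instance was first applied in.
        self.pipeline = None

    def apply(self, *inputs):
        pipelines = [pc.pipeline for pc in inputs]
        # Use the remembered pipeline if there is one, else the first input's.
        p = self.pipeline or pipelines[0]
        for pp in pipelines:
            if pp is not p:
                raise ValueError(
                    "Mixing value from different pipelines not allowed.")
        self.pipeline = p  # state that survives into later "tests"
        return PCollection(p)


shared = Transform()  # one instance reused by two "tests"

p_a = Pipeline("test_a")
shared.apply(PCollection(p_a), PCollection(p_a))  # first test: passes

p_b = Pipeline("test_b")
try:
    shared.apply(PCollection(p_b), PCollection(p_b))  # second test
except ValueError as err:
    print(err)  # Mixing value from different pipelines not allowed.

Transform().apply(PCollection(p_b), PCollection(p_b))  # fresh instance: passes
```

In this model a fresh transform per pipeline always passes, and the message only appears once state from an earlier pipeline survives into a later one in the same process, which would be consistent with restarting the nose worker between tests avoiding the error.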

> Error about mixing pipelines in nosetests
> -----------------------------------------
>
>                 Key: BEAM-1996
>                 URL: https://issues.apache.org/jira/browse/BEAM-1996
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>            Reporter: Vilhelm von Ehrenheim
>            Assignee: Charles Chen
>            Priority: Minor
>
> When testing a PTransform (defined using @ptransform_fn) that merges several 
> PCollections from different sources, the following error is raised:
> {noformat}ValueError: Mixing value from different pipelines not allowed.{noformat}
> Running the same pipeline on GCP with the `DataflowRunner` does not raise any 
> error, and neither does running the test file manually instead of through nose.
> Here is an example:
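> {code:none|title=utils.py}
> # Defined in module `utils`
> import apache_beam as beam
> from apache_beam.transforms.ptransform import ptransform_fn
>
> @ptransform_fn
> def Join(pcolls, by):
>     # Simplified reproducer: `by` is not used, and CoGroupByKey expects
>     # (key, value) pairs rather than plain dicts.
>     return pcolls | beam.CoGroupByKey()
> {code}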
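> {code:none|title=test_utils.py}
> import unittest
>
> import apache_beam as beam
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.util import assert_that, equal_to
>
> import utils
>
> class UtilsTest(unittest.TestCase):
>     def test_join(self):
>         # Each test builds its own pipeline on the DirectRunner.
>         p = TestPipeline(runner="DirectRunner")
>         p1 = (p
>              | "Create p1" >> beam.Create([
>                  {'a': 1, 'b': 11},
>                  {'a': 2, 'b': 22},
>                  {'a': 3, 'b': 33}]))
>         p2 = (p
>              | "Create p2" >> beam.Create([
>                  {'a': 1, 'c': 111},
>                  {'a': 1, 'c': 112},
>                  {'a': 3, 'c': 333}]))
>         res = ((p1, p2) | "LeftJoin" >> utils.Join(by='a'))
>         assert_that(res, equal_to([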
>             {'a': 1, 'b': 11, 'c': 111},
>             {'a': 1, 'b': 11, 'c': 112},
>             {'a': 2, 'b': 22},
>             {'a': 3, 'b': 33, 'c': 333}]))
>         # Run test pipeline
>         p.run()
> {code}
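For reference, the expected output in `test_join` is a left join on key `'a'`: each `p1` row is merged with every `p2` row sharing its key, and `p1` rows without a match are kept unchanged. The plain-Python sketch below (no Beam; `co_group` and `left_join` are hypothetical helpers) reproduces that expected list by first grouping both inputs by key, the way a two-input `CoGroupByKey` groups keyed data, and then flattening each group.

```python
from collections import defaultdict


def co_group(left, right, by):
    """Group two lists of dicts by row[by]: key -> (left rows, right rows)."""
    grouped = defaultdict(lambda: ([], []))
    for row in left:
        grouped[row[by]][0].append(row)
    for row in right:
        grouped[row[by]][1].append(row)
    return grouped


def left_join(left, right, by):
    """Merge each left row with every matching right row; keep unmatched left rows."""
    result = []
    for _key, (lefts, rights) in co_group(left, right, by).items():
        for lrow in lefts:
            if rights:
                for rrow in rights:
                    result.append({**lrow, **rrow})
            else:
                result.append(dict(lrow))
    return result


p1 = [{'a': 1, 'b': 11}, {'a': 2, 'b': 22}, {'a': 3, 'b': 33}]
p2 = [{'a': 1, 'c': 111}, {'a': 1, 'c': 112}, {'a': 3, 'c': 333}]
for row in left_join(p1, p2, by='a'):
    print(row)
```

In a Beam pipeline the inputs would first need to be keyed, e.g. with `beam.Map(lambda row: (row[by], row))`, since `CoGroupByKey` operates on (key, value) pairs; the simplified `Join` in the report skips that step and does not use `by`.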



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
