In my case I'm getting the above error every time I use --runner=FlinkRunner.

I think it could be related, but it needs more investigation.
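For context, the AssertionError in the quoted traceback fires when the Python SDK tries to hydrate a ParDo payload that it did not pickle itself. The toy sketch below is only illustrative (the function name, return values, and the exact URN string are my assumptions, not Beam's actual internals); it shows the distinction between an SDK-native payload and a foreign one that should stay opaque:

```python
# Toy model of the failing assertion (illustrative only; not Beam's real API).
# In Beam, python_urns.PICKLED_DOFN_INFO marks a DoFn payload serialized by
# the Python SDK; the traceback shows the Python SDK asserting that URN on a
# payload that apparently originated in the Java SDK.
PICKLED_DOFN_INFO = "beam:dofn:pickled_python_info:v1"  # assumed URN value

def hydrate_do_fn(payload_urn, native_urns):
    """Hydrate a payload only if this SDK owns it; treat foreign ones as opaque."""
    if payload_urn in native_urns:
        return "native"
    # Instead of `assert payload_urn == PICKLED_DOFN_INFO` (which raises
    # AssertionError, as in the traceback), a foreign payload stays opaque.
    return "opaque"

python_native = {PICKLED_DOFN_INFO}
print(hydrate_do_fn(PICKLED_DOFN_INFO, python_native))   # -> native
print(hydrate_do_fn("javasdk:dofn:v1", python_native))   # -> opaque
```

The point of the sketch: a payload whose URN belongs to the other SDK should never reach the asserting code path at all, which is why this smells like a transform being placed in the wrong environment.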

On 2020/05/15 18:43:14, Brian Hulette <[email protected]> wrote: 
> I just started having an issue this morning that looks similar. I'm trying
> out running the Python SqlTransform tests with fn_runner (currently they
> only execute continuously on Flink and Spark), but I'm running into
> occasional failures. The errors always come from either Python or Java
> attempting to hydrate a coder that's native to the other SDK. I'm not sure,
> but maybe this is also caused by attempting to execute a transform in the
> wrong environment?
> 
> One interesting data point is that this issue can be flaky.
> SqlTransformTest.test_generate_data fails ~25% of the time when run on the
> fn_api_runner. SqlTransformTest.test_tagged_join on the other hand just
> fails 100% of the time.
> Neither of these tests flakes for me at all when using --runner=FlinkRunner
> (out of 10 runs).
> 
> Maybe we're seeing different manifestations of the same bug?
> 
> On Fri, May 15, 2020 at 3:08 AM Paweł Urbanowicz <
> [email protected]> wrote:
> 
> > Hey,
> > I created a transform method in Java and now I want to use it in Python
> > using Cross-language.
> >
> > I got pretty stuck with the following problem:
> >
> >     p
> >     | GenerateSequence(...)
> >     | ExternalTransform(...)
> >     => works like a charm
> >
> >     p
> >     | Create(...)
> >     | ExternalTransform(...)
> >     => fails with: assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
> >
> > Based on https://www.mail-archive.com/[email protected]/msg04887.html
> > it seems like the Create transform is being registered as a Java transform
> > but executed as a Python transform.
> > Do you have any idea what is going on here? Thanks a lot for any help :)
> >
> >
> > Traceback (most recent call last):
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py", line 92, in test_snowflake_write_read
> >     self.run_write()
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py", line 129, in run_write
> >     expansion_service=self.expansion_service,
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
> >     self.run().wait_until_finish()
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 112, in run
> >     False if self.not_use_test_runner_api else test_runner_api))
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 501, in run
> >     self._options).run(False)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 858, in from_runner_api
> >     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1170, in from_runner_api
> >     transform = ptransform.PTransform.from_runner_api(proto, context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/ptransform.py", line 685, in from_runner_api
> >     context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/core.py", line 1380, in from_runner_api_parameter
> >     assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
> > AssertionError
> >
> 
