In my case I'm getting the above error every time I use --runner=FlinkRunner.
I think it could be related but it needs more investigation.

On 2020/05/15 18:43:14, Brian Hulette <[email protected]> wrote:
> I just started having an issue that looks similar this morning. I'm trying
> out running the Python SqlTransform tests with fn_runner (currently they
> only execute continuously on Flink and Spark), but I'm running into
> occasional failures. The errors always come from either python or java
> attempting to hydrate a coder that's native for the other SDK. I'm not sure
> but maybe this is also caused by attempting to execute a transform in the
> wrong environment?
>
> One interesting data point is that this issue can be flakey.
> SqlTransformTest.test_generate_data fails ~25% of the time when run on the
> fn_api_runner. SqlTransformTest.test_tagged_join on the other hand just
> fails 100% of the time.
> Neither of these tests flakes for me at all when using --runner=FlinkRunner
> (out of 10 runs).
>
> Maybe we're seeing different manifestations of the same bug?
>
> On Fri, May 15, 2020 at 3:08 AM Paweł Urbanowicz <
> [email protected]> wrote:
>
> > Hey,
> > I created a transform method in Java and now I want to use it in Python
> > using Cross-language.
> >
> > I got pretty stuck with the following problem:
> > p
> > | GenerateSequence(...)
> > |ExternalTransform(...)
> > *=> is working like a charm *
> >
> >
> > p
> > | Create(...)
> > | ExternalTransform(...)
> > *=> getting assert pardo_payload.do_fn.urn ==
> > python_urns.PICKLED_DOFN_INFO *
> >
> > Based on https://www.mail-archive.com/[email protected]/msg04887.html
> > it seems like a Create transform is being registered as a Java transform
> > but executed as Python transform.
> > Do you have any idea what is going on here?
> > Thanks a lot for any help :)
> >
> >
> > Traceback (most recent call last):
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
> > line 92, in test_snowflake_write_read
> >     self.run_write()
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
> > line 129, in run_write
> >     expansion_service=self.expansion_service,
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 528, in __exit__
> >     self.run().wait_until_finish()
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/testing/test_pipeline.py",
> > line 112, in run
> >     False if self.not_use_test_runner_api else test_runner_api))
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 501, in run
> >     self._options).run(False)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 858, in from_runner_api
> >     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> > line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> > line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> > line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> > line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 1231, in from_runner_api
> >     part = context.transforms.get_by_id(transform_id)
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> > line 103, in get_by_id
> >     self._id_to_proto[id], self._pipeline_context)
> >   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> > line 1170, in from_runner_api
> >     transform = ptransform.PTransform.from_runner_api(proto, context)
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/ptransform.py",
> > line 685, in from_runner_api
> >     context)
> >   File
> > "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/core.py",
> > line 1380, in from_runner_api_parameter
> >     assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
> > AssertionError
> >
>
