I started seeing a similar-looking issue this morning. I'm trying out running the Python SqlTransform tests with fn_runner (currently they only execute continuously on Flink and Spark), but I'm running into occasional failures. The errors always come from either Python or Java attempting to hydrate a coder that's native to the other SDK. I'm not sure, but maybe this is also caused by attempting to execute a transform in the wrong environment?
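To make the suspected failure mode concrete, here is a toy sketch in plain Python. It does not use Beam at all: the URN strings, `ToyTransform`, `hydrate_strict` and `hydrate_tolerant` are all hypothetical names invented for illustration, and this is not a claim about how Beam does or should handle the case. It only shows why an SDK that asserts on a payload URN it does not own fails, while one that leaves foreign payloads opaque does not.

```python
from dataclasses import dataclass

# Hypothetical URNs for illustration only; these are not Beam's real URN strings.
PYTHON_DOFN_URN = "toy:python:pickled_dofn"
JAVA_DOFN_URN = "toy:java:serialized_dofn"


@dataclass
class ToyTransform:
    """Minimal stand-in for a transform proto: a name plus the URN of its DoFn payload."""
    name: str
    do_fn_urn: str


def hydrate_strict(transform: ToyTransform) -> str:
    # Same shape as the failing assertion in the traceback: the Python side
    # assumes every ParDo payload it sees was produced by the Python SDK.
    assert transform.do_fn_urn == PYTHON_DOFN_URN
    return f"python:{transform.name}"


def hydrate_tolerant(transform: ToyTransform) -> str:
    # Assumed alternative: payloads owned by another SDK are kept opaque
    # instead of being decoded by the Python side.
    if transform.do_fn_urn != PYTHON_DOFN_URN:
        return f"opaque:{transform.name}"
    return hydrate_strict(transform)


if __name__ == "__main__":
    foreign = ToyTransform("Create", JAVA_DOFN_URN)
    print(hydrate_tolerant(foreign))
    try:
        hydrate_strict(foreign)
    except AssertionError:
        print("strict hydration failed on a foreign payload")
```

If the real problem is a transform being attributed to the wrong environment, it would look like the strict path here: the payload itself is fine, but the wrong SDK is asked to decode it.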
One interesting data point is that this issue can be flakey. SqlTransformTest.test_generate_data fails ~25% of the time when run on the fn_api_runner. SqlTransformTest.test_tagged_join on the other hand just fails 100% of the time. Neither of these tests flakes for me at all when using --runner=FlinkRunner (out of 10 runs). Maybe we're seeing different manifestations of the same bug?

On Fri, May 15, 2020 at 3:08 AM Paweł Urbanowicz < [email protected]> wrote:

> Hey,
> I created a transform method in Java and now I want to use it in Python
> using Cross-language.
>
> I got pretty stuck with the following problem:
> p
> | GenerateSequence(...)
> | ExternalTransform(...)
> *=> is working like a charm *
>
>
> p
> | Create(...)
> | ExternalTransform(...)
> *=> getting assert pardo_payload.do_fn.urn ==
> python_urns.PICKLED_DOFN_INFO *
>
> Based on https://www.mail-archive.com/[email protected]/msg04887.html
> it seems like a Create transform is being registered as a Java transform
> but executed as Python transform.
> Do you have any idea what is going on here?
> Thanks a lot for any help :)
>
>
> Traceback (most recent call last):
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py", line 92, in test_snowflake_write_read
>     self.run_write()
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py", line 129, in run_write
>     expansion_service=self.expansion_service,
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 528, in __exit__
>     self.run().wait_until_finish()
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 112, in run
>     False if self.not_use_test_runner_api else test_runner_api))
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 501, in run
>     self._options).run(False)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 858, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1231, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line 1170, in from_runner_api
>     transform = ptransform.PTransform.from_runner_api(proto, context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/ptransform.py", line 685, in from_runner_api
>     context)
>   File "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/core.py", line 1380, in from_runner_api_parameter
>     assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
> AssertionError
>
