I just started hitting an issue this morning that looks similar. I'm trying
out running the Python SqlTransform tests with fn_runner (currently they
only run continuously on Flink and Spark), but I'm running into
occasional failures. The errors always come from either Python or Java
attempting to hydrate a coder that's native to the other SDK. I'm not sure,
but maybe this is also caused by attempting to execute a transform in the
wrong environment?
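One rough way to test the wrong-environment theory is to check, for each coder in the pipeline, whether its URN is either a standard (model) coder or native to the SDK that will decode it. This is a minimal sketch under stated assumptions: `find_foreign_coders` is a hypothetical helper (not a Beam API), it takes a plain dict of coder id -> URN as a stand-in for the coders in the pipeline proto, the URN sets are only an illustrative subset, and the Java-specific URN is an assumption.

```python
# Illustrative subset of standard (model) coder URNs that every SDK understands.
STANDARD_CODER_URNS = {
    "beam:coder:bytes:v1",
    "beam:coder:varint:v1",
    "beam:coder:kv:v1",
    "beam:coder:iterable:v1",
    "beam:coder:length_prefix:v1",
    "beam:coder:global_window:v1",
    "beam:coder:windowed_value:v1",
}

# Assumed SDK-specific coder URNs: each can only be hydrated by its own SDK.
SDK_NATIVE_CODER_URNS = {
    "python": {"beam:coder:pickled_python:v1"},
    "java": {"beam:coders:javasdk:0.1"},  # assumption, not verified here
}


def find_foreign_coders(coders, decoding_sdk):
    """Return the ids of coders that `decoding_sdk` could not hydrate.

    `coders` maps coder id -> URN, a simplified stand-in for the coders
    section of the pipeline proto.
    """
    native = SDK_NATIVE_CODER_URNS.get(decoding_sdk, set())
    return sorted(
        coder_id
        for coder_id, urn in coders.items()
        if urn not in STANDARD_CODER_URNS and urn not in native
    )
```

For example, with a varint coder, a Java-serialized coder and a pickled Python coder, asking about `"python"` reports only the Java one, and asking about `"java"` reports only the pickled Python one. A non-empty result for the SDK that actually runs a stage would match the "hydrating a coder native to the other SDK" symptom.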

One interesting data point is that this issue can be flaky:
SqlTransformTest.test_generate_data fails ~25% of the time when run on the
fn_api_runner, while SqlTransformTest.test_tagged_join fails 100% of the
time. Neither of these tests fails for me at all when using
--runner=FlinkRunner (out of 10 runs).
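To put numbers on the flakiness, a test can be wrapped in a callable and run repeatedly. This is a minimal sketch assuming the runs are independent; `measure_failure_rate` and `prob_all_pass` are hypothetical helpers. Under that assumption, a test with a ~25% failure rate would pass 10 runs in a row only about 5.6% of the time (0.75^10 ≈ 0.056), so 10 clean runs on Flink is fairly strong, though not conclusive, evidence that the tests don't flake at that rate there.

```python
def measure_failure_rate(run_once, runs=10):
    """Call `run_once` `runs` times and return the fraction of calls that raised.

    Any exception counts as a failed run (for example a
    subprocess.CalledProcessError from a subprocess.run(..., check=True) call).
    """
    failures = 0
    for _ in range(runs):
        try:
            run_once()
        except Exception:
            failures += 1
    return failures / runs


def prob_all_pass(failure_rate, runs):
    """Probability of seeing zero failures in `runs` independent runs."""
    return (1.0 - failure_rate) ** runs
```

To drive a real test, `run_once` could be something like `lambda: subprocess.run(cmd, check=True)`, where `cmd` is whatever command launches the test locally; `check=True` makes a non-zero exit status raise `CalledProcessError`, which the helper counts as a failure.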

Maybe we're seeing different manifestations of the same bug?
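The assertion in the quoted traceback below (`pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO`) fires when Python rebuilds a ParDo whose payload is not a pickled Python DoFn. That fits the idea that a transform ends up labeled with one SDK's environment while carrying another SDK's DoFn. A minimal consistency-check sketch, assuming a flattened view of the pipeline proto: `find_environment_mismatches` is hypothetical, the Python URN is assumed to match `python_urns.PICKLED_DOFN_INFO`, the Java URN is an assumption, and the transform names in the usage note are made up.

```python
# Assumed DoFn URNs: the first should match python_urns.PICKLED_DOFN_INFO,
# the second is what we assume the Java SDK uses for its serialized DoFns.
DOFN_URN_TO_SDK = {
    "beam:dofn:pickled_python_info:v1": "python",
    "beam:dofn:javasdk:0.1": "java",
}


def find_environment_mismatches(transforms, environments):
    """List ParDos whose DoFn URN belongs to a different SDK than their environment.

    `transforms` maps transform name -> (environment id, DoFn URN);
    `environments` maps environment id -> SDK name. Both are hypothetical,
    flattened stand-ins for the pipeline proto.
    """
    mismatches = []
    for name, (env_id, dofn_urn) in sorted(transforms.items()):
        env_sdk = environments.get(env_id, "unknown")
        dofn_sdk = DOFN_URN_TO_SDK.get(dofn_urn, "unknown")
        if env_sdk != dofn_sdk:
            mismatches.append((name, env_sdk, dofn_sdk))
    return mismatches
```

For example, a hypothetical `"Create/ParDo"` tagged with a Java environment but carrying a pickled Python DoFn would be reported as `("Create/ParDo", "java", "python")`, which is the "registered as Java, executed as Python" shape described in the quoted message.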

On Fri, May 15, 2020 at 3:08 AM Paweł Urbanowicz <
[email protected]> wrote:

> Hey,
> I created a transform method in Java and now I want to use it in Python
> using Cross-language.
>
> I got pretty stuck with the following problem:
> p
> | GenerateSequence(...)
> | ExternalTransform(...)
> *=> works like a charm*
>
>
> p
> | Create(...)
> | ExternalTransform(...)
> *=> fails with: assert pardo_payload.do_fn.urn ==
> python_urns.PICKLED_DOFN_INFO*
>
> Based on https://www.mail-archive.com/[email protected]/msg04887.html
> it seems like a Create transform is being registered as a Java transform
> but executed as a Python transform.
> Do you have any idea what is going on here? Thanks a lot for any help :)
>
>
> Traceback (most recent call last):
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
> line 92, in test_snowflake_write_read
> self.run_write()
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
> line 129, in run_write
> expansion_service=self.expansion_service,
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 528, in __exit__
> self.run().wait_until_finish()
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/testing/test_pipeline.py",
> line 112, in run
> False if self.not_use_test_runner_api else test_runner_api))
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 501, in run
> self._options).run(False)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 858, in from_runner_api
> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1170, in from_runner_api
> transform = ptransform.PTransform.from_runner_api(proto, context)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/ptransform.py",
> line 685, in from_runner_api
> context)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/core.py",
> line 1380, in from_runner_api_parameter
> assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
> AssertionError
>
