I'm trying to write an xlang transform that uses Reshuffle internally, and ran into this as well. Is there any workaround to this for now (other than removing the reshuffle), or do I just need to wait for what Chamikara mentioned? I noticed the same issue was mentioned in the SnowflakeIO.Read PR as well [1].
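For concreteness, here is a rough, untested sketch of the kind of substitute I'm considering while waiting for the direct proto conversion: a hand-rolled shuffle built only from standard primitives (WithKeys + GroupByKey), so no Java-serialized WindowFn should show up anywhere in the expanded sub-graph. The transform name and bucket count below are made up, and it assumes a batch pipeline with GlobalWindows input, since it lacks the window/trigger handling that Reshuffle normally applies:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical name; not an existing Beam transform.
class PlainReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
  private static final int NUM_BUCKETS = 100;  // arbitrary fan-out, made up

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // Assign a small random key so elements get redistributed at the GroupByKey.
        .apply("AssignRandomKeys",
            WithKeys.<Integer, T>of(x -> ThreadLocalRandom.current().nextInt(NUM_BUCKETS))
                .withKeyType(TypeDescriptors.integers()))
        // A plain GroupByKey keeps the incoming (standard) windowing strategy,
        // unlike Reshuffle, which windows into a Java-serialized WindowFn internally.
        .apply("Group", GroupByKey.create())
        // Drop the keys and flatten the grouped iterables back to single elements.
        .apply("DropKeys", Values.create())
        .apply("Ungroup", Flatten.iterables());
  }
}

This obviously trades away Reshuffle's streaming/trigger behavior, so I'd only treat it as a stopgap until the direct Beam proto to Dataflow proto conversion Chamikara mentioned is in place.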
[1] https://github.com/apache/beam/pull/12149#discussion_r463710165

On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <[email protected]> wrote:

> That explains a lot. Thanks, Cham!
>
> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <[email protected]> wrote:
>
>> Due to the proto -> object -> proto conversion we do today, Python needs
>> to parse the full sub-graph from Java. We have hooks for PTransforms and
>> Coders but not for windowing operations. This limitation will go away after
>> we have direct Beam proto to Dataflow proto conversion in place.
>>
>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <[email protected]> wrote:
>>
>>> Coders should only be checked over the language boundaries.
>>>
>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Thanks Cham!
>>>>
>>>> I just realized that the beam:window_fn:serialized_java:v1 is
>>>> introduced by Java Reshuffle.viaRandomKey(). But Reshuffle.viaRandomKey()
>>>> does rewindow into the original window strategy (which is GlobalWindows
>>>> in my case). Is it expected that we also check intermediate PCollections
>>>> rather than only the PCollections that cross the language boundary?
>>>>
>>>> More about my PTransform:
>>>>
>>>> MyExternalPTransform -- expands to --
>>>>     ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>        |
>>>>        -> ParDo() -> output PCollection to Python SDK
>>>>
>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <[email protected]> wrote:
>>>>
>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1],
>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>> immediate issue. The tracking bug for supporting custom windows is
>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>> [2] https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>
>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>
>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>> limitation will go away once Dataflow directly starts reading Beam
>>>>>> protos. We are working on this now.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks, Robert!
>>>>>>> I want to add more details on my External PTransform:
>>>>>>>
>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>     ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>        |
>>>>>>>        -> ParDo() -> output PCollection to Python SDK
>>>>>>>
>>>>>>> The full stacktrace:
>>>>>>>
>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>> Starting expansion service at localhost:53569
>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>> beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>> beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>> beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>> beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>> beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>
>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev.
>>>>>>> If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>
>>>>>>> Traceback (most recent call last):
>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>     run()
>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>     pipeline.run(False)
>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>     allow_proto_holders=True)
>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>     id in proto.outputs.items()
>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>     id in proto.outputs.items()
>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>     proto.windowing_strategy_id),
"apache_beam/runners/pipeline_context.py", line 95, in get_by_id >>>>>>> self._id_to_proto[id], self._pipeline_context) >>>>>>> File "apache_beam/transforms/core.py", line 2597, in from_runner_api >>>>>>> windowfn=WindowFn.from_runner_api(proto.window_fn, context), >>>>>>> File "apache_beam/utils/urns.py", line 186, in from_runner_api >>>>>>> parameter_type, constructor = cls._known_urns[fn_proto.urn] >>>>>>> KeyError: 'beam:window_fn:serialized_java:v1' >>>>>>> >>>>>>> >>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> You should be able to use a WindowInto with any of the common >>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an >>>>>>>> external transform. You should also be able to window into an >>>>>>>> arbitrary WindowFn as long as it produces standards window types, >>>>>>>> but >>>>>>>> if there's a bug here you could possibly work around it by windowing >>>>>>>> into a more standard windowing fn before returning. >>>>>>>> >>>>>>>> What is the full traceback? >>>>>>>> >>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <[email protected]> >>>>>>>> wrote: >>>>>>>> > >>>>>>>> > Hi team, >>>>>>>> > >>>>>>>> > I'm trying to create an External transform in Java SDK, which >>>>>>>> expands into several ParDo and a Window.into(FixWindow). When I use >>>>>>>> this >>>>>>>> transform in Python SDK, I get an pipeline construction error: >>>>>>>> > >>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api >>>>>>>> > parameter_type, constructor = cls._known_urns[fn_proto.urn] >>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1' >>>>>>>> > >>>>>>>> > Is it expected that I cannot use a Window.into when building >>>>>>>> External Ptransform? Or do I miss anything here? >>>>>>>> > >>>>>>>> > >>>>>>>> > Thanks for your help! >>>>>>>> >>>>>>>
