Hi Steve,

Unfortunately I don't think there is a workaround until we have the change that Cham mentioned.
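For background on why there is no user-side fix: the Java-serialized window fn comes from an intermediate PCollection inside Reshuffle's own expansion, and (as Cham explains below) the Python SDK currently parses the full Java sub-graph, not only the PCollections that cross the language boundary. A minimal sketch of the shape that still fails, with a hypothetical transform name:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical transform, for illustration only.
class ReshuffleThenRewindow extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        // Per the thread below, Reshuffle.viaRandomKey() internally windows
        // into a Java-only WindowFn, so an intermediate PCollection inside
        // this composite carries beam:window_fn:serialized_java:v1.
        .apply(Reshuffle.viaRandomKey())
        // Re-windowing afterwards only fixes the output PCollection; the
        // intermediate one is still parsed on the Python side during the
        // proto -> object -> proto round trip and raises the KeyError.
        .apply(Window.into(new GlobalWindows()));
  }
}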
On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <[email protected]> wrote:

> I'm trying to write an xlang transform that uses Reshuffle internally, and
> ran into this as well. Is there any workaround to this for now (other than
> removing the reshuffle), or do I just need to wait for what Chamikara
> mentioned? I noticed the same issue was mentioned in the SnowflakeIO.Read
> PR as well [1].
>
> [1] https://github.com/apache/beam/pull/12149#discussion_r463710165
>
> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <[email protected]> wrote:
>
>> That explains a lot. Thanks, Cham!
>>
>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <[email protected]> wrote:
>>
>>> Due to the proto -> object -> proto conversion we do today, Python needs
>>> to parse the full sub-graph from Java. We have hooks for PTransforms and
>>> Coders but not for windowing operations. This limitation will go away after
>>> we have direct Beam proto to Dataflow proto conversion in place.
>>>
>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <[email protected]> wrote:
>>>
>>>> Coders should only be checked over the language boundaries.
>>>>
>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <[email protected]> wrote:
>>>>
>>>>> Thanks Cham!
>>>>>
>>>>> I just realized that the *beam:window_fn:serialized_java:v1* is
>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>> *Reshuffle.viaRandomKey()* does rewindow into the original window
>>>>> strategy (which is *GlobalWindows* in my case). Is it expected that we
>>>>> also check intermediate PCollections rather than only the PCollections
>>>>> that cross the language boundary?
>>>>>
>>>>> More about my PTransform:
>>>>>
>>>>> MyExternalPTransform -- expands to --
>>>>>
>>>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixedWindows) -> ParDo() -> output void
>>>>>                                             |
>>>>>                                             -> ParDo() -> output PCollection to Python SDK
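To make that shape concrete, here is a rough, self-contained sketch of such an expansion (DoFn and step names are hypothetical, and the exact branch point is a guess from the diagram above):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class MyExternalPTransform extends PTransform<PCollection<String>, PCollection<String>> {
  // Stand-in for the real per-element logic.
  private static class PassThroughFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String element, OutputReceiver<String> out) {
      out.output(element);
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    PCollection<String> shuffled =
        input
            .apply("PreShuffle", ParDo.of(new PassThroughFn()))
            // Introduces beam:window_fn:serialized_java:v1 on an internal
            // PCollection, even though Reshuffle rewindows its output back
            // into the input's windowing strategy.
            .apply(Reshuffle.viaRandomKey());

    // Branch 1: fixed-windowed side effects whose output is discarded.
    shuffled
        .apply("PostShuffle", ParDo.of(new PassThroughFn()))
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("SideEffect", ParDo.of(new PassThroughFn()));

    // Branch 2: the PCollection handed back to the Python pipeline.
    return shuffled.apply("ToPython", ParDo.of(new PassThroughFn()));
  }
}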
>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>
>>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1],
>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>> immediate issue. The tracking bug for supporting custom windows is
>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>> [2] https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>
>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>>> limitation will go away once Dataflow starts reading Beam protos
>>>>>>> directly. We are working on this now.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>
>>>>>>>>   ParDo() -> WindowInto(FixedWindows) -> ParDo() -> output void
>>>>>>>>     |
>>>>>>>>     -> ParDo() -> output PCollection to Python SDK
>>>>>>>>
>>>>>>>> The full stack trace:
>>>>>>>>
>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>>   beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>   beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>   beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>   beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>   beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>
>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>     run()
>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>     pipeline.run(False)
>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>     allow_proto_holders=True)
>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>     id in proto.outputs.items()
>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>     id in proto.outputs.items()
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>>>>>> if there's a bug here you could possibly work around it by windowing
>>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>>
>>>>>>>>> What is the full traceback?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>> >
>>>>>>>>> > Hi team,
>>>>>>>>> >
>>>>>>>>> > I'm trying to create an External transform in the Java SDK, which
>>>>>>>>> > expands into several ParDos and a Window.into(FixedWindows). When I
>>>>>>>>> > use this transform in the Python SDK, I get a pipeline construction
>>>>>>>>> > error:
>>>>>>>>> >
>>>>>>>>> > File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>> >   parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>> >
>>>>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>>>>> > External PTransform? Or am I missing something here?
>>>>>>>>> >
>>>>>>>>> > Thanks for your help!
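For reference, the workaround Robert suggests would look roughly like this (helper and step names hypothetical). Note that, per the rest of the thread, it only helps when the non-standard WindowFn is on the PCollection you actually return, not on an intermediate PCollection buried inside Reshuffle:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

final class XlangWindowing {
  // Re-window the PCollection that crosses the language boundary into a
  // standard WindowFn, so its windowing strategy serializes to the
  // well-known URN beam:window_fn:fixed_windows:v1, which the Python SDK
  // registers and can parse.
  static <T> PCollection<T> rewindowBeforeReturning(PCollection<T> output) {
    return output.apply(
        "RewindowBeforeReturn",
        Window.into(FixedWindows.of(Duration.standardMinutes(1))));
  }
}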
