Hi Steve,

Unfortunately, I don't think there is a workaround until we have the change
that Cham mentioned.

On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <[email protected]> wrote:

> I'm trying to write an xlang transform that uses Reshuffle internally, and
> ran into this as well.  Is there any workaround to this for now (other than
> removing the reshuffle), or do I just need to wait for what Chamikara
> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
> PR as well [1].
>
> https://github.com/apache/beam/pull/12149#discussion_r463710165
>
> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <[email protected]> wrote:
>
>> That explains a lot. Thanks, Cham!
>>
>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Due to the proto -> object -> proto conversion we do today, Python needs
>>> to parse the full sub-graph from Java. We have hooks for PTransforms and
>>> Coders but not for windowing operations. This limitation will go away after
>>> we have direct Beam proto to Dataflow proto conversion in place.
>>>
>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <[email protected]> wrote:
>>>
>>>> Coders should only be checked across the language boundaries.
>>>>
>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <[email protected]> wrote:
>>>>
>>>>> Thanks Cham!
>>>>>
>>>>> I just realized that *beam:window_fn:serialized_java:v1* is
>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>> *Reshuffle.viaRandomKey()* does rewindow into the original window
>>>>> strategy (which is *GlobalWindows* in my case). Is it expected that we
>>>>> also check intermediate PCollections rather than only the PCollection
>>>>> that crosses the language boundary?
>>>>>
>>>>> More about my PTransform:
>>>>>
>>>>> MyExternalPTransform  -- expand to --
>>>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>                                               |
>>>>>                                               -> ParDo() -> output PCollection to Python SDK
>>>>>
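>>>>> Roughly, the Java expansion looks like the sketch below (the DoFns,
>>>>> element types, and window size are placeholders, not the real
>>>>> implementation):
>>>>>
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.transforms.PTransform;
>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>> import org.apache.beam.sdk.transforms.Reshuffle;
>>>>> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>> import org.joda.time.Duration;
>>>>>
>>>>> public class MyExternalPTransform
>>>>>     extends PTransform<PCollection<String>, PCollection<String>> {
>>>>>
>>>>>   @Override
>>>>>   public PCollection<String> expand(PCollection<String> input) {
>>>>>     PCollection<String> shuffled =
>>>>>         input
>>>>>             .apply("ParDo1", ParDo.of(new PassThroughFn()))
>>>>>             // Reshuffle.viaRandomKey() applies its own windowing
>>>>>             // internally and then rewindows back into the input's
>>>>>             // window strategy (GlobalWindows here), but the
>>>>>             // intermediate PCollections still carry
>>>>>             // beam:window_fn:serialized_java:v1.
>>>>>             .apply(Reshuffle.viaRandomKey())
>>>>>             .apply("ParDo2", ParDo.of(new PassThroughFn()));
>>>>>
>>>>>     // Branch 1: fixed windows, ending in an output that is dropped
>>>>>     // ("output void" in the diagram above).
>>>>>     shuffled
>>>>>         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
>>>>>         .apply("ParDo3", ParDo.of(new PassThroughFn()));
>>>>>
>>>>>     // Branch 2: the PCollection handed back to the Python SDK,
>>>>>     // still in GlobalWindows.
>>>>>     return shuffled.apply("ParDo4", ParDo.of(new PassThroughFn()));
>>>>>   }
>>>>>
>>>>>   // Trivial placeholder DoFn.
>>>>>   private static class PassThroughFn extends DoFn<String, String> {
>>>>>     @ProcessElement
>>>>>     public void process(@Element String element, OutputReceiver<String> out) {
>>>>>       out.output(element);
>>>>>     }
>>>>>   }
>>>>> }
>>>>>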
>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>> immediate issue. Tracking bug for supporting custom windows is
>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>>> limitation will go away once Dataflow directly starts reading Beam
>>>>>>> protos. We are working on this now.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>> PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform  -- expand to --
>>>>>>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>       |
>>>>>>>>       -> ParDo() -> output PCollection to Python SDK
>>>>>>>>
>>>>>>>> The full stacktrace:
>>>>>>>>
>>>>>>>> INFO:root:Using Java SDK harness container image 
>>>>>>>> dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>> Aug 13, 2020 7:42:11 PM 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService 
>>>>>>>> loadRegisteredTransforms
>>>>>>>> INFO: Registering external transforms: 
>>>>>>>> [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, 
>>>>>>>> beam:external:java:jdbc:read_rows:v1, 
>>>>>>>> beam:external:java:jdbc:write:v1, 
>>>>>>>> beam:external:java:generate_sequence:v1]
>>>>>>>>        beam:external:java:kafka:read:v1: 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>        beam:external:java:kafka:write:v1: 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>        beam:external:java:jdbc:read_rows:v1: 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>        beam:external:java:jdbc:write:v1: 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>        beam:external:java:generate_sequence:v1: 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone 
>>>>>>>> is deprecated. Please use --worker_zone instead.
>>>>>>>> Aug 13, 2020 7:42:12 PM 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 
>>>>>>>> 'beam:external:java:kafka:write:v1'
>>>>>>>> Aug 13, 2020 7:42:14 PM 
>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 
>>>>>>>> 'beam:external:java:kafka:read:v1'
>>>>>>>>
>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has 
>>>>>>>> Python 3.6 interpreter.
>>>>>>>> INFO:root:Using Python SDK docker image: 
>>>>>>>> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at 
>>>>>>>> local, we will try to pull from hub.docker.com
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "<embedded module '_launcher'>", line 165, in 
>>>>>>>> run_filename_as_main
>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 
>>>>>>>> 87, in <module>
>>>>>>>>     run()
>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 
>>>>>>>> 82, in run
>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in 
>>>>>>>> run_xlang_kafkaio
>>>>>>>>     pipeline.run(False)
>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in 
>>>>>>>> run_pipeline
>>>>>>>>     allow_proto_holders=True)
>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>     p.transforms_stack = 
>>>>>>>> [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>     id in proto.outputs.items()
>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>     id in proto.outputs.items()
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>>>>>> if there's a bug here you could possibly work around it by windowing
>>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>>
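>>>>>>>>> Something along these lines (a rough sketch only; the class name,
>>>>>>>>> step names, and element type are made up):
>>>>>>>>>
>>>>>>>>> import org.apache.beam.sdk.transforms.PTransform;
>>>>>>>>> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
>>>>>>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>>
>>>>>>>>> public class MyJavaTransform
>>>>>>>>>     extends PTransform<PCollection<String>, PCollection<String>> {
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public PCollection<String> expand(PCollection<String> input) {
>>>>>>>>>     PCollection<String> result = buildInternalGraph(input);
>>>>>>>>>     // Re-window into a standard WindowFn (GlobalWindows here;
>>>>>>>>>     // FixedWindows etc. would work too) right before returning, so
>>>>>>>>>     // the PCollection crossing the language boundary carries a
>>>>>>>>>     // well-known windowing strategy URN rather than
>>>>>>>>>     // beam:window_fn:serialized_java:v1.
>>>>>>>>>     return result.apply("RewindowForXlang",
>>>>>>>>>         Window.<String>into(new GlobalWindows()));
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   // Placeholder for whatever the transform actually expands to.
>>>>>>>>>   private PCollection<String> buildInternalGraph(PCollection<String> input) {
>>>>>>>>>     return input;
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>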
>>>>>>>>> What is the full traceback?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > Hi team,
>>>>>>>>> >
>>>>>>>>> > I'm trying to create an external transform in the Java SDK, which
>>>>>>>>> > expands into several ParDos and a Window.into(FixWindow). When I use
>>>>>>>>> > this transform in the Python SDK, I get a pipeline construction error:
>>>>>>>>> >
>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>> >
>>>>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>>>>> > external PTransform? Or am I missing something here?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Thanks for your help!
>>>>>>>>>
>>>>>>>>
