Actually, Reshuffle uses a custom non-merging WindowFn (IdentityWindowFn)
[1]. Dataflow Runner v2 (which is required for multi-language pipelines on
Dataflow) currently does not support custom windowing functions, I believe.
So getting Reshuffle (and by extension connectors such as Snowflake) working
for Dataflow Python as a cross-language transform will, unfortunately,
require support for custom WindowFns on Dataflow Runner v2 in addition to
https://issues.apache.org/jira/browse/BEAM-11360. I believe we are working
on the former, but I'm not sure about the exact ETA. The latter should be
done by the end of the quarter.
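
For context, the relevant part of Reshuffle's expansion looks roughly like
this (a simplified sketch based on [1], not the exact code; imports from
org.apache.beam.sdk are assumed):

// Reshuffle temporarily rewindows the input into IdentityWindowFn, a
// custom (non-standard) WindowFn, so the windowing strategy crosses the
// portability boundary as beam:window_fn:serialized_java:v1 rather than
// one of the standard window fn URNs.
static <K, V> PCollection<KV<K, V>> rewindowLikeReshuffle(
    PCollection<KV<K, V>> input) {
  WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
  return input.apply(
      "RewindowIntoIdentity",
      Window.<KV<K, V>>into(
          new IdentityWindowFn<>(
              originalStrategy.getWindowFn().windowCoder())));
}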

+Harsh Vardhan <anan...@google.com> +Robert Bradshaw <rober...@google.com>

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L79

On Mon, Nov 30, 2020 at 10:35 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> Please follow https://issues.apache.org/jira/browse/BEAM-11360 instead.
>
> Thanks,
> Cham
>
> On Mon, Nov 30, 2020 at 10:26 AM Steve Niemitz <sniem...@apache.org>
> wrote:
>
>> Alright, thank you. Is BEAM-10507 the JIRA to watch for any progress on
>> that?
>>
>> On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <boyu...@google.com> wrote:
>>
>>> Hi Steve,
>>>
>>> Unfortunately, I don't think there is a workaround until we have the
>>> change that Cham mentioned.
>>>
>>> On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sniem...@apache.org>
>>> wrote:
>>>
>>>> I'm trying to write an xlang transform that uses Reshuffle internally,
>>>> and ran into this as well.  Is there any workaround to this for now (other
>>>> than removing the reshuffle), or do I just need to wait for what Chamikara
>>>> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
>>>> PR as well [1].
>>>>
>>>> https://github.com/apache/beam/pull/12149#discussion_r463710165
>>>>
>>>> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <boyu...@google.com>
>>>> wrote:
>>>>
>>>>> That explains a lot. Thanks, Cham!
>>>>>
>>>>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Due to the proto -> object -> proto conversion we do today, Python
>>>>>> needs to parse the full sub-graph from Java. We have hooks for 
>>>>>> PTransforms
>>>>>> and Coders but not for windowing operations. This limitation will go away
>>>>>> after we have direct Beam proto to Dataflow proto conversion in place.
>>>>>>
>>>>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <rob...@frantil.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Coders should only be checked across language boundaries.
>>>>>>>
>>>>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <boyu...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Cham!
>>>>>>>>
>>>>>>>> I just realized that *beam:window_fn:serialized_java:v1* is
>>>>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>>>>> *Reshuffle.viaRandomKey()* does rewindow into the original window
>>>>>>>> strategy (which is *GlobalWindows* in my case). Is it expected that
>>>>>>>> we also check intermediate PCollections rather than only the
>>>>>>>> PCollections that cross the language boundary?
>>>>>>>>
>>>>>>>> More about my PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>                                                |
>>>>>>>>                                                -> ParDo() -> output PCollection to Python SDK
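>>>>>>>>
>>>>>>>> In Java, the expansion looks roughly like the following (a simplified
>>>>>>>> sketch, not the real code: element types and DoFn bodies are
>>>>>>>> placeholders, and imports are omitted):
>>>>>>>>
>>>>>>>> // Placeholder pass-through DoFn standing in for the real ParDos.
>>>>>>>> static class IdentityFn extends DoFn<String, String> {
>>>>>>>>   @ProcessElement
>>>>>>>>   public void process(@Element String e, OutputReceiver<String> out) {
>>>>>>>>     out.output(e);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public PCollection<String> expand(PCollection<String> input) {
>>>>>>>>   PCollection<String> shuffled =
>>>>>>>>       input
>>>>>>>>           .apply("Pre", ParDo.of(new IdentityFn()))
>>>>>>>>           // Reshuffle restores the original (global) windowing on its
>>>>>>>>           // output, but uses the custom window fn internally.
>>>>>>>>           .apply(Reshuffle.viaRandomKey())
>>>>>>>>           .apply("Post", ParDo.of(new IdentityFn()));
>>>>>>>>   // Branch 1: rewindow into fixed windows and write; outputs nothing.
>>>>>>>>   shuffled
>>>>>>>>       .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>>>>>>>>       .apply("Write", ParDo.of(new IdentityFn()));
>>>>>>>>   // Branch 2: the PCollection handed back to the Python pipeline.
>>>>>>>>   return shuffled.apply("Output", ParDo.of(new IdentityFn()));
>>>>>>>> }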
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Also, it's strange that Java used beam:window_fn:serialized_java:v1
>>>>>>>>> as the URN here instead of "beam:window_fn:fixed_windows:v1" [1],
>>>>>>>>> which is what is registered by Python [2]. This seems to be the
>>>>>>>>> immediate issue. The tracking bug for supporting custom windows is
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>>>>>> limitation will go away once Dataflow starts reading Beam protos
>>>>>>>>>> directly. We are working on this now.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <boyu...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>>>>> PTransform:
>>>>>>>>>>>
>>>>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>>>>     |
>>>>>>>>>>>     -> ParDo() -> output PCollection to Python SDK
>>>>>>>>>>> The full stacktrace:
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:Using Java SDK harness container image 
>>>>>>>>>>> dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>>>>> Aug 13, 2020 7:42:11 PM 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService 
>>>>>>>>>>> loadRegisteredTransforms
>>>>>>>>>>> INFO: Registering external transforms: 
>>>>>>>>>>> [beam:external:java:kafka:read:v1, 
>>>>>>>>>>> beam:external:java:kafka:write:v1, 
>>>>>>>>>>> beam:external:java:jdbc:read_rows:v1, 
>>>>>>>>>>> beam:external:java:jdbc:write:v1, 
>>>>>>>>>>> beam:external:java:generate_sequence:v1]
>>>>>>>>>>>     beam:external:java:kafka:read:v1: 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>>>>     beam:external:java:kafka:write:v1: 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>>>>     beam:external:java:jdbc:read_rows:v1: 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>>>>     beam:external:java:jdbc:write:v1: 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>>>>     beam:external:java:generate_sequence:v1: 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option 
>>>>>>>>>>> --zone is deprecated. Please use --worker_zone instead.
>>>>>>>>>>> Aug 13, 2020 7:42:12 PM 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 
>>>>>>>>>>> 'beam:external:java:kafka:write:v1'
>>>>>>>>>>> Aug 13, 2020 7:42:14 PM 
>>>>>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 
>>>>>>>>>>> 'beam:external:java:kafka:read:v1'
>>>>>>>>>>>
>>>>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image 
>>>>>>>>>>> has Python 3.6 interpreter.
>>>>>>>>>>> INFO:root:Using Python SDK docker image: 
>>>>>>>>>>> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available 
>>>>>>>>>>> at local, we will try to pull from hub.docker.com
>>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>>   File "<embedded module '_launcher'>", line 165, in 
>>>>>>>>>>> run_filename_as_main
>>>>>>>>>>>   File "<embedded module '_launcher'>", line 39, in 
>>>>>>>>>>> _run_code_in_main
>>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", 
>>>>>>>>>>> line 87, in <module>
>>>>>>>>>>>     run()
>>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", 
>>>>>>>>>>> line 82, in run
>>>>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, 
>>>>>>>>>>> in run_xlang_kafkaio
>>>>>>>>>>>     pipeline.run(False)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, 
>>>>>>>>>>> in run_pipeline
>>>>>>>>>>>     allow_proto_holders=True)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>>>>     p.transforms_stack = 
>>>>>>>>>>> [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in 
>>>>>>>>>>> get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in 
>>>>>>>>>>> from_runner_api
>>>>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <
>>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>>>>> arbitrary WindowFn as long as it produces standard window types,
>>>>>>>>>>>> but if there's a bug here you could possibly work around it by
>>>>>>>>>>>> windowing into a more standard WindowFn before returning.
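>>>>>>>>>>>>
>>>>>>>>>>>> For example, something like this at the end of the Java expansion
>>>>>>>>>>>> (just a sketch: "intermediate" stands for the transform's output
>>>>>>>>>>>> PCollection, and I'm assuming the global window is the desired
>>>>>>>>>>>> final strategy):
>>>>>>>>>>>>
>>>>>>>>>>>> // Rewindow into a standard WindowFn so that no custom
>>>>>>>>>>>> // (serialized-Java) windowing strategy is attached to the
>>>>>>>>>>>> // PCollection returned across the language boundary.
>>>>>>>>>>>> return intermediate.apply(
>>>>>>>>>>>>     "RestoreStandardWindowing",
>>>>>>>>>>>>     Window.<String>into(new GlobalWindows()));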
>>>>>>>>>>>>
>>>>>>>>>>>> What is the full traceback?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <
>>>>>>>>>>>> boyu...@google.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hi team,
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'm trying to create an external transform in the Java SDK,
>>>>>>>>>>>> > which expands into several ParDos and a Window.into(FixWindow).
>>>>>>>>>>>> > When I use this transform in the Python SDK, I get a pipeline
>>>>>>>>>>>> > construction error:
>>>>>>>>>>>> >
>>>>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>>> >
>>>>>>>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>>>>>>>> > external PTransform? Or am I missing something here?
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks for your help!
>>>>>>>>>>>>
>>>>>>>>>>>
