Please follow https://issues.apache.org/jira/browse/BEAM-11360 instead.
Thanks,
Cham

On Mon, Nov 30, 2020 at 10:26 AM Steve Niemitz <[email protected]> wrote:

> alright, thank you. Is BEAM-10507 the JIRA to watch for any progress on
> that?
>
> On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Steve,
>>
>> Unfortunately I don't think there is a workaround until we have the
>> change that Cham mentions.
>>
>> On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <[email protected]> wrote:
>>
>>> I'm trying to write an xlang transform that uses Reshuffle internally,
>>> and ran into this as well. Is there any workaround for this for now
>>> (other than removing the reshuffle), or do I just need to wait for what
>>> Chamikara mentioned? I noticed the same issue was mentioned in the
>>> SnowflakeIO.Read PR as well [1].
>>>
>>> [1] https://github.com/apache/beam/pull/12149#discussion_r463710165
>>>
>>> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> That explains a lot. Thanks, Cham!
>>>>
>>>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <[email protected]> wrote:
>>>>
>>>>> Due to the proto -> object -> proto conversion we do today, Python
>>>>> needs to parse the full sub-graph from Java. We have hooks for
>>>>> PTransforms and Coders, but not for windowing operations. This
>>>>> limitation will go away after we have direct Beam proto to Dataflow
>>>>> proto conversion in place.
>>>>>
>>>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <[email protected]> wrote:
>>>>>
>>>>>> Coders should only be checked over the language boundaries.
>>>>>>
>>>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Cham!
>>>>>>>
>>>>>>> I just realized that beam:window_fn:serialized_java:v1 is introduced
>>>>>>> by Java's Reshuffle.viaRandomKey(). But Reshuffle.viaRandomKey()
>>>>>>> does rewindow into the original window strategy (which is
>>>>>>> GlobalWindows in my case). Is it expected that we also check
>>>>>>> intermediate PCollections rather than only the PCollection that
>>>>>>> crosses the language boundary?
>>>>>>>
>>>>>>> More about my PTransform:
>>>>>>>
>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixedWindows) -> ParDo() -> output void
>>>>>>>      |
>>>>>>>      -> ParDo() -> output PCollection to Python SDK
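For reference, a minimal Java sketch of an expansion with this shape. It is an
illustrative stand-in, not the actual MyExternalPTransform; the source, the
String element type, and the pass-through DoFns are assumptions:

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Illustrative composite whose expansion matches the diagram above.
    public class MyExternalPTransform extends PTransform<PBegin, PCollection<String>> {
      @Override
      public PCollection<String> expand(PBegin input) {
        PCollection<String> elements =
            input
                .apply("Source", Create.of("a", "b"))
                .apply("ParDo1", ParDo.of(new PassThroughFn()));

        // Terminal branch: per the observation above, Reshuffle.viaRandomKey()
        // introduces intermediate PCollections whose WindowFn is carried as
        // beam:window_fn:serialized_java:v1.
        elements
            .apply(Reshuffle.<String>viaRandomKey())
            .apply("ParDo2", ParDo.of(new PassThroughFn()))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("ParDo3", ParDo.of(new PassThroughFn()));

        // Main branch: the PCollection handed back to the Python SDK.
        return elements.apply("ParDo4", ParDo.of(new PassThroughFn()));
      }

      private static class PassThroughFn extends DoFn<String, String> {
        @ProcessElement
        public void process(@Element String element, OutputReceiver<String> out) {
          out.output(element);
        }
      }
    }

Only the returned PCollection is meant to cross the language boundary; the
Reshuffle branch terminates inside Java, which is why a failure on its
intermediate windowing strategy is surprising here.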
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> Also it's strange that Java used beam:window_fn:serialized_java:v1
>>>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1],
>>>>>>>> which is what is being registered by Python [2]. This seems to be
>>>>>>>> the immediate issue. The tracking bug for supporting custom windows
>>>>>>>> is https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>>>> [2] https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
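One way to see which URNs an expanded sub-graph actually carries is to walk
the windowing strategies in the pipeline proto. A hedged Java sketch, assuming
the generated beam_runner_api proto accessors (getComponents,
getWindowingStrategiesMap, getWindowFn, getUrn); the class name is made up:

    import org.apache.beam.model.pipeline.v1.RunnerApi;

    // Illustrative debugging aid: print the WindowFn URN of every
    // WindowingStrategy in a pipeline proto, e.g. an expanded sub-graph
    // returned by the expansion service. FixedWindows should show
    // beam:window_fn:fixed_windows:v1; the failing strategy in this thread
    // shows beam:window_fn:serialized_java:v1 instead.
    final class WindowFnUrnDumper {
      private WindowFnUrnDumper() {}

      static void dump(RunnerApi.Pipeline pipeline) {
        pipeline
            .getComponents()
            .getWindowingStrategiesMap()
            .forEach(
                (id, strategy) ->
                    System.out.println(id + " -> " + strategy.getWindowFn().getUrn()));
      }
    }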
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>>>>> limitation will go away once Dataflow directly starts reading Beam
>>>>>>>>> protos. We are working on this now.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>>>> PTransform:
>>>>>>>>>>
>>>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>>>   ParDo() -> WindowInto(FixedWindows) -> ParDo() -> output void
>>>>>>>>>>      |
>>>>>>>>>>      -> ParDo() -> output PCollection to Python SDK
>>>>>>>>>>
>>>>>>>>>> The full stacktrace:
>>>>>>>>>>
>>>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>>>> beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>>> beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>>> beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>>> beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>>> beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>>>>
>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>>>     run()
>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>>>     pipeline.run(False)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>>>     allow_proto_holders=True)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should be able to use WindowInto with any of the common
>>>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in
>>>>>>>>>>> an external transform. You should also be able to window into an
>>>>>>>>>>> arbitrary WindowFn as long as it produces standard window types,
>>>>>>>>>>> but if there's a bug here you could possibly work around it by
>>>>>>>>>>> windowing into a more standard windowing fn before returning.
>>>>>>>>>>> [A sketch of this workaround appears at the end of the thread.]
>>>>>>>>>>>
>>>>>>>>>>> What is the full traceback?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi team,
>>>>>>>>>>> >
>>>>>>>>>>> > I'm trying to create an External transform in Java SDK, which
>>>>>>>>>>> > expands into several ParDos and a Window.into(FixedWindows).
>>>>>>>>>>> > When I use this transform in Python SDK, I get a pipeline
>>>>>>>>>>> > construction error:
>>>>>>>>>>> >
>>>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>> >
>>>>>>>>>>> > Is it expected that I cannot use Window.into when building an
>>>>>>>>>>> > External PTransform? Or am I missing something here?
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks for your help!
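To make the workaround Robert Bradshaw suggests above concrete: re-window into
a standard WindowFn as the last step of the Java expansion, so the PCollection
handed across the language boundary carries a well-known URN. A minimal Java
sketch; the helper class and method names are made up:

    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;

    // Illustrative helper: GlobalWindows maps to the standard URN
    // beam:window_fn:global_windows:v1, which the Python SDK can parse,
    // unlike beam:window_fn:serialized_java:v1.
    final class XlangWindowing {
      private XlangWindowing() {}

      static <T> PCollection<T> rewindowToGlobal(PCollection<T> output) {
        return output.apply("RewindowForXlang", Window.<T>into(new GlobalWindows()));
      }
    }

In the expansion sketched earlier in the thread, the return statement would
become return XlangWindowing.rewindowToGlobal(elements.apply("ParDo4", ...)).
Note this only changes the PCollection that crosses the boundary; per the
discussion above, intermediate PCollections created inside Reshuffle may still
carry the serialized Java WindowFn.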