Thanks, Robert! Here are more details about my external PTransform:

MyExternalPTransform expands to:

  ParDo() -> WindowInto(FixedWindows) -+-> ParDo() -> output void
                                       |
                                       +-> ParDo() -> output PCollection to Python SDK

The full stack trace:

INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
Starting expansion service at localhost:53569
Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
        beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
        beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
        beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
        beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
        beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
Traceback (most recent call last):
  File "<embedded module '_launcher'>", line 165, in run_filename_as_main
  File "<embedded module '_launcher'>", line 39, in _run_code_in_main
  File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
    run()
  File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
    test_method(beam.Pipeline(options=pipeline_options))
  File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
    pipeline.run(False)
  File "apache_beam/pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
    allow_proto_holders=True)
  File "apache_beam/pipeline.py", line 879, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1272, in from_runner_api
    id in proto.outputs.items()
  File "apache_beam/pipeline.py", line 1272, in <dictcomp>
    id in proto.outputs.items()
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pvalue.py", line 217, in from_runner_api
    proto.windowing_strategy_id),
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/transforms/core.py", line 2597, in from_runner_api
    windowfn=WindowFn.from_runner_api(proto.window_fn, context),
  File "apache_beam/utils/urns.py", line 186, in from_runner_api
    parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'
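The last frame of the trace is a plain dictionary lookup keyed by the window fn's URN (`cls._known_urns[fn_proto.urn]`), so any URN the Python SDK has not registered, here `beam:window_fn:serialized_java:v1`, raises `KeyError` while the pipeline is being reconstructed. The sketch below is a simplified, hypothetical model of that lookup, not Beam's actual `urns.py`: the class names, the `try_resolve` helper, and the four standard URNs it registers are assumptions for illustration.

```python
# Hypothetical, simplified model of resolving a WindowFn from its portable URN.
# It mirrors the failing line `cls._known_urns[fn_proto.urn]`; NOT Beam's code.


class FunctionSpec:
    """Stand-in for the portable FunctionSpec proto: a URN plus a payload."""

    def __init__(self, urn, payload=b""):
        self.urn = urn
        self.payload = payload


class WindowFnRegistry:
    # URN -> (parameter_type, constructor). Only window fns this SDK knows how
    # to rebuild are present; anything else is unknown.
    _known_urns = {}

    @classmethod
    def register(cls, urn, parameter_type, constructor):
        cls._known_urns[urn] = (parameter_type, constructor)

    @classmethod
    def from_runner_api(cls, fn_proto):
        # Plain dict lookup: an unregistered URN raises KeyError, as in the trace.
        parameter_type, constructor = cls._known_urns[fn_proto.urn]
        return constructor(parameter_type(fn_proto.payload))


# Assumed standard window fn URNs (global, fixed, sliding, sessions).
for _urn, _name in [
    ("beam:window_fn:global_windows:v1", "GlobalWindows"),
    ("beam:window_fn:fixed_windows:v1", "FixedWindows"),
    ("beam:window_fn:sliding_windows:v1", "SlidingWindows"),
    ("beam:window_fn:session_windows:v1", "Sessions"),
]:
    # Default argument binds the current name (avoids late binding in the loop).
    WindowFnRegistry.register(_urn, bytes, lambda payload, name=_name: name)


def try_resolve(urn):
    """Return the resolved window fn name, or None if the URN is unknown."""
    try:
        return WindowFnRegistry.from_runner_api(FunctionSpec(urn))
    except KeyError:
        return None
```

In this model, `try_resolve("beam:window_fn:fixed_windows:v1")` returns `"FixedWindows"`, while `try_resolve("beam:window_fn:serialized_java:v1")` returns `None` because that URN was never registered.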


On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw wrote:

> You should be able to use a WindowInto with any of the common
> windowing operations (e.g. global, fixed, sliding, sessions) in an
> external transform. You should also be able to window into an
> arbitrary WindowFn as long as it produces standard window types, but
> if there's a bug here you could possibly work around it by windowing
> into a more standard windowing fn before returning.
>
> What is the full traceback?
>
> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang wrote:
> >
> > Hi team,
> >
> > I'm trying to create an external transform in the Java SDK that expands
> > into several ParDos and a Window.into(FixedWindows). When I use this
> > transform in the Python SDK, I get a pipeline construction error:
> >
> > apache_beam/utils/urns.py", line 186, in from_runner_api
> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
> > KeyError: 'beam:window_fn:serialized_java:v1'
> >
> > Is it expected that I cannot use Window.into when building an external
> > PTransform? Or am I missing something here?
> >
> >
> > Thanks for your help!
>
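Per Robert's reply, PCollections windowed with the common window fns (global, fixed, sliding, sessions) should be usable from Python, and the traceback shows the failure surfacing in `pvalue.py` while rebuilding one PCollection's windowing strategy. One hedged way to narrow down which PCollection in the expansion is responsible is to scan every PCollection's windowing strategy for a URN outside the standard set. The sketch below works on plain dicts loosely shaped like the portable pipeline `Components` proto; that dict layout, the `find_unsupported_pcollections` name, the example IDs, and the four standard URNs are assumptions for illustration, not a Beam API.

```python
# Hypothetical diagnostic: list PCollections whose windowing strategy uses a
# window fn URN outside the assumed standard set. Dict layout loosely mirrors
# Components.pcollections / Components.windowing_strategies; not a Beam API.

# Assumed standard window fn URNs the Python SDK can rebuild.
STANDARD_WINDOW_FN_URNS = frozenset({
    "beam:window_fn:global_windows:v1",
    "beam:window_fn:fixed_windows:v1",
    "beam:window_fn:sliding_windows:v1",
    "beam:window_fn:session_windows:v1",
})


def find_unsupported_pcollections(components):
    """Return sorted (pcollection_id, window_fn_urn) pairs with non-standard URNs."""
    strategies = components["windowing_strategies"]
    unsupported = []
    for pcoll_id, pcoll in components["pcollections"].items():
        # Follow PCollection -> windowing strategy -> window_fn URN.
        urn = strategies[pcoll["windowing_strategy_id"]]["window_fn"]["urn"]
        if urn not in STANDARD_WINDOW_FN_URNS:
            unsupported.append((pcoll_id, urn))
    return sorted(unsupported)


# Made-up example loosely following the expansion described in this thread.
example_components = {
    "windowing_strategies": {
        "ws_global": {"window_fn": {"urn": "beam:window_fn:global_windows:v1"}},
        "ws_custom": {"window_fn": {"urn": "beam:window_fn:serialized_java:v1"}},
    },
    "pcollections": {
        "first_pardo_out": {"windowing_strategy_id": "ws_global"},
        "window_into_out": {"windowing_strategy_id": "ws_custom"},
        "returned_to_python": {"windowing_strategy_id": "ws_custom"},
    },
}
```

Against real pipeline protos, the same walk would read `windowing_strategy_id` on each entry of `components.pcollections` and then `window_fn.urn` on the matching entry of `components.windowing_strategies`.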
