Ankur Goenka created BEAM-7252:
----------------------------------

             Summary: "beam:java:boundedsource" not supported with python 
optimizer
                 Key: BEAM-7252
                 URL: https://issues.apache.org/jira/browse/BEAM-7252
             Project: Beam
          Issue Type: Improvement
          Components: sdk-py-core
            Reporter: Ankur Goenka


python pipeline optimizer does not handle external transforms.

 

Relevant error stack


======================================================================
ERROR: test_external_transforms (__main__.FlinkRunnerTestOptimized)
----------------------------------------------------------------------
Traceback (most recent call last):
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/portability/flink_runner_test.py",
 line 174, in test_external_transforms
 assert_that(res, equal_to([i for i in range(1, 10)]))
 File "/tmp/beam/beam/sdks/python/apache_beam/pipeline.py", line 426, in 
__exit__
 self.run().wait_until_finish()
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/portability/portable_runner.py",
 line 436, in wait_until_finish
 self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline 
test_external_transforms_1557358286.71_f49d7fd6-7c14-4ded-8946-3ac3dad4d4c9 
failed in state FAILED: java.lang.RuntimeException: Error received from SDK 
harness for instruction 4: Traceback (most recent call last):
 File "/tmp/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", 
line 157, in _execute
 response = task()
 File "/tmp/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", 
line 190, in <lambda>
 self._execute(lambda: worker.do_instruction(work), work)
 File "/tmp/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", 
line 333, in do_instruction
 request.instruction_id)
 File "/tmp/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", 
line 353, in process_bundle
 instruction_id, request.process_bundle_descriptor_reference)
 File "/tmp/beam/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", 
line 305, in get
 self.data_channel_factory)
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 501, in __init__
 self.ops = self.create_execution_tree(self.process_bundle_descriptor)
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 545, in create_execution_tree
 descriptor.transforms, key=topological_height, reverse=True)])
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 467, in wrapper
 result = cache[args] = func(*args)
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 528, in get_operation
 in descriptor.transforms[transform_id].outputs.items()
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 527, in <dictcomp>
 for tag, pcoll_id
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 467, in wrapper
 result = cache[args] = func(*args)
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 531, in get_operation
 transform_id, transform_consumers)
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 790, in create_operation
 return creator(self, transform_id, transform_proto, payload, consumers)
 File 
"/tmp/beam/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", 
line 957, in create
 parameter.source, factory.context),
 File "/tmp/beam/beam/sdks/python/apache_beam/utils/urns.py", line 113, in 
from_runner_api
 parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
KeyError: u'urn:beam:java:boundedsource:v1'



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to