[
https://issues.apache.org/jira/browse/BEAM-6218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721459#comment-16721459
]
Maximilian Michels commented on BEAM-6218:
------------------------------------------
This looks like an error from the Python SDK harness.
> TensorFlow Model Analysis Fails when using the portable Flink runner
> --------------------------------------------------------------------
>
> Key: BEAM-6218
> URL: https://issues.apache.org/jira/browse/BEAM-6218
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-py-harness
> Reporter: Andrew Packer
> Priority: Major
>
> Running a simple model analysis pipeline using the portable Flink
> runner against a local cluster:
> {code:python}
> import apache_beam as beam
> import tensorflow_model_analysis as tfma
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import portable_runner
> def pipeline(root):
>     data_location = './dataset/'
>     data = root | 'ReadData' >> beam.io.ReadFromTFRecord(data_location)
>     results = data | 'ExtractEvaluateAndWriteResults' >> tfma.EvaluateAndWriteResults(
>         eval_saved_model_path='./model/15427633886/',
>         output_path='./output/',
>         display_only_data_location=data_location)
>
> def run(argv=None):
>     runner = portable_runner.PortableRunner()
>     pipeline_options = PipelineOptions(
>         experiments=['beam_fn_api'],
>         sdk_location='container',
>         job_endpoint='localhost:8099',
>         setup_file='./setup.py')
>     runner.run(pipeline, pipeline_options)
>
> if __name__ == '__main__':
>     run()
> {code}
> Versions:
> Apache Beam 2.8.0
> TensorFlow Model Analysis: 0.9.2
> Apache Flink: 1.5.3
>
> Stack Trace:
> {code}
> [flink-runner-job-server] ERROR
> org.apache.beam.runners.flink.FlinkJobInvocation - Error during job
> invocation
> BeamApp-apacker-1212082216-2dd571ba_359d85b7-4e08-49f3-bdc7-34cdb0e779bf.
> org.apache.flink.client.program.ProgramInvocationException: Job
> 22e7e9d229977f3f0518c37f507f5e07 failed.
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:216)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:193)
> at
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
> at
> org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:121)
> at
> org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
> at
> org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
> at
> org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> completed with illegal application status: UNKNOWN.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:150)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> ... 13 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 22: Traceback (most recent call last):
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 131, in _execute
> response = task()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 166, in <lambda>
> self._execute(lambda: worker.do_instruction(work), work)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 212, in do_instruction
> request.instruction_id)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 231, in process_bundle
> self.data_channel_factory)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 343, in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 385, in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)])
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 368, in get_operation
> in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 367, in <dictcomp>
> for tag, pcoll_id
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 320, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 371, in get_operation
> transform_id, transform_consumers)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 482, in create_operation
> return creator(self, transform_id, transform_proto, payload, consumers)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 654, in create
> serialized_fn, parameter)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 692, in _create_pardo_operation
> dofn_data = pickler.loads(serialized_fn)
> File
> "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py",
> line 246, in loads
> return dill.loads(s)
> File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in
> loads
> return load(file, ignore)
> File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in
> load
> obj = pik.load()
> File "/usr/local/lib/python2.7/pickle.py", line 864, in load
> dispatch[key](self)
> File "/usr/local/lib/python2.7/pickle.py", line 1096, in load_global
> klass = self.find_class(module, name)
> File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 465, in
> find_class
> return StockUnpickler.find_class(self, module, name)
> File "/usr/local/lib/python2.7/pickle.py", line 1132, in find_class
> klass = getattr(mod, name)
> AttributeError: 'module' object has no attribute '_SliceDoFn'
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188)
> at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> ... 1 more
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)