[ https://issues.apache.org/jira/browse/BEAM-9493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548508#comment-17548508 ]
Danny McCormick commented on BEAM-9493:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20140
> Apache Beam/Dataflow threw a CalledProcessError with
> beam.Pipeline("DataflowRunner", options=opts)
> ---------------------------------------------------------------------------------------------------
>
> Key: BEAM-9493
> URL: https://issues.apache.org/jira/browse/BEAM-9493
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow, sdk-py-core
> Affects Versions: 2.16.0
> Environment: apache-beam==2.16.0
> tensorflow==2.1.0
> tensorflow-metadata==0.15.2
> tensorflow-transform==0.15.0
> Python 2.7.13
> pip 20.0.2
> Reporter: PY
> Priority: P3
>
> def preprocess(in_test_mode):
>     import os
>     import os.path
>     import tempfile
>     from apache_beam.io import tfrecordio
>     from tensorflow_transform.coders import example_proto_coder
>     from tensorflow_transform.tf_metadata import dataset_metadata
>     from tensorflow_transform.tf_metadata import dataset_schema
>     from tensorflow_transform.beam import tft_beam_io
>     from tensorflow_transform.beam.tft_beam_io import transform_fn_io
>
>     job_name = 'preprocess-bike-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
>     if in_test_mode:
>         import shutil
>         print('Launching local job ... hang on')
>         OUTPUT_DIR = './bike_preproc_tft'
>         shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
>         EVERY_N = 5
>     else:
>         print('Launching Dataflow job {} ... hang on'.format(job_name))
>         OUTPUT_DIR = 'gs://{0}/bike_preproc_tft/'.format(BUCKET)
>         import subprocess
>         subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
>         EVERY_N = 5
>
>     options = {
>         'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
>         'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
>         'job_name': job_name,
>         'project': PROJECT,
>         'max_num_workers': 6,
>         'teardown_policy': 'TEARDOWN_ALWAYS',
>         'no_save_main_session': True,
>         'requirements_file': 'requirements.txt'
>     }
>     opts = beam.pipeline.PipelineOptions(flags=[], **options)
>     if in_test_mode:
>         RUNNER = 'DirectRunner'
>     else:
>         RUNNER = 'DataflowRunner'
>
>     # set up raw data metadata
>     raw_data_schema = {
>         colname: dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
>         for colname in 'start_year,start_month,start_day,start_station_id,hire_count'.split(',')
>     }
>     raw_data_schema.update({
>         'day_of_week': dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
>     })
>     raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))
>
>     # run Beam
>     with beam.Pipeline(RUNNER, options=opts) as p:
>         with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
>             # save the raw data metadata
>             raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
>                 os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
>                 pipeline=p)
>
>             # read training data from bigquery and filter rows
>             raw_data = (p
>                 | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(
>                     query=create_query('train', EVERY_N), use_standard_sql=True))
>                 | 'train_filter' >> beam.Filter(is_valid))
>             raw_dataset = (raw_data, raw_data_metadata)
>
>             # analyze and transform training data
>             transformed_dataset, transform_fn = (
>                 raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
>             transformed_data, transformed_metadata = transformed_dataset
>
>             # save transformed training data to disk in efficient tfrecord format
>             transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
>                 os.path.join(OUTPUT_DIR, 'train'),
>                 file_name_suffix='.gz',
>                 coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
>
>             # read eval data from bigquery and filter rows
>             raw_test_data = (p
>                 | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(
>                     query=create_query('valid', EVERY_N), use_standard_sql=True))
>                 | 'eval_filter' >> beam.Filter(is_valid))
>             raw_test_dataset = (raw_test_data, raw_data_metadata)
>
>             # transform eval data
>             transformed_test_dataset = (
>                 (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
>             transformed_test_data, _ = transformed_test_dataset
>
>             # save transformed eval data to disk in efficient tfrecord format
>             transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
>                 os.path.join(OUTPUT_DIR, 'eval'),
>                 file_name_suffix='.gz',
>                 coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
>
>             # save transformation function to disk for use at serving time
>             transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
>                 os.path.join(OUTPUT_DIR, 'metadata'))
>
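[Editor's note] For context on the failure below: because 'requirements_file' is set in the options dict, the Dataflow runner pre-downloads every package listed in requirements.txt as source distributions (via pip download --no-binary :all:, as the traceback shows) before submitting the job. A minimal sketch of the equivalent options construction, using placeholder values rather than the reporter's actual project and bucket:

    import apache_beam as beam

    # Placeholders only; substitute a real project id and GCS bucket.
    opts = beam.pipeline.PipelineOptions(
        flags=[],
        project='my-project',
        job_name='preprocess-bike-features-test',
        temp_location='gs://my-bucket/tmp',
        staging_location='gs://my-bucket/tmp/staging',
        # This option triggers the pip download staging step that fails below.
        requirements_file='requirements.txt',
    )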
> When I ran the above preprocessing function with RUNNER = 'DataflowRunner', the
> following error popped up.
>
> ---------------------------------------------------------------------------
> CalledProcessError                        Traceback (most recent call last)
> ~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
>      82     try:
> ---> 83       out = subprocess.check_output(*args, **kwargs)
>      84     except OSError:
> /usr/lib/python3.5/subprocess.py in check_output(timeout, *popenargs, **kwargs)
>     315     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
> --> 316                **kwargs).stdout
>     317
> /usr/lib/python3.5/subprocess.py in run(input, timeout, check, *popenargs, **kwargs)
>     397             raise CalledProcessError(retcode, process.args,
> --> 398                                      output=stdout, stderr=stderr)
>     399         return CompletedProcess(process.args, retcode, stdout, stderr)
> CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1
> During handling of the above exception, another exception occurred:
> RuntimeError                              Traceback (most recent call last)
> <ipython-input-13-eac0bb8c8400> in <module>
>     131                               os.path.join(OUTPUT_DIR, 'metadata'))
>     132
> --> 133 preprocess(in_test_mode=False)  # change to True to run locally
> <ipython-input-13-eac0bb8c8400> in preprocess(in_test_mode)
>     129     # save transformation function to disk for use at serving time
>     130     transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
> --> 131         os.path.join(OUTPUT_DIR, 'metadata'))
>     132
>     133 preprocess(in_test_mode=False)  # change to True to run locally
> ~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
>     425   def __exit__(self, exc_type, exc_val, exc_tb):
>     426     if not exc_type:
> --> 427       self.run().wait_until_finish()
>     428
>     429   def visit(self, visitor):
> ~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>     405           self.to_runner_api(use_fake_coders=True),
>     406           self.runner,
> --> 407           self._options).run(False)
>     408
>     409     if self._options.view_as(TypeOptions).runtime_type_check:
> ~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>     418       finally:
>     419         shutil.rmtree(tmpdir)
> --> 420     return self.runner.run_pipeline(self, self._options)
>     421
>     422   def __enter__(self):
> ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
>     483     #   raise an exception.
>     484     result = DataflowPipelineResult(
> --> 485         self.dataflow_client.create_job(self.job), self)
>     486
>     487     # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
> ~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
>     204     while True:
>     205       try:
> --> 206         return fun(*args, **kwargs)
>     207       except Exception as exn:  # pylint: disable=broad-except
>     208         if not retry_filter(exn):
> ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job(self, job)
>     529   def create_job(self, job):
>     530     """Creates job description. May stage and/or submit for remote execution."""
> --> 531     self.create_job_description(job)
>     532
>     533     # Stage and submit the job when necessary
> ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job_description(self, job)
>     559
>     560     # Stage other resources for the SDK harness
> --> 561     resources = self._stage_resources(job.options)
>     562
>     563     job.proto.environment = Environment(
> ~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in _stage_resources(self, options)
>     489         options,
>     490         temp_dir=tempfile.mkdtemp(),
> --> 491         staging_location=google_cloud_options.staging_location)
>     492     return resources
>     493
> ~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in stage_job_resources(self, options, build_setup_args, temp_dir, populate_requirements_cache, staging_location)
>     166       (populate_requirements_cache if populate_requirements_cache else
>     167        Stager._populate_requirements_cache)(setup_options.requirements_file,
> --> 168                                             requirements_cache_path)
>     169       for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
>     170         self.stage_artifact(
> ~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
>     204     while True:
>     205       try:
> --> 206         return fun(*args, **kwargs)
>     207       except Exception as exn:  # pylint: disable=broad-except
>     208         if not retry_filter(exn):
> ~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
>     485     ]
>     486     logging.info('Executing command: %s', cmd_args)
> --> 487     processes.check_output(cmd_args, stderr=processes.STDOUT)
>     488
>     489   @staticmethod
> ~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
>      89           "Full traceback: {} \n Pip install failed for package: {} \
>      90           \n Output from execution of subprocess: {}" \
> ---> 91           .format(traceback.format_exc(), args[0][6], error.output))
>      92     else:
>      93       raise RuntimeError("Full trace: {}, \
> RuntimeError: Full traceback: Traceback (most recent call last):
>   File "/home/jupyter/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py", line 83, in check_output
>     out = subprocess.check_output(*args, **kwargs)
>   File "/usr/lib/python3.5/subprocess.py", line 316, in check_output
>     **kwargs).stdout
>   File "/usr/lib/python3.5/subprocess.py", line 398, in run
>     output=stdout, stderr=stderr)
> subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1
> Pip install failed for package: -r
> Output from execution of subprocess:
> Collecting tensorflow-transform==0.15.0
>   Using cached tensorflow-transform-0.15.0.tar.gz (222 kB)
>   Saved /tmp/dataflow-requirements-cache/tensorflow-transform-0.15.0.tar.gz
> Collecting absl-py<0.9,>=0.7
>   Using cached absl-py-0.8.1.tar.gz (103 kB)
>   Saved /tmp/dataflow-requirements-cache/absl-py-0.8.1.tar.gz
> Collecting apache-beam[gcp]<3,>=2.16
>   Using cached apache-beam-2.19.0.zip (1.9 MB)
>   Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip
> Collecting numpy<2,>=1.16
>   Using cached numpy-1.18.1.zip (5.4 MB)
>   Installing build dependencies: started
>   Installing build dependencies: still running...
>   Installing build dependencies: finished with status 'done'
>   Getting requirements to build wheel: started
>   Getting requirements to build wheel: finished with status 'done'
>   Preparing wheel metadata: started
>   Preparing wheel metadata: finished with status 'done'
>   Saved /tmp/dataflow-requirements-cache/numpy-1.18.1.zip
> Collecting protobuf<4,>=3.7
>   Using cached protobuf-3.11.3.tar.gz (264 kB)
>   Saved /tmp/dataflow-requirements-cache/protobuf-3.11.3.tar.gz
> Collecting pydot<2,>=1.2
>   Using cached pydot-1.4.1.tar.gz (128 kB)
>   Saved /tmp/dataflow-requirements-cache/pydot-1.4.1.tar.gz
> Collecting six<2,>=1.10
>   Using cached six-1.14.0.tar.gz (33 kB)
>   Saved /tmp/dataflow-requirements-cache/six-1.14.0.tar.gz
> ERROR: Could not find a version that satisfies the requirement tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1)) (from versions: 0.6.0, 0.9.0, 0.12.1)
> ERROR: No matching distribution found for tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1))
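[Editor's note] The pip output above shows the root cause: Beam's stager runs pip download with --no-binary :all:, so every package reachable from requirements.txt must be available as a source distribution, and the only tensorflow-metadata sdists pip could find were 0.6.0, 0.9.0 and 0.12.1, none of which satisfies the tensorflow-metadata<0.16,>=0.15 constraint pulled in by tensorflow-transform==0.15.0. The failing staging step can be reproduced outside of job submission by re-running the exact command from the traceback; a minimal sketch (the interpreter path and cache directory are copied from the traceback and may differ on other machines):

    import subprocess

    # The same command Beam's stager executed (see the CalledProcessError above).
    cmd = [
        '/usr/bin/python3', '-m', 'pip', 'download',
        '--dest', '/tmp/dataflow-requirements-cache',
        '-r', 'requirements.txt',
        '--exists-action', 'i',
        '--no-binary', ':all:',
    ]
    try:
        subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as err:
        # Prints the same 'No matching distribution found' output as above.
        print(err.output.decode())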