[ https://issues.apache.org/jira/browse/BEAM-9493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
PY updated BEAM-9493:
---------------------
    Description: 

def preprocess(in_test_mode):
    # NOTE: datetime, tf, beam, beam_impl, BUCKET, PROJECT, create_query,
    # is_valid and preprocess_tft are defined in earlier notebook cells.
    import os
    import os.path
    import tempfile
    from apache_beam.io import tfrecordio
    from tensorflow_transform.coders import example_proto_coder
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.beam import tft_beam_io
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io

    job_name = 'preprocess-bike-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    if in_test_mode:
        import shutil
        print('Launching local job ... hang on')
        OUTPUT_DIR = './bike_preproc_tft'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        EVERY_N = 5
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/bike_preproc_tft/'.format(BUCKET)
        import subprocess
        subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
        EVERY_N = 5

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'max_num_workers': 6,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'requirements_file': 'requirements.txt'
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    if in_test_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'

    # set up raw data metadata
    raw_data_schema = {
        colname: dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
        for colname in 'start_year,start_month,start_day,start_station_id,hire_count'.split(',')
    }
    raw_data_schema.update({
        'day_of_week': dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
    })
    raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

    # run Beam
    with beam.Pipeline(RUNNER, options=opts) as p:
        with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
            # save the raw data metadata
            raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
                os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
                pipeline=p)

            # read training data from bigquery and filter rows
            raw_data = (p
                | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query('train', EVERY_N), use_standard_sql=True))
                | 'train_filter' >> beam.Filter(is_valid))
            raw_dataset = (raw_data, raw_data_metadata)

            # analyze and transform training data
            transformed_dataset, transform_fn = (
                raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
            transformed_data, transformed_metadata = transformed_dataset

            # save transformed training data to disk in efficient tfrecord format
            transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
                os.path.join(OUTPUT_DIR, 'train'),
                file_name_suffix='.gz',
                coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

            # read eval data from bigquery and filter rows
            raw_test_data = (p
                | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query('valid', EVERY_N), use_standard_sql=True))
                | 'eval_filter' >> beam.Filter(is_valid))
            raw_test_dataset = (raw_test_data, raw_data_metadata)

            # transform eval data
            transformed_test_dataset = (
                (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
            transformed_test_data, _ = transformed_test_dataset

            # save transformed eval data to disk in efficient tfrecord format
            transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
                os.path.join(OUTPUT_DIR, 'eval'),
                file_name_suffix='.gz',
                coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

            # save transformation function to disk for use at serving time
            transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
                os.path.join(OUTPUT_DIR, 'metadata'))

preprocess(in_test_mode=False)  # change to True to run locally

---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     82   try:
---> 83     out = subprocess.check_output(*args, **kwargs)
     84   except OSError:

/usr/lib/python3.5/subprocess.py in check_output(timeout, *popenargs, **kwargs)
    315     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 316                **kwargs).stdout
    317

/usr/lib/python3.5/subprocess.py in run(input, timeout, check, *popenargs, **kwargs)
    397         raise CalledProcessError(retcode, process.args,
--> 398                                  output=stdout, stderr=stderr)
    399     return CompletedProcess(process.args, retcode, stdout, stderr)

CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-13-eac0bb8c8400> in <module>
    131                 os.path.join(OUTPUT_DIR, 'metadata'))
    132
--> 133 preprocess(in_test_mode=False)  # change to True to run locally

<ipython-input-13-eac0bb8c8400> in preprocess(in_test_mode)
    129       # save transformation function to disk for use at serving time
    130       transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
--> 131           os.path.join(OUTPUT_DIR, 'metadata'))
    132
    133 preprocess(in_test_mode=False)  # change to True to run locally

~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    425   def __exit__(self, exc_type, exc_val, exc_tb):
    426     if not exc_type:
--> 427       self.run().wait_until_finish()
    428
    429   def visit(self, visitor):

~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    405           self.to_runner_api(use_fake_coders=True),
    406           self.runner,
--> 407           self._options).run(False)
    408
    409     if self._options.view_as(TypeOptions).runtime_type_check:

~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    418       finally:
    419         shutil.rmtree(tmpdir)
--> 420     return self.runner.run_pipeline(self, self._options)
    421
    422   def __enter__(self):

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
    483     # raise an exception.
    484     result = DataflowPipelineResult(
--> 485         self.dataflow_client.create_job(self.job), self)
    486
    487     # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.

~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
    204     while True:
    205       try:
--> 206         return fun(*args, **kwargs)
    207       except Exception as exn:  # pylint: disable=broad-except
    208         if not retry_filter(exn):

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job(self, job)
    529   def create_job(self, job):
    530     """Creates job description. May stage and/or submit for remote execution."""
--> 531     self.create_job_description(job)
    532
    533     # Stage and submit the job when necessary

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job_description(self, job)
    559
    560     # Stage other resources for the SDK harness
--> 561     resources = self._stage_resources(job.options)
    562
    563     job.proto.environment = Environment(

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in _stage_resources(self, options)
    489         options,
    490         temp_dir=tempfile.mkdtemp(),
--> 491         staging_location=google_cloud_options.staging_location)
    492     return resources
    493

~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in stage_job_resources(self, options, build_setup_args, temp_dir, populate_requirements_cache, staging_location)
    166       (populate_requirements_cache if populate_requirements_cache else
    167        Stager._populate_requirements_cache)(setup_options.requirements_file,
--> 168                                             requirements_cache_path)
    169       for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
    170         self.stage_artifact(

~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
    204     while True:
    205       try:
--> 206         return fun(*args, **kwargs)
    207       except Exception as exn:  # pylint: disable=broad-except
    208         if not retry_filter(exn):

~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
    485     ]
    486     logging.info('Executing command: %s', cmd_args)
--> 487     processes.check_output(cmd_args, stderr=processes.STDOUT)
    488
    489   @staticmethod

~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     89             "Full traceback: {} \n Pip install failed for package: {} \
     90             \n Output from execution of subprocess: {}" \
---> 91             .format(traceback.format_exc(), args[0][6], error.output))
     92       else:
     93         raise RuntimeError("Full trace: {}, \

RuntimeError: Full traceback: Traceback (most recent call last):
  File "/home/jupyter/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py", line 83, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 316, in check_output
    **kwargs).stdout
  File "/usr/lib/python3.5/subprocess.py", line 398, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1
Pip install failed for package: -r
Output from execution of subprocess:
Collecting tensorflow-transform==0.15.0
  Using cached tensorflow-transform-0.15.0.tar.gz (222 kB)
  Saved /tmp/dataflow-requirements-cache/tensorflow-transform-0.15.0.tar.gz
Collecting absl-py<0.9,>=0.7
  Using cached absl-py-0.8.1.tar.gz (103 kB)
  Saved /tmp/dataflow-requirements-cache/absl-py-0.8.1.tar.gz
Collecting apache-beam[gcp]<3,>=2.16
  Using cached apache-beam-2.19.0.zip (1.9 MB)
  Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip
Collecting numpy<2,>=1.16
  Using cached numpy-1.18.1.zip (5.4 MB)
  Installing build dependencies: started
  Installing build dependencies: still running...
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing wheel metadata: started
  Preparing wheel metadata: finished with status 'done'
  Saved /tmp/dataflow-requirements-cache/numpy-1.18.1.zip
Collecting protobuf<4,>=3.7
  Using cached protobuf-3.11.3.tar.gz (264 kB)
  Saved /tmp/dataflow-requirements-cache/protobuf-3.11.3.tar.gz
Collecting pydot<2,>=1.2
  Using cached pydot-1.4.1.tar.gz (128 kB)
  Saved /tmp/dataflow-requirements-cache/pydot-1.4.1.tar.gz
Collecting six<2,>=1.10
  Using cached six-1.14.0.tar.gz (33 kB)
  Saved /tmp/dataflow-requirements-cache/six-1.14.0.tar.gz
ERROR: Could not find a version that satisfies the requirement tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1)) (from versions: 0.6.0, 0.9.0, 0.12.1)
ERROR: No matching distribution found for tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1))
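The failure occurs before any Dataflow job is created: because 'requirements_file' is set, the runner first populates a local requirements cache by running the pip download command shown in the CalledProcessError above, and --no-binary ':all:' restricts that download to source distributions. The following is a minimal sketch for reproducing the staging step on its own, outside of Beam; it assumes a requirements.txt in the working directory that pins tensorflow-transform==0.15.0 (per the Environment field of this issue) and uses sys.executable as a stand-in for /usr/bin/python3.

import subprocess
import sys
import tempfile

# Re-run the same "pip download" command that the Beam stager issues while
# caching 'requirements_file' dependencies (copied from the traceback above).
cache_dir = tempfile.mkdtemp(prefix='dataflow-requirements-cache-')
cmd = [
    sys.executable, '-m', 'pip', 'download',
    '--dest', cache_dir,
    '-r', 'requirements.txt',   # assumed to contain tensorflow-transform==0.15.0
    '--exists-action', 'i',
    '--no-binary', ':all:',     # source distributions only
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
print(result.stdout.decode())
print('pip download exit status:', result.returncode)

Run this way, pip should exit non-zero with the same errors as above: with --no-binary ':all:' the only tensorflow-metadata releases it can fetch are 0.6.0, 0.9.0 and 0.12.1, which suggests the newer releases required by tensorflow-transform 0.15.0 (tensorflow-metadata<0.16,>=0.15) are published to PyPI only as wheels.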
> Apache Beam/Dataflow threw a CalledProcessError with beam.Pipeline("DataflowRunner", options=opts)
> ---------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9493
>                 URL: https://issues.apache.org/jira/browse/BEAM-9493
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.16.0
>         Environment: apache-beam==2.16.0
>                      tensorflow==2.1.0
>                      tensorflow-metadata==0.15.2
>                      tensorflow-transform==0.15.0
>                      Python 2.7.13
>                      pip 20.0.2
>            Reporter: PY
>            Priority: Blocker
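Given the pins in the Environment field above, a possible workaround sketch (untested in this report): describe the dependency in a small setup.py and pass it through the 'setup_file' pipeline option instead of staging it via 'requirements_file'. With 'setup_file', workers pip-install the staged package in the normal way and may use wheels, whereas the 'requirements_file' path always runs the sdist-only pip download shown above. The package name below is hypothetical.

# setup.py, placed next to the notebook that defines preprocess()
import setuptools

setuptools.setup(
    name='bike-preproc',   # hypothetical package name
    version='0.1',
    install_requires=[
        'tensorflow-transform==0.15.0',
    ],
    packages=setuptools.find_packages(),
)

# In the options dict inside preprocess(), the corresponding change would be:
#     'setup_file': './setup.py',   # instead of 'requirements_file': 'requirements.txt'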
--
This message was sent by Atlassian Jira
(v8.3.4#803005)