[ https://issues.apache.org/jira/browse/BEAM-9493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

PY updated BEAM-9493:
---------------------
    Description: 
def preprocess(in_test_mode):
  # Assumes earlier notebook cells already imported datetime, tensorflow as tf,
  # apache_beam as beam and (presumably) tensorflow_transform.beam.impl as beam_impl,
  # and defined BUCKET, PROJECT, create_query, is_valid and preprocess_tft.
  import os
  import os.path
  import tempfile
  from apache_beam.io import tfrecordio
  from tensorflow_transform.coders import example_proto_coder
  from tensorflow_transform.tf_metadata import dataset_metadata
  from tensorflow_transform.tf_metadata import dataset_schema
  from tensorflow_transform.beam import tft_beam_io
  from tensorflow_transform.beam.tft_beam_io import transform_fn_io

  job_name = 'preprocess-bike-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  if in_test_mode:
    import shutil
    print('Launching local job ... hang on')
    OUTPUT_DIR = './bike_preproc_tft'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    EVERY_N = 5
  else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://{0}/bike_preproc_tft/'.format(BUCKET)
    import subprocess
    subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
    EVERY_N = 5

  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'max_num_workers': 6,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True,
    'requirements_file': 'requirements.txt'
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  if in_test_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'

  # set up raw data metadata
  raw_data_schema = {
    colname: dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
    for colname in 'start_year,start_month,start_day,start_station_id,hire_count'.split(',')
  }
  raw_data_schema.update({
    'day_of_week': dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
  })

  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

  # run Beam
  with beam.Pipeline(RUNNER, options=opts) as p:
    with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
      # save the raw data metadata
      raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
          os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
          pipeline=p)

      # read training data from BigQuery and filter rows
      raw_data = (p
        | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(
              query=create_query('train', EVERY_N), use_standard_sql=True))
        | 'train_filter' >> beam.Filter(is_valid))
      raw_dataset = (raw_data, raw_data_metadata)

      # analyze and transform training data
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
      transformed_data, transformed_metadata = transformed_dataset

      # save transformed training data to disk in efficient tfrecord format
      transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'train'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))

      # read eval data from BigQuery and filter rows
      raw_test_data = (p
        | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(
              query=create_query('valid', EVERY_N), use_standard_sql=True))
        | 'eval_filter' >> beam.Filter(is_valid))
      raw_test_dataset = (raw_test_data, raw_data_metadata)

      # transform eval data
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      transformed_test_data, _ = transformed_test_dataset

      # save transformed eval data to disk in efficient tfrecord format
      transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'eval'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))

      # save transformation function to disk for use at serving time
      transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
          os.path.join(OUTPUT_DIR, 'metadata'))
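
For context, the requirements.txt that gets staged appears to pin at least the following (inferred from "tensorflow-transform==0.15.0->-r requirements.txt (line 1)" in the pip output further down; any other entries in the file are not shown in this report):

tensorflow-transform==0.15.0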

---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     82   try:
---> 83     out = subprocess.check_output(*args, **kwargs)
     84   except OSError:

/usr/lib/python3.5/subprocess.py in check_output(timeout, *popenargs, **kwargs)
    315     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 316                **kwargs).stdout
    317

/usr/lib/python3.5/subprocess.py in run(input, timeout, check, *popenargs, **kwargs)
    397             raise CalledProcessError(retcode, process.args,
--> 398                                      output=stdout, stderr=stderr)
    399     return CompletedProcess(process.args, retcode, stdout, stderr)

CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-13-eac0bb8c8400> in <module>
    131           os.path.join(OUTPUT_DIR, 'metadata'))
    132 
--> 133 preprocess(in_test_mode=False) # change to True to run locally

<ipython-input-13-eac0bb8c8400> in preprocess(in_test_mode)
    129       # save transformation function to disk for use at serving time
    130       transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
--> 131           os.path.join(OUTPUT_DIR, 'metadata'))
    132 
    133 preprocess(in_test_mode=False) # change to True to run locally

~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    425   def __exit__(self, exc_type, exc_val, exc_tb):
    426     if not exc_type:
--> 427       self.run().wait_until_finish()
    428 
    429   def visit(self, visitor):

~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    405           self.to_runner_api(use_fake_coders=True),
    406           self.runner,
--> 407           self._options).run(False)
    408 
    409     if self._options.view_as(TypeOptions).runtime_type_check:

~/.local/lib/python3.5/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    418       finally:
    419         shutil.rmtree(tmpdir)
--> 420     return self.runner.run_pipeline(self, self._options)
    421 
    422   def __enter__(self):

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
    483     # raise an exception.
    484     result = DataflowPipelineResult(
--> 485         self.dataflow_client.create_job(self.job), self)
    486 
    487     # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.

~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
    204       while True:
    205         try:
--> 206           return fun(*args, **kwargs)
    207         except Exception as exn:  # pylint: disable=broad-except
    208           if not retry_filter(exn):

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job(self, job)
    529   def create_job(self, job):
    530     """Creates job description. May stage and/or submit for remote execution."""
--> 531     self.create_job_description(job)
    532 
    533     # Stage and submit the job when necessary

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in create_job_description(self, job)
    559 
    560     # Stage other resources for the SDK harness
--> 561     resources = self._stage_resources(job.options)
    562 
    563     job.proto.environment = Environment(

~/.local/lib/python3.5/site-packages/apache_beam/runners/dataflow/internal/apiclient.py in _stage_resources(self, options)
    489         options,
    490         temp_dir=tempfile.mkdtemp(),
--> 491         staging_location=google_cloud_options.staging_location)
    492     return resources
    493 

~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in stage_job_resources(self, options, build_setup_args, temp_dir, populate_requirements_cache, staging_location)
    166       (populate_requirements_cache if populate_requirements_cache else
    167        Stager._populate_requirements_cache)(setup_options.requirements_file,
--> 168                                             requirements_cache_path)
    169       for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
    170         self.stage_artifact(

~/.local/lib/python3.5/site-packages/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
    204       while True:
    205         try:
--> 206           return fun(*args, **kwargs)
    207         except Exception as exn:  # pylint: disable=broad-except
    208           if not retry_filter(exn):

~/.local/lib/python3.5/site-packages/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
    485     ]
    486     logging.info('Executing command: %s', cmd_args)
--> 487     processes.check_output(cmd_args, stderr=processes.STDOUT)
    488 
    489   @staticmethod

~/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     89         "Full traceback: {} \n Pip install failed for package: {} \
     90         \n Output from execution of subprocess: {}" \
---> 91         .format(traceback.format_exc(), args[0][6], error.output))
     92     else:
     93       raise RuntimeError("Full trace: {}, \

RuntimeError: Full traceback: Traceback (most recent call last):
  File "/home/jupyter/.local/lib/python3.5/site-packages/apache_beam/utils/processes.py", line 83, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 316, in check_output
    **kwargs).stdout
  File "/usr/lib/python3.5/subprocess.py", line 398, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1

Pip install failed for package: -r
Output from execution of subprocess: b"Collecting tensorflow-transform==0.15.0\n  Using cached tensorflow-transform-0.15.0.tar.gz (222 kB)\n  Saved /tmp/dataflow-requirements-cache/tensorflow-transform-0.15.0.tar.gz\nCollecting absl-py<0.9,>=0.7\n  Using cached absl-py-0.8.1.tar.gz (103 kB)\n  Saved /tmp/dataflow-requirements-cache/absl-py-0.8.1.tar.gz\nCollecting apache-beam[gcp]<3,>=2.16\n  Using cached apache-beam-2.19.0.zip (1.9 MB)\n  Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip\nCollecting numpy<2,>=1.16\n  Using cached numpy-1.18.1.zip (5.4 MB)\n  Installing build dependencies: started\n  Installing build dependencies: still running...\n  Installing build dependencies: finished with status 'done'\n  Getting requirements to build wheel: started\n  Getting requirements to build wheel: finished with status 'done'\n  Preparing wheel metadata: started\n  Preparing wheel metadata: finished with status 'done'\n  Saved /tmp/dataflow-requirements-cache/numpy-1.18.1.zip\nCollecting protobuf<4,>=3.7\n  Using cached protobuf-3.11.3.tar.gz (264 kB)\n  Saved /tmp/dataflow-requirements-cache/protobuf-3.11.3.tar.gz\nCollecting pydot<2,>=1.2\n  Using cached pydot-1.4.1.tar.gz (128 kB)\n  Saved /tmp/dataflow-requirements-cache/pydot-1.4.1.tar.gz\nCollecting six<2,>=1.10\n  Using cached six-1.14.0.tar.gz (33 kB)\n  Saved /tmp/dataflow-requirements-cache/six-1.14.0.tar.gz\nERROR: Could not find a version that satisfies the requirement tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1)) (from versions: 0.6.0, 0.9.0, 0.12.1)\nERROR: No matching distribution found for tensorflow-metadata<0.16,>=0.15 (from tensorflow-transform==0.15.0->-r requirements.txt (line 1))\n"
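
The staging step can be reproduced outside of Beam by running the same pip command the stager executes (the argv below is copied verbatim from the CalledProcessError above); this is only a sketch for reproducing the failure locally, not a fix:

import subprocess

# Same command Beam's stager runs to populate the requirements cache;
# invoking it directly surfaces the pip resolution error without submitting a Dataflow job.
subprocess.check_call([
    '/usr/bin/python3', '-m', 'pip', 'download',
    '--dest', '/tmp/dataflow-requirements-cache',
    '-r', 'requirements.txt',
    '--exists-action', 'i',
    '--no-binary', ':all:',
])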



> Apache Beam/Dataflow threw a CalledProcessError with beam.Pipeline("DataflowRunner", options=opts)
> ---------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9493
>                 URL: https://issues.apache.org/jira/browse/BEAM-9493
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.16.0
>         Environment: apache-beam==2.16.0
> tensorflow==2.1.0
> tensorflow-metadata==0.15.2
> tensorflow-transform==0.15.0
> Python 2.7.13
> pip 20.0.2
>            Reporter: PY
>            Priority: Blocker



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
