[
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=99886&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99886
]
ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/May/18 06:53
Start Date: 09/May/18 06:53
Worklog Time Spent: 10m
Work Description: tvalentyn commented on a change in pull request #5251:
[BEAM-3883] Refactor and clean dependency.py to make it reusable with artifact
service
URL: https://github.com/apache/beam/pull/5251#discussion_r186945865
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/dependency.py
##########
@@ -294,249 +160,26 @@ def stage_job_resources(
trying to create the resources (e.g., build a setup package).
"""
temp_dir = temp_dir or tempfile.mkdtemp()
- resources = []
google_cloud_options = options.view_as(GoogleCloudOptions)
- setup_options = options.view_as(SetupOptions)
# Make sure that all required options are specified. There are a few that
have
# defaults to support local running scenarios.
if google_cloud_options.staging_location is None:
- raise RuntimeError(
- 'The --staging_location option must be specified.')
+ raise RuntimeError('The --staging_location option must be specified.')
if google_cloud_options.temp_location is None:
- raise RuntimeError(
- 'The --temp_location option must be specified.')
-
- # Stage a requirements file if present.
- if setup_options.requirements_file is not None:
- if not os.path.isfile(setup_options.requirements_file):
- raise RuntimeError('The file %s cannot be found. It was specified in the
'
- '--requirements_file command line option.' %
- setup_options.requirements_file)
- staged_path = FileSystems.join(google_cloud_options.staging_location,
- REQUIREMENTS_FILE)
- file_copy(setup_options.requirements_file, staged_path)
- resources.append(REQUIREMENTS_FILE)
- requirements_cache_path = (
- os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
- if setup_options.requirements_cache is None
- else setup_options.requirements_cache)
- # Populate cache with packages from requirements and stage the files
- # in the cache.
- if not os.path.exists(requirements_cache_path):
- os.makedirs(requirements_cache_path)
- populate_requirements_cache(
- setup_options.requirements_file, requirements_cache_path)
- for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
- file_copy(pkg, FileSystems.join(google_cloud_options.staging_location,
- os.path.basename(pkg)))
- resources.append(os.path.basename(pkg))
-
- # Handle a setup file if present.
- # We will build the setup package locally and then copy it to the staging
- # location because the staging location is a GCS path and the file cannot be
- # created directly there.
- if setup_options.setup_file is not None:
- if not os.path.isfile(setup_options.setup_file):
- raise RuntimeError('The file %s cannot be found. It was specified in the
'
- '--setup_file command line option.' %
- setup_options.setup_file)
- if os.path.basename(setup_options.setup_file) != 'setup.py':
- raise RuntimeError(
- 'The --setup_file option expects the full path to a file named '
- 'setup.py instead of %s' % setup_options.setup_file)
- tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
- build_setup_args)
- staged_path = FileSystems.join(google_cloud_options.staging_location,
- WORKFLOW_TARBALL_FILE)
- file_copy(tarball_file, staged_path)
- resources.append(WORKFLOW_TARBALL_FILE)
-
- # Handle extra local packages that should be staged.
- if setup_options.extra_packages is not None:
- resources.extend(
- _stage_extra_packages(setup_options.extra_packages,
- google_cloud_options.staging_location,
- temp_dir=temp_dir, file_copy=file_copy))
-
- # Pickle the main session if requested.
- # We will create the pickled main session locally and then copy it to the
- # staging location because the staging location is a GCS path and the file
- # cannot be created directly there.
- if setup_options.save_main_session:
- pickled_session_file = os.path.join(temp_dir,
- names.PICKLED_MAIN_SESSION_FILE)
- pickler.dump_session(pickled_session_file)
- staged_path = FileSystems.join(google_cloud_options.staging_location,
- names.PICKLED_MAIN_SESSION_FILE)
- file_copy(pickled_session_file, staged_path)
- resources.append(names.PICKLED_MAIN_SESSION_FILE)
-
- if hasattr(setup_options, 'sdk_location'):
- if setup_options.sdk_location == 'default':
- stage_sdk_from_remote_location = True
- elif (setup_options.sdk_location.startswith('gs://') or
- setup_options.sdk_location.startswith('http://') or
- setup_options.sdk_location.startswith('https://')):
- stage_sdk_from_remote_location = True
- else:
- stage_sdk_from_remote_location = False
-
- if stage_sdk_from_remote_location:
- # If --sdk_location is not specified then the appropriate package
- # will be obtained from PyPI (https://pypi.python.org) based on the
- # version of the currently running SDK. If the option is
- # present then no version matching is made and the exact URL or path
- # is expected.
- #
- # Unit tests running in the 'python setup.py test' context will
- # not have the sdk_location attribute present and therefore we
- # will not stage SDK.
- if setup_options.sdk_location == 'default':
- sdk_remote_location = 'pypi'
- else:
- sdk_remote_location = setup_options.sdk_location
- resources.extend(
- _stage_beam_sdk(sdk_remote_location,
- google_cloud_options.staging_location, temp_dir))
- else:
- # This branch is also used by internal tests running with the SDK built
- # at head.
- if setup_options.sdk_location == 'default':
- module_path = os.path.abspath(__file__)
- sdk_path = os.path.join(
- os.path.dirname(module_path), '..', '..', '..',
- names.DATAFLOW_SDK_TARBALL_FILE)
- elif os.path.isdir(setup_options.sdk_location):
- sdk_path = os.path.join(
- setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
- else:
- sdk_path = setup_options.sdk_location
- if os.path.isfile(sdk_path):
- logging.info('Copying Beam SDK "%s" to staging location.', sdk_path)
- staged_path = FileSystems.join(
- google_cloud_options.staging_location,
- _desired_sdk_filename_in_staging_location(
- setup_options.sdk_location))
- file_copy(sdk_path, staged_path)
- _, sdk_staged_filename = FileSystems.split(staged_path)
- resources.append(sdk_staged_filename)
- else:
- if setup_options.sdk_location == 'default':
- raise RuntimeError('Cannot find default Beam SDK tar file "%s"',
- sdk_path)
- elif not setup_options.sdk_location:
- logging.info('Beam SDK will not be staged since --sdk_location '
- 'is empty.')
- else:
- raise RuntimeError(
- 'The file "%s" cannot be found. Its location was specified by '
- 'the --sdk_location command-line option.' %
- sdk_path)
-
- # Delete all temp files created while staging job resources.
- shutil.rmtree(temp_dir)
- return resources
-
-
-def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
- saved_current_directory = os.getcwd()
- try:
- os.chdir(os.path.dirname(setup_file))
- if build_setup_args is None:
- build_setup_args = [
- _get_python_executable(), os.path.basename(setup_file),
- 'sdist', '--dist-dir', temp_dir]
- logging.info('Executing command: %s', build_setup_args)
- processes.check_call(build_setup_args)
- output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
- if not output_files:
- raise RuntimeError(
- 'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
- return output_files[0]
- finally:
- os.chdir(saved_current_directory)
-
-
-def _desired_sdk_filename_in_staging_location(sdk_location):
- """Returns the name that SDK file should have file in the staging location.
+ raise RuntimeError('The --temp_location option must be specified.')
- Args:
- sdk_location: Full path to SDK file.
- """
- if sdk_location.endswith('.whl'):
- _, wheel_filename = FileSystems.split(sdk_location)
- if wheel_filename.startswith('apache_beam'):
- return wheel_filename
- else:
- raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
- else:
- return names.DATAFLOW_SDK_TARBALL_FILE
-
-
-def _stage_beam_sdk(sdk_remote_location, staging_location, temp_dir):
- """Stages a Beam SDK file with the appropriate version.
-
- Args:
- sdk_remote_location: A GCS path to a SDK file or a URL from which
- the file can be downloaded. The SDK file can be a tarball or a wheel.
- Set to 'pypi' to download and stage a wheel and source SDK from PyPi.
- staging_location: A GCS bucket where the SDK file should be copied.
- temp_dir: path to temporary location where the file should be downloaded.
-
- Returns:
- A list of SDK files that were staged to the staging location.
-
- Raises:
- RuntimeError: if staging was not successful.
- """
- if (sdk_remote_location.startswith('http://') or
- sdk_remote_location.startswith('https://')):
- local_download_file = _dependency_file_download(
- sdk_remote_location, temp_dir)
- staged_name =
_desired_sdk_filename_in_staging_location(local_download_file)
- staged_path = FileSystems.join(staging_location, staged_name)
- logging.info(
- 'Staging Beam SDK from %s to %s',
- sdk_remote_location, staged_path)
- _dependency_file_copy(local_download_file, staged_path)
- return [staged_name]
- elif sdk_remote_location.startswith('gs://'):
- # Stage the file to the GCS staging area.
- staged_name =
_desired_sdk_filename_in_staging_location(sdk_remote_location)
- staged_path = FileSystems.join(staging_location, staged_name)
- logging.info(
- 'Staging Beam SDK from %s to %s',
- sdk_remote_location, staged_path)
- _dependency_file_copy(sdk_remote_location, staged_path)
- return [staged_name]
- elif sdk_remote_location == 'pypi':
- sdk_local_file = _download_pypi_sdk_package(temp_dir)
- sdk_sources_staged_name = _desired_sdk_filename_in_staging_location(
- sdk_local_file)
- staged_path = FileSystems.join(staging_location, sdk_sources_staged_name)
- logging.info('Staging SDK sources from PyPI to %s', staged_path)
- _dependency_file_copy(sdk_local_file, staged_path)
- staged_sdk_files = [sdk_sources_staged_name]
- try:
- # Stage binary distribution of the SDK, for now on a best-effort basis.
- sdk_local_file = _download_pypi_sdk_package(temp_dir, fetch_binary=True)
- sdk_binary_staged_name = _desired_sdk_filename_in_staging_location(
- sdk_local_file)
- staged_path = FileSystems.join(staging_location, sdk_binary_staged_name)
- logging.info('Staging binary distribution of the SDK from PyPI to %s',
- staged_path)
- _dependency_file_copy(sdk_local_file, staged_path)
- staged_sdk_files.append(sdk_binary_staged_name)
- except RuntimeError as e:
- logging.warn('Failed to download requested binary distribution '
- 'of the SDK: %s', repr(e))
-
- return staged_sdk_files
- else:
- raise RuntimeError(
- 'The --sdk_location option was used with an unsupported '
- 'type of location: %s' % sdk_remote_location)
+ file_handler = DataflowFileHandle()
+ file_handler.file_copy = file_copy if file_copy else file_handler.file_copy
+ file_handler.file_download = (
+ file_download if file_download else file_handler.file_download)
+ resource_stager = DataFlowStager(file_handler=file_handler)
Review comment:
<!--new_thread; commit:d2647e7c0c6001a89be4dc0a673e17d5ce0d951a;
resolved:0-->
I don't think we need the DataflowFileStager class.
We can instantiate an object instead: dataflow_file_stager = stager.Stager(file_handler=DataflowFileHandler())
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 99886)
Time Spent: 2h 50m (was: 2h 40m)
> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
> Key: BEAM-3883
> URL: https://issues.apache.org/jira/browse/BEAM-3883
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Ben Sidhom
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or
> dependencies when talking to the job API. Artifacts that need to be staged
> include the user code itself, any SDK components not included in the
> container image, and the list of Python packages that must be installed at
> runtime.
>
> Artifacts that are currently expected can be found in the harness boot code:
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52.]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)