[
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=99880&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99880
]
ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/May/18 06:53
Start Date: 09/May/18 06:53
Worklog Time Spent: 10m
Work Description: tvalentyn commented on a change in pull request #5251:
[BEAM-3883] Refactor and clean dependency.py to make it reusable with artifact
service
URL: https://github.com/apache/beam/pull/5251#discussion_r186945860
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/dependency.py
##########
@@ -99,173 +91,47 @@
DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'dataflow.gcr.io/v1beta3'
-def _dependency_file_copy(from_path, to_path):
- """Copies a local file to a GCS file or vice versa."""
- logging.info('file copy from %s to %s.', from_path, to_path)
- if from_path.startswith('gs://') or to_path.startswith('gs://'):
- from apache_beam.io.gcp import gcsio
- if from_path.startswith('gs://') and to_path.startswith('gs://'):
- # Both files are GCS files so copy.
- gcsio.GcsIO().copy(from_path, to_path)
- elif to_path.startswith('gs://'):
- # Only target is a GCS file, read local file and upload.
- with open(from_path, 'rb') as f:
- with gcsio.GcsIO().open(to_path, mode='wb') as g:
- pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
- for chunk in iter(pfun, ''):
- g.write(chunk)
- else:
- # Source is a GCS file but target is local file.
- with gcsio.GcsIO().open(from_path, mode='rb') as g:
- with open(to_path, 'wb') as f:
- pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
- for chunk in iter(pfun, ''):
- f.write(chunk)
- else:
- # Branch used only for unit tests and integration tests.
- # In such environments GCS support is not available.
- if not os.path.isdir(os.path.dirname(to_path)):
- logging.info('Created folder (since we have not done yet, and any errors
'
- 'will follow): %s ', os.path.dirname(to_path))
- os.mkdir(os.path.dirname(to_path))
- shutil.copyfile(from_path, to_path)
-
-
-def _dependency_file_download(from_url, to_folder):
- """Downloads a file from a URL and returns path to the local file."""
- # TODO(silviuc): We should cache downloads so we do not do it for every job.
- try:
- # We check if the file is actually there because wget returns a file
- # even for a 404 response (file will contain the contents of the 404
- # response).
- response, content = __import__('httplib2').Http().request(from_url)
- if int(response['status']) >= 400:
- raise RuntimeError(
- 'Beam SDK not found at %s (response: %s)' % (from_url, response))
- local_download_file = os.path.join(to_folder, 'beam-sdk.tar.gz')
- with open(local_download_file, 'w') as f:
- f.write(content)
- except Exception:
- logging.info('Failed to download Beam SDK from %s', from_url)
- raise
- return local_download_file
-
-
-def _stage_extra_packages(extra_packages, staging_location, temp_dir,
- file_copy=_dependency_file_copy):
- """Stages a list of local extra packages.
-
- Args:
- extra_packages: Ordered list of local paths to extra packages to be staged.
- staging_location: Staging location for the packages.
- temp_dir: Temporary folder where the resource building can happen. Caller
- is responsible for cleaning up this folder after this function returns.
- file_copy: Callable for copying files. The default version will copy from
- a local file to a GCS location using the gsutil tool available in the
- Google Cloud SDK package.
-
- Returns:
- A list of file names (no paths) for the resources staged. All the files
- are assumed to be staged in staging_location.
-
- Raises:
- RuntimeError: If files specified are not found or do not have expected
- name patterns.
- """
- resources = []
- staging_temp_dir = None
- local_packages = []
- for package in extra_packages:
- if not (os.path.basename(package).endswith('.tar') or
- os.path.basename(package).endswith('.tar.gz') or
- os.path.basename(package).endswith('.whl') or
- os.path.basename(package).endswith('.zip')):
- raise RuntimeError(
- 'The --extra_package option expects a full path ending with '
- '".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package)
- if os.path.basename(package).endswith('.whl'):
- logging.warning(
- 'The .whl package "%s" is provided in --extra_package. '
- 'This functionality is not officially supported. Since wheel '
- 'packages are binary distributions, this package must be '
- 'binary-compatible with the worker environment (e.g. Python 2.7 '
- 'running on an x64 Linux host).')
-
- if not os.path.isfile(package):
- if package.startswith('gs://'):
- if not staging_temp_dir:
- staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
- logging.info('Downloading extra package: %s locally before staging',
- package)
- if os.path.isfile(staging_temp_dir):
- local_file_path = staging_temp_dir
- else:
- _, last_component = FileSystems.split(package)
- local_file_path = FileSystems.join(staging_temp_dir, last_component)
- _dependency_file_copy(package, local_file_path)
+class DataflowFileHandle(stager.FileHandler):
+ def file_copy(self, from_path, to_path):
+ """Copies a local file to a GCS file or vice versa."""
+ logging.info('file copy from %s to %s.', from_path, to_path)
+ if from_path.startswith('gs://') or to_path.startswith('gs://'):
+ from apache_beam.io.gcp import gcsio
+ if from_path.startswith('gs://') and to_path.startswith('gs://'):
+ # Both files are GCS files so copy.
+ gcsio.GcsIO().copy(from_path, to_path)
+ elif to_path.startswith('gs://'):
+ # Only target is a GCS file, read local file and upload.
+ with open(from_path, 'rb') as f:
+ with gcsio.GcsIO().open(to_path, mode='wb') as g:
+ pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
+ for chunk in iter(pfun, ''):
+ g.write(chunk)
else:
- raise RuntimeError(
- 'The file %s cannot be found. It was specified in the '
- '--extra_packages command line option.' % package)
+ # Source is a GCS file but target is local file.
+ with gcsio.GcsIO().open(from_path, mode='rb') as g:
+ with open(to_path, 'wb') as f:
+ pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
+ for chunk in iter(pfun, ''):
+ f.write(chunk)
else:
- local_packages.append(package)
-
- if staging_temp_dir:
- local_packages.extend(
- [FileSystems.join(staging_temp_dir, f) for f in os.listdir(
- staging_temp_dir)])
-
- for package in local_packages:
- basename = os.path.basename(package)
- staged_path = FileSystems.join(staging_location, basename)
- file_copy(package, staged_path)
- resources.append(basename)
- # Create a file containing the list of extra packages and stage it.
- # The file is important so that in the worker the packages are installed
- # exactly in the order specified. This approach will avoid extra PyPI
- # requests. For example if package A depends on package B and package A
- # is installed first then the installer will try to satisfy the
- # dependency on B by downloading the package from PyPI. If package B is
- # installed first this is avoided.
- with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
- for package in local_packages:
- f.write('%s\n' % os.path.basename(package))
- staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE)
- # Note that the caller of this function is responsible for deleting the
- # temporary folder where all temp files are created, including this one.
- file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
- resources.append(EXTRA_PACKAGES_FILE)
-
- return resources
-
-
-def _get_python_executable():
- # Allow overriding the python executable to use for downloading and
- # installing dependencies, otherwise use the python executable for
- # the current process.
- python_bin = os.environ.get('BEAM_PYTHON') or sys.executable
- if not python_bin:
- raise ValueError('Could not find Python executable.')
- return python_bin
-
-
-def _populate_requirements_cache(requirements_file, cache_dir):
- # The 'pip download' command will not download again if it finds the
- # tarball with the proper version already present.
- # It will get the packages downloaded in the order they are presented in
- # the requirements file and will not download package dependencies.
- cmd_args = [
- _get_python_executable(), '-m', 'pip', 'download', '--dest', cache_dir,
- '-r', requirements_file,
- # Download from PyPI source distributions.
- '--no-binary', ':all:']
- logging.info('Executing command: %s', cmd_args)
- processes.check_call(cmd_args)
+ # Branch used only for unit tests and integration tests.
+ # In such environments GCS support is not available.
+ if not os.path.isdir(os.path.dirname(to_path)):
+ logging.info(
Review comment:
<!--new_thread; commit:d2647e7c0c6001a89be4dc0a673e17d5ce0d951a;
resolved:0-->
Can we defer to the superclass implementation for this branch?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 99880)
Time Spent: 2h 10m (was: 2h)
> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
> Key: BEAM-3883
> URL: https://issues.apache.org/jira/browse/BEAM-3883
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Ben Sidhom
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or
> dependencies when talking to the job API. Artifacts that need to be staged
> include the user code itself, any SDK components not included in the
> container image, and the list of Python packages that must be installed at
> runtime.
>
> Artifacts that are currently expected can be found in the harness boot code:
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52.]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)