[ 
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=102655&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102655
 ]

ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/May/18 21:18
            Start Date: 16/May/18 21:18
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on a change in pull request #5251: 
[BEAM-3883] Refactor and clean dependency.py to make it reusable with artifact 
service
URL: https://github.com/apache/beam/pull/5251#discussion_r188746649
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/stager.py
 ##########
 @@ -0,0 +1,573 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Support for installing custom code and required dependencies.
+
+Workflows, with the exception of very simple ones, are organized in multiple
+modules and packages. Typically, these modules and packages have
+dependencies on other libraries. Beam relies on the Python
+setuptools package to handle these scenarios. For further details please read:
+https://pythonhosted.org/an_example_pypi_project/setuptools.html
+
+When a runner tries to run a pipeline, it will check for the
+--requirements_file and --setup_file options.
+
+If --setup_file is present then it is assumed that the folder containing the
+file specified by the option has the typical layout required by setuptools,
+and the runner will run 'python setup.py sdist' to produce a source
+distribution. The resulting tarball (a .tar or .tar.gz file) will be staged
+at the staging location specified as a job option. When a worker starts it
+will check for the presence of this file and will run 'easy_install tarball'
+to install the package in the worker.
+
+If --requirements_file is present then the file specified by the option will be
+staged in the staging location.  When a worker starts it will check for the
+presence of this file and will run 'pip install -r requirements.txt'. A
+requirements file can be easily generated by running 'pip freeze >
+requirements.txt'. The reason a runner does not do this automatically is
+that quite often only a small fraction of the dependencies present in a
+requirements.txt file are actually needed for remote execution, and a
+one-time manual trimming is therefore desirable.
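+
+For illustration only, a hypothetical invocation that stages both kinds of
+dependencies might look like this (the pipeline module, file names and
+bucket are placeholders):
+
+  python my_pipeline.py \
+      --requirements_file requirements.txt \
+      --setup_file ./setup.py \
+      --staging_location gs://my-bucket/staging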
+
+TODO(silviuc): Should we allow several setup packages?
+TODO(silviuc): We should allow customizing the exact command for setup build.
+"""
+import functools
+import glob
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+import pkg_resources
+
+from apache_beam.internal import pickler
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_options import SetupOptions
+# TODO(angoenka): Remove reference to dataflow internal names
+from apache_beam.runners.dataflow.internal import names
+from apache_beam.utils import processes
+
+# All constants are for internal use only; no backwards-compatibility
+# guarantees.
+
+# Standard file names used for staging files.
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
+REQUIREMENTS_FILE = 'requirements.txt'
+EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+
+# Package names for distributions
+BEAM_PACKAGE_NAME = 'apache-beam'
+
+
+class Stager(object):
+  """Stager identifies and copies the appropriate artifacts to the staging
+  location."""
+
+  def _copy_file(self, from_path, to_path):
+    """Copies a local file to a GCS file or vice versa."""
+    logging.info('file copy from %s to %s.', from_path, to_path)
+    if from_path.startswith('gs://') or to_path.startswith('gs://'):
+      from apache_beam.io.gcp import gcsio
+      if from_path.startswith('gs://') and to_path.startswith('gs://'):
+        # Both files are GCS files so copy.
+        gcsio.GcsIO().copy(from_path, to_path)
+      elif to_path.startswith('gs://'):
+        # Only target is a GCS file, read local file and upload.
+        with open(from_path, 'rb') as f:
+          with gcsio.GcsIO().open(to_path, mode='wb') as g:
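+            # Stream in fixed-size chunks; iter() stops when read() returns ''.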
+            pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
+            for chunk in iter(pfun, ''):
+              g.write(chunk)
+      else:
+        # Source is a GCS file but target is local file.
+        with gcsio.GcsIO().open(from_path, mode='rb') as g:
+          with open(to_path, 'wb') as f:
+            pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
+            for chunk in iter(pfun, ''):
+              f.write(chunk)
+    else:
+      # Branch used only for unit tests and integration tests.
+      # In such environments GCS support is not available.
+      if not os.path.isdir(os.path.dirname(to_path)):
+        logging.info('Creating folder: %s', os.path.dirname(to_path))
+        os.mkdir(os.path.dirname(to_path))
+      shutil.copyfile(from_path, to_path)
+
+  def _download_file(self, from_url, to_path):
+    """Downloads a file over http/https from a url or copy it from a remote
+        path to local path."""
+    if from_url.startswith('http://') or from_url.startswith('https://'):
+      # TODO(silviuc): We should cache downloads so we do not do it for every
+      # job.
+      try:
+        # We check if the file is actually there because wget returns a file
+        # even for a 404 response (file will contain the contents of the 404
+        # response).
+        # TODO(angoenka): Extract and use the filename when downloading file.
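+        # httplib2 is imported lazily so it is only required for http(s)
+        # downloads.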
+        response, content = __import__('httplib2').Http().request(from_url)
+        if int(response['status']) >= 400:
+          raise RuntimeError(
+              'Artifact not found at %s (response: %s)' % (from_url, response))
+        with open(to_path, 'wb') as f:
+          f.write(content)
+      except Exception:
+        logging.info('Failed to download artifact from %s', from_url)
+        raise
+    else:
+      # Copy the file from the remote file system to the local file system.
+      self._copy_file(from_url, to_path)
+
+  def _is_remote_path(self, path):
+    return '://' in path
+
+  def _stage_extra_packages(self, extra_packages, staging_location, temp_dir):
+    """Stages a list of local extra packages.
+
+      Args:
+        extra_packages: Ordered list of local paths to extra packages to be
+          staged.
+        staging_location: Staging location for the packages.
+        temp_dir: Temporary folder where the resource building can happen.
+          Caller is responsible for cleaning up this folder after this function
+          returns.
+
+      Returns:
+        A list of file names (no paths) for the resources staged. All the files
+        are assumed to be staged in staging_location.
+
+      Raises:
+        RuntimeError: If files specified are not found or do not have expected
+          name patterns.
+      """
+    resources = []
+    staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
+    local_packages = []
+    for package in extra_packages:
+      if not (os.path.basename(package).endswith('.tar') or
+              os.path.basename(package).endswith('.tar.gz') or
+              os.path.basename(package).endswith('.whl') or
+              os.path.basename(package).endswith('.zip')):
+        raise RuntimeError(
+            'The --extra_package option expects a full path ending with '
+            '".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package)
+      if os.path.basename(package).endswith('.whl'):
+        logging.warning(
+            'The .whl package "%s" is provided in --extra_package. '
+            'This functionality is not officially supported. Since wheel '
+            'packages are binary distributions, this package must be '
+            'binary-compatible with the worker environment (e.g. Python 2.7 '
+            'running on an x64 Linux host).', package)
+
+      if not os.path.isfile(package):
+        if self._is_remote_path(package):
+          # Download remote package.
+          logging.info('Downloading extra package: %s locally before staging',
+                       package)
+          _, last_component = FileSystems.split(package)
+          local_file_path = FileSystems.join(staging_temp_dir, last_component)
+          self._download_file(package, local_file_path)
+        else:
+          raise RuntimeError(
+              'The file %s cannot be found. It was specified in the '
+              '--extra_package command line option.' % package)
+      else:
+        local_packages.append(package)
+
+    local_packages.extend([
+        FileSystems.join(staging_temp_dir, f)
+        for f in os.listdir(staging_temp_dir)
+    ])
+
+    for package in local_packages:
+      basename = os.path.basename(package)
+      staged_path = FileSystems.join(staging_location, basename)
+      self.stage_artifact(package, staged_path)
+      resources.append(basename)
+    # Create a file containing the list of extra packages and stage it.
+    # The file is important so that in the worker the packages are installed
+    # exactly in the order specified. This approach will avoid extra PyPI
+    # requests. For example if package A depends on package B and package A
+    # is installed first then the installer will try to satisfy the
+    # dependency on B by downloading the package from PyPI. If package B is
+    # installed first this is avoided.
+    with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
+      for package in local_packages:
+        f.write('%s\n' % os.path.basename(package))
+    staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE)
+    # Note that the caller of this function is responsible for deleting the
+    # temporary folder where all temp files are created, including this one.
+    self.stage_artifact(
+        os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
+    resources.append(EXTRA_PACKAGES_FILE)
+
+    return resources
+
+  def _get_python_executable(self):
+    # Allow overriding the python executable to use for downloading and
+    # installing dependencies, otherwise use the python executable for
+    # the current process.
+    python_bin = os.environ.get('BEAM_PYTHON') or sys.executable
+    if not python_bin:
+      raise ValueError('Could not find Python executable.')
+    return python_bin
+
+  def _populate_requirements_cache(self, requirements_file, cache_dir):
 
 Review comment:
   Marked `@staticmethod`
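
   For reference, a minimal sketch of the suggested shape (the body is elided
   here; only the decorator and the dropped `self` parameter change):

   ```python
   class Stager(object):
     @staticmethod
     def _populate_requirements_cache(requirements_file, cache_dir):
       pass  # body unchanged; a staticmethod no longer receives `self`
   ```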

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 102655)
    Time Spent: 10h 50m  (was: 10h 40m)

> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
>                 Key: BEAM-3883
>                 URL: https://issues.apache.org/jira/browse/BEAM-3883
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 10h 50m
>  Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or 
> dependencies when talking to the job API. Artifacts that need to be staged 
> include the user code itself, any SDK components not included in the 
> container image, and the list of Python packages that must be installed at 
> runtime.
>  
> Artifacts that are currently expected can be found in the harness boot code: 
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
