[ 
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=104331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104331
 ]

ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/18 23:05
            Start Date: 21/May/18 23:05
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5251: [BEAM-3883] Refactor and clean dependency.py to make it reusable with artifact service
URL: https://github.com/apache/beam/pull/5251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
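
For orientation: the change extracts the staging logic from dependency.py into a runner-agnostic Stager base class (apache_beam.runners.portability.stager), which apiclient.py plugs into via the new _LegacyDataflowStager. Below is a minimal sketch of how another runner could subclass the same interface. The _LocalDirectoryStager class and the use of a plain local directory as the staging location are hypothetical illustrations; the Stager methods and the stage_job_resources() signature are taken from the diff as merged and may differ in later Beam versions.

import logging
import shutil
import tempfile

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability.stager import Stager


class _LocalDirectoryStager(Stager):
  """Hypothetical stager that copies artifacts into a local directory."""

  def stage_artifact(self, local_path_to_artifact, artifact_name):
    # artifact_name is the full destination path computed by
    # Stager.stage_job_resources() (staging_location joined with the
    # artifact's file name).
    shutil.copyfile(local_path_to_artifact, artifact_name)

  def commit_manifest(self):
    # Nothing to publish for a plain directory; an artifact-service-backed
    # stager would commit its manifest of staged files here.
    logging.info('All artifacts staged.')


# Usage sketch: an empty --sdk_location skips SDK staging (so nothing is
# downloaded from PyPI); --save_main_session gives the stager one concrete
# artifact (the pickled main session) to copy.
options = PipelineOptions(['--sdk_location=', '--save_main_session'])
staged = _LocalDirectoryStager().stage_job_resources(
    options,
    temp_dir=tempfile.mkdtemp(),
    staging_location=tempfile.mkdtemp())
print(staged)  # e.g. ['pickled_main_session']

The _LegacyDataflowStager added to apiclient.py in the diff follows the same pattern: stage_artifact() delegates to the Dataflow API client's _gcs_file_copy() and commit_manifest() is a no-op.
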

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 54eba06abb8..72c54a40cfe 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -25,14 +25,17 @@
 import logging
 import os
 import re
+import tempfile
 import time
 from datetime import datetime
 from StringIO import StringIO
 
+import pkg_resources
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
 import six
 
+from apache_beam import version as beam_version
 from apache_beam.internal.gcp.auth import get_service_credentials
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.filesystems import FileSystems
@@ -41,11 +44,10 @@
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import WorkerOptions
-from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow
-from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
 from apache_beam.runners.dataflow.internal.names import PropertyNames
+from apache_beam.runners.portability.stager import Stager
 from apache_beam.transforms import cy_combiners
 from apache_beam.transforms import DataflowDistributionCounter
 from apache_beam.transforms.display import DisplayData
@@ -169,7 +171,7 @@ def __init__(self, packages, options, environment_version, pipeline_url):
     # TODO: Use enumerated type instead of strings for job types.
     if job_type.startswith('FNAPI_'):
       runner_harness_override = (
-          dependency.get_runner_harness_container_image())
+          get_runner_harness_container_image())
       self.debug_options.experiments = self.debug_options.experiments or []
       if runner_harness_override:
         self.debug_options.experiments.append(
@@ -234,7 +236,7 @@ def __init__(self, packages, options, environment_version, pipeline_url):
           self.worker_options.worker_harness_container_image)
     else:
       pool.workerHarnessContainerImage = (
-          dependency.get_default_container_image_for_current_sdk(job_type))
+          get_default_container_image_for_current_sdk(job_type))
     if self.worker_options.use_public_ips is not None:
       if self.worker_options.use_public_ips:
         pool.ipConfiguration = (
@@ -432,6 +434,19 @@ def _gcs_file_copy(self, from_path, to_path):
     with open(from_path, 'rb') as f:
       self.stage_file(to_folder, to_name, f)
 
+  def _stage_resources(self, options):
+    google_cloud_options = options.view_as(GoogleCloudOptions)
+    if google_cloud_options.staging_location is None:
+      raise RuntimeError('The --staging_location option must be specified.')
+    if google_cloud_options.temp_location is None:
+      raise RuntimeError('The --temp_location option must be specified.')
+
+    resource_stager = _LegacyDataflowStager(self)
+    return resource_stager.stage_job_resources(
+        options,
+        temp_dir=tempfile.mkdtemp(),
+        staging_location=google_cloud_options.staging_location)
+
   def stage_file(self, gcs_or_local_path, file_name, stream,
                  mime_type='application/octet-stream'):
     """Stages a file at a GCS or local path with stream-supplied contents."""
@@ -496,8 +511,7 @@ def create_job_description(self, job):
                     StringIO(job.proto_pipeline.SerializeToString()))
 
     # Stage other resources for the SDK harness
-    resources = dependency.stage_job_resources(
-        job.options, file_copy=self._gcs_file_copy)
+    resources = self._stage_resources(job.options)
 
     job.proto.environment = Environment(
         pipeline_url=FileSystems.join(job.google_cloud_options.staging_location,
@@ -731,6 +745,31 @@ def translate_scalar_counter_float(accumulator, metric_update_proto):
     metric_update_proto.floatingPoint = accumulator.value
 
 
+class _LegacyDataflowStager(Stager):
+  def __init__(self, dataflow_application_client):
+    super(_LegacyDataflowStager, self).__init__()
+    self._dataflow_application_client = dataflow_application_client
+
+  def stage_artifact(self, local_path_to_artifact, artifact_name):
+    self._dataflow_application_client._gcs_file_copy(local_path_to_artifact,
+                                                     artifact_name)
+
+  def commit_manifest(self):
+    pass
+
+  @staticmethod
+  def get_sdk_package_name():
+    """For internal use only; no backwards-compatibility guarantees.
+
+          Returns the PyPI package name to be staged to Google Cloud Dataflow.
+    """
+    sdk_name, _ = get_sdk_name_and_version()
+    if sdk_name == names.GOOGLE_SDK_NAME:
+      return names.GOOGLE_PACKAGE_NAME
+    else:
+      return names.BEAM_PACKAGE_NAME
+
+
 def to_split_int(n):
   res = dataflow.SplitInt64()
   res.lowBits = n & 0xffffffff
@@ -780,6 +819,71 @@ def _use_fnapi(pipeline_options):
       debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
 
 
+def get_sdk_name_and_version():
+  """For internal use only; no backwards-compatibility guarantees.
+
+    Returns name and version of SDK reported to Google Cloud Dataflow."""
+  try:
+    pkg_resources.get_distribution(names.GOOGLE_PACKAGE_NAME)
+    return (names.GOOGLE_SDK_NAME, beam_version.__version__)
+  except pkg_resources.DistributionNotFound:
+    return (names.BEAM_SDK_NAME, beam_version.__version__)
+
+
+def get_default_container_image_for_current_sdk(job_type):
+  """For internal use only; no backwards-compatibility guarantees.
+
+    Args:
+      job_type (str): BEAM job type.
+
+    Returns:
+      str: Google Cloud Dataflow container image for remote execution.
+    """
+  # TODO(tvalentyn): Use enumerated type instead of strings for job types.
+  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+    image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python-fnapi'
+  else:
+    image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python'
+  image_tag = _get_required_container_version(job_type)
+  return image_name + ':' + image_tag
+
+
+def _get_required_container_version(job_type=None):
+  """For internal use only; no backwards-compatibility guarantees.
+
+    Args:
+      job_type (str, optional): BEAM job type. Defaults to None.
+
+    Returns:
+      str: The tag of worker container images in GCR that corresponds to
+        current version of the SDK.
+    """
+  if 'dev' in beam_version.__version__:
+    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+      return names.BEAM_FNAPI_CONTAINER_VERSION
+    else:
+      return names.BEAM_CONTAINER_VERSION
+  else:
+    return beam_version.__version__
+
+
+def get_runner_harness_container_image():
+  """For internal use only; no backwards-compatibility guarantees.
+
+     Returns:
+       str: Runner harness container image that shall be used by default
+         for current SDK version or None if the runner harness container image
+         bundled with the service shall be used.
+    """
+  # Pin runner harness for released versions of the SDK.
+  if 'dev' not in beam_version.__version__:
+    return (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
+            beam_version.__version__)
+  # Don't pin runner harness for dev versions so that we can notice
+  # potential incompatibility between runner and sdk harnesses.
+  return None
+
+
 # To enable a counter on the service, add it to this dictionary.
 structured_counter_translations = {
     cy_combiners.CountCombineFn: (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 35ece3ea767..2ba4e840cd3 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -21,7 +21,7 @@
 
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.dataflow.internal import dependency
+from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow
 from apache_beam.transforms import DataflowDistributionCounter
 
@@ -206,14 +206,14 @@ def test_private_ip_configuration(self):
         env.proto.workerPools[0].ipConfiguration,
         dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
 
-  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+  @mock.patch('apache_beam.runners.dataflow.internal.apiclient.'
               'beam_version.__version__', '2.2.0')
   def test_harness_override_present_in_released_sdks(self):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp', '--streaming'])
     override = ''.join(
         ['runner_harness_container_image=',
-         dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
+         names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
          '/harness:2.2.0'])
     env = apiclient.Environment([], #packages
                                 pipeline_options,
@@ -221,7 +221,7 @@ def test_harness_override_present_in_released_sdks(self):
                                 FAKE_PIPELINE_URL)
     self.assertIn(override, env.proto.experiments)
 
-  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+  @mock.patch('apache_beam.runners.dataflow.internal.apiclient.'
               'beam_version.__version__', '2.2.0.dev')
   def test_harness_override_absent_in_unreleased_sdk(self):
     pipeline_options = PipelineOptions(
@@ -234,7 +234,7 @@ def test_harness_override_absent_in_unreleased_sdk(self):
       for experiment in env.proto.experiments:
         self.assertNotIn('runner_harness_container_image=', experiment)
 
-  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+  @mock.patch('apache_beam.runners.dataflow.internal.apiclient.'
               'beam_version.__version__', '2.2.0.dev')
   def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self):
     # streaming, fnapi pipeline.
@@ -246,8 +246,8 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self):
                                 FAKE_PIPELINE_URL)
     self.assertEqual(
         env.proto.workerPools[0].workerHarnessContainerImage,
-        (dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
-         '/python-fnapi:' + dependency.BEAM_FNAPI_CONTAINER_VERSION))
+        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+         '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
 
     # batch, legacy pipeline.
     pipeline_options = PipelineOptions(
@@ -258,10 +258,10 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self):
                                 FAKE_PIPELINE_URL)
     self.assertEqual(
         env.proto.workerPools[0].workerHarnessContainerImage,
-        (dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
-         '/python:' + dependency.BEAM_CONTAINER_VERSION))
+        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+         '/python:' + names.BEAM_CONTAINER_VERSION))
 
-  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+  @mock.patch('apache_beam.runners.dataflow.internal.apiclient.'
               'beam_version.__version__', '2.2.0')
   def test_worker_harness_image_tag_matches_released_sdk_version(self):
     # streaming, fnapi pipeline.
@@ -273,7 +273,7 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
                                 FAKE_PIPELINE_URL)
     self.assertEqual(
         env.proto.workerPools[0].workerHarnessContainerImage,
-        (dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
          '/python-fnapi:2.2.0'))
 
     # batch, legacy pipeline.
@@ -285,7 +285,7 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
                                 FAKE_PIPELINE_URL)
     self.assertEqual(
         env.proto.workerPools[0].workerHarnessContainerImage,
-        (dependency.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
          '/python:2.2.0'))
 
   def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
deleted file mode 100644
index 2307371d380..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ /dev/null
@@ -1,671 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Support for installing custom code and required dependencies.
-
-Workflows, with the exception of very simple ones, are organized in multiple
-modules and packages. Typically, these modules and packages have
-dependencies on other standard libraries. Dataflow relies on the Python
-setuptools package to handle these scenarios. For further details please read:
-https://pythonhosted.org/an_example_pypi_project/setuptools.html
-
-When a runner tries to run a pipeline it will check for a --requirements_file
-and a --setup_file option.
-
-If --setup_file is present then it is assumed that the folder containing the
-file specified by the option has the typical layout required by setuptools and
-it will run 'python setup.py sdist' to produce a source distribution. The
-resulting tarball (a .tar or .tar.gz file) will be staged at the GCS staging
-location specified as job option. When a worker starts it will check for the
-presence of this file and will run 'easy_install tarball' to install the
-package in the worker.
-
-If --requirements_file is present then the file specified by the option will be
-staged in the GCS staging location.  When a worker starts it will check for the
-presence of this file and will run 'pip install -r requirements.txt'. A
-requirements file can be easily generated by running 'pip freeze -r
-requirements.txt'. The reason a Dataflow runner does not run this automatically
-is because quite often only a small fraction of the dependencies present in a
-requirements.txt file are actually needed for remote execution and therefore a
-one-time manual trimming is desirable.
-
-TODO(silviuc): Staged files should have a job specific prefix.
-To prevent several jobs in the same project stomping on each other due to a
-shared staging location.
-
-TODO(silviuc): Should we allow several setup packages?
-TODO(silviuc): We should allow customizing the exact command for setup build.
-"""
-
-import functools
-import glob
-import logging
-import os
-import shutil
-import subprocess
-import sys
-import tempfile
-
-import pkg_resources
-
-from apache_beam import version as beam_version
-from apache_beam.internal import pickler
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.options.pipeline_options import GoogleCloudOptions
-from apache_beam.options.pipeline_options import SetupOptions
-from apache_beam.runners.dataflow.internal import names
-from apache_beam.utils import processes
-
-# All constants are for internal use only; no backwards-compatibility
-# guarantees.
-
-# In a released SDK, container tags are selected based on the SDK version.
-# Unreleased versions use container versions based on values of
-# BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION (see below).
-
-# Update this version to the next version whenever there is a change that will
-# require changes to legacy Dataflow worker execution environment.
-BEAM_CONTAINER_VERSION = 'beam-master-20180413'
-# Update this version to the next version whenever there is a change that
-# requires changes to SDK harness container or SDK harness launcher.
-BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20180413'
-
-# Standard file names used for staging files.
-WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
-REQUIREMENTS_FILE = 'requirements.txt'
-EXTRA_PACKAGES_FILE = 'extra_packages.txt'
-
-# Package names for different distributions
-GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
-BEAM_PACKAGE_NAME = 'apache-beam'
-
-# SDK identifiers for different distributions
-GOOGLE_SDK_NAME = 'Google Cloud Dataflow SDK for Python'
-BEAM_SDK_NAME = 'Apache Beam SDK for Python'
-
-DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'dataflow.gcr.io/v1beta3'
-
-
-def _dependency_file_copy(from_path, to_path):
-  """Copies a local file to a GCS file or vice versa."""
-  logging.info('file copy from %s to %s.', from_path, to_path)
-  if from_path.startswith('gs://') or to_path.startswith('gs://'):
-    from apache_beam.io.gcp import gcsio
-    if from_path.startswith('gs://') and to_path.startswith('gs://'):
-      # Both files are GCS files so copy.
-      gcsio.GcsIO().copy(from_path, to_path)
-    elif to_path.startswith('gs://'):
-      # Only target is a GCS file, read local file and upload.
-      with open(from_path, 'rb') as f:
-        with gcsio.GcsIO().open(to_path, mode='wb') as g:
-          pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
-          for chunk in iter(pfun, ''):
-            g.write(chunk)
-    else:
-      # Source is a GCS file but target is local file.
-      with gcsio.GcsIO().open(from_path, mode='rb') as g:
-        with open(to_path, 'wb') as f:
-          pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
-          for chunk in iter(pfun, ''):
-            f.write(chunk)
-  else:
-    # Branch used only for unit tests and integration tests.
-    # In such environments GCS support is not available.
-    if not os.path.isdir(os.path.dirname(to_path)):
-      logging.info('Created folder (since we have not done yet, and any errors '
-                   'will follow): %s ', os.path.dirname(to_path))
-      os.mkdir(os.path.dirname(to_path))
-    shutil.copyfile(from_path, to_path)
-
-
-def _dependency_file_download(from_url, to_folder):
-  """Downloads a file from a URL and returns path to the local file."""
-  # TODO(silviuc): We should cache downloads so we do not do it for every job.
-  try:
-    # We check if the file is actually there because wget returns a file
-    # even for a 404 response (file will contain the contents of the 404
-    # response).
-    response, content = __import__('httplib2').Http().request(from_url)
-    if int(response['status']) >= 400:
-      raise RuntimeError(
-          'Beam SDK not found at %s (response: %s)' % (from_url, response))
-    local_download_file = os.path.join(to_folder, 'beam-sdk.tar.gz')
-    with open(local_download_file, 'w') as f:
-      f.write(content)
-  except Exception:
-    logging.info('Failed to download Beam SDK from %s', from_url)
-    raise
-  return local_download_file
-
-
-def _stage_extra_packages(extra_packages, staging_location, temp_dir,
-                          file_copy=_dependency_file_copy):
-  """Stages a list of local extra packages.
-
-  Args:
-    extra_packages: Ordered list of local paths to extra packages to be staged.
-    staging_location: Staging location for the packages.
-    temp_dir: Temporary folder where the resource building can happen. Caller
-      is responsible for cleaning up this folder after this function returns.
-    file_copy: Callable for copying files. The default version will copy from
-      a local file to a GCS location using the gsutil tool available in the
-      Google Cloud SDK package.
-
-  Returns:
-    A list of file names (no paths) for the resources staged. All the files
-    are assumed to be staged in staging_location.
-
-  Raises:
-    RuntimeError: If files specified are not found or do not have expected
-      name patterns.
-  """
-  resources = []
-  staging_temp_dir = None
-  local_packages = []
-  for package in extra_packages:
-    if not (os.path.basename(package).endswith('.tar') or
-            os.path.basename(package).endswith('.tar.gz') or
-            os.path.basename(package).endswith('.whl') or
-            os.path.basename(package).endswith('.zip')):
-      raise RuntimeError(
-          'The --extra_package option expects a full path ending with '
-          '".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package)
-    if os.path.basename(package).endswith('.whl'):
-      logging.warning(
-          'The .whl package "%s" is provided in --extra_package. '
-          'This functionality is not officially supported. Since wheel '
-          'packages are binary distributions, this package must be '
-          'binary-compatible with the worker environment (e.g. Python 2.7 '
-          'running on an x64 Linux host).')
-
-    if not os.path.isfile(package):
-      if package.startswith('gs://'):
-        if not staging_temp_dir:
-          staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
-        logging.info('Downloading extra package: %s locally before staging',
-                     package)
-        if os.path.isfile(staging_temp_dir):
-          local_file_path = staging_temp_dir
-        else:
-          _, last_component = FileSystems.split(package)
-          local_file_path = FileSystems.join(staging_temp_dir, last_component)
-        _dependency_file_copy(package, local_file_path)
-      else:
-        raise RuntimeError(
-            'The file %s cannot be found. It was specified in the '
-            '--extra_packages command line option.' % package)
-    else:
-      local_packages.append(package)
-
-  if staging_temp_dir:
-    local_packages.extend(
-        [FileSystems.join(staging_temp_dir, f) for f in os.listdir(
-            staging_temp_dir)])
-
-  for package in local_packages:
-    basename = os.path.basename(package)
-    staged_path = FileSystems.join(staging_location, basename)
-    file_copy(package, staged_path)
-    resources.append(basename)
-  # Create a file containing the list of extra packages and stage it.
-  # The file is important so that in the worker the packages are installed
-  # exactly in the order specified. This approach will avoid extra PyPI
-  # requests. For example if package A depends on package B and package A
-  # is installed first then the installer will try to satisfy the
-  # dependency on B by downloading the package from PyPI. If package B is
-  # installed first this is avoided.
-  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
-    for package in local_packages:
-      f.write('%s\n' % os.path.basename(package))
-  staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE)
-  # Note that the caller of this function is responsible for deleting the
-  # temporary folder where all temp files are created, including this one.
-  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
-  resources.append(EXTRA_PACKAGES_FILE)
-
-  return resources
-
-
-def _get_python_executable():
-  # Allow overriding the python executable to use for downloading and
-  # installing dependencies, otherwise use the python executable for
-  # the current process.
-  python_bin = os.environ.get('BEAM_PYTHON') or sys.executable
-  if not python_bin:
-    raise ValueError('Could not find Python executable.')
-  return python_bin
-
-
-def _populate_requirements_cache(requirements_file, cache_dir):
-  # The 'pip download' command will not download again if it finds the
-  # tarball with the proper version already present.
-  # It will get the packages downloaded in the order they are presented in
-  # the requirements file and will not download package dependencies.
-  cmd_args = [
-      _get_python_executable(), '-m', 'pip', 'download', '--dest', cache_dir,
-      '-r', requirements_file, '--exists-action', 'i',
-      # Download from PyPI source distributions.
-      '--no-binary', ':all:']
-  logging.info('Executing command: %s', cmd_args)
-  processes.check_call(cmd_args)
-
-
-def stage_job_resources(
-    options, file_copy=_dependency_file_copy, build_setup_args=None,
-    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Creates (if needed) and stages job resources to options.staging_location.
-
-  Args:
-    options: Command line options. More specifically the function will expect
-      staging_location, requirements_file, setup_file, and save_main_session
-      options to be present.
-    file_copy: Callable for copying files. The default version will copy from
-      a local file to a GCS location using the gsutil tool available in the
-      Google Cloud SDK package.
-    build_setup_args: A list of command line arguments used to build a setup
-      package. Used only if options.setup_file is not None. Used only for
-      testing.
-    temp_dir: Temporary folder where the resource building can happen. If None
-      then a unique temp directory will be created. Used only for testing.
-    populate_requirements_cache: Callable for populating the requirements cache.
-      Used only for testing.
-
-  Returns:
-    A list of file names (no paths) for the resources staged. All the files
-    are assumed to be staged in options.staging_location.
-
-  Raises:
-    RuntimeError: If files specified are not found or error encountered while
-      trying to create the resources (e.g., build a setup package).
-  """
-  temp_dir = temp_dir or tempfile.mkdtemp()
-  resources = []
-
-  google_cloud_options = options.view_as(GoogleCloudOptions)
-  setup_options = options.view_as(SetupOptions)
-  # Make sure that all required options are specified. There are a few that have
-  # defaults to support local running scenarios.
-  if google_cloud_options.staging_location is None:
-    raise RuntimeError(
-        'The --staging_location option must be specified.')
-  if google_cloud_options.temp_location is None:
-    raise RuntimeError(
-        'The --temp_location option must be specified.')
-
-  # Stage a requirements file if present.
-  if setup_options.requirements_file is not None:
-    if not os.path.isfile(setup_options.requirements_file):
-      raise RuntimeError('The file %s cannot be found. It was specified in the '
-                         '--requirements_file command line option.' %
-                         setup_options.requirements_file)
-    staged_path = FileSystems.join(google_cloud_options.staging_location,
-                                   REQUIREMENTS_FILE)
-    file_copy(setup_options.requirements_file, staged_path)
-    resources.append(REQUIREMENTS_FILE)
-    requirements_cache_path = (
-        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
-        if setup_options.requirements_cache is None
-        else setup_options.requirements_cache)
-    # Populate cache with packages from requirements and stage the files
-    # in the cache.
-    if not os.path.exists(requirements_cache_path):
-      os.makedirs(requirements_cache_path)
-    populate_requirements_cache(
-        setup_options.requirements_file, requirements_cache_path)
-    for pkg in  glob.glob(os.path.join(requirements_cache_path, '*')):
-      file_copy(pkg, FileSystems.join(google_cloud_options.staging_location,
-                                      os.path.basename(pkg)))
-      resources.append(os.path.basename(pkg))
-
-  # Handle a setup file if present.
-  # We will build the setup package locally and then copy it to the staging
-  # location because the staging location is a GCS path and the file cannot be
-  # created directly there.
-  if setup_options.setup_file is not None:
-    if not os.path.isfile(setup_options.setup_file):
-      raise RuntimeError('The file %s cannot be found. It was specified in the '
-                         '--setup_file command line option.' %
-                         setup_options.setup_file)
-    if os.path.basename(setup_options.setup_file) != 'setup.py':
-      raise RuntimeError(
-          'The --setup_file option expects the full path to a file named '
-          'setup.py instead of %s' % setup_options.setup_file)
-    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
-                                        build_setup_args)
-    staged_path = FileSystems.join(google_cloud_options.staging_location,
-                                   WORKFLOW_TARBALL_FILE)
-    file_copy(tarball_file, staged_path)
-    resources.append(WORKFLOW_TARBALL_FILE)
-
-  # Handle extra local packages that should be staged.
-  if setup_options.extra_packages is not None:
-    resources.extend(
-        _stage_extra_packages(setup_options.extra_packages,
-                              google_cloud_options.staging_location,
-                              temp_dir=temp_dir, file_copy=file_copy))
-
-  # Pickle the main session if requested.
-  # We will create the pickled main session locally and then copy it to the
-  # staging location because the staging location is a GCS path and the file
-  # cannot be created directly there.
-  if setup_options.save_main_session:
-    pickled_session_file = os.path.join(temp_dir,
-                                        names.PICKLED_MAIN_SESSION_FILE)
-    pickler.dump_session(pickled_session_file)
-    staged_path = FileSystems.join(google_cloud_options.staging_location,
-                                   names.PICKLED_MAIN_SESSION_FILE)
-    file_copy(pickled_session_file, staged_path)
-    resources.append(names.PICKLED_MAIN_SESSION_FILE)
-
-  if hasattr(setup_options, 'sdk_location'):
-    if setup_options.sdk_location == 'default':
-      stage_sdk_from_remote_location = True
-    elif (setup_options.sdk_location.startswith('gs://') or
-          setup_options.sdk_location.startswith('http://') or
-          setup_options.sdk_location.startswith('https://')):
-      stage_sdk_from_remote_location = True
-    else:
-      stage_sdk_from_remote_location = False
-
-    if stage_sdk_from_remote_location:
-      # If --sdk_location is not specified then the appropriate package
-      # will be obtained from PyPI (https://pypi.python.org) based on the
-      # version of the currently running SDK. If the option is
-      # present then no version matching is made and the exact URL or path
-      # is expected.
-      #
-      # Unit tests running in the 'python setup.py test' context will
-      # not have the sdk_location attribute present and therefore we
-      # will not stage SDK.
-      if setup_options.sdk_location == 'default':
-        sdk_remote_location = 'pypi'
-      else:
-        sdk_remote_location = setup_options.sdk_location
-      resources.extend(
-          _stage_beam_sdk(sdk_remote_location,
-                          google_cloud_options.staging_location, temp_dir))
-    else:
-      # This branch is also used by internal tests running with the SDK built
-      # at head.
-      if setup_options.sdk_location == 'default':
-        module_path = os.path.abspath(__file__)
-        sdk_path = os.path.join(
-            os.path.dirname(module_path), '..', '..', '..',
-            names.DATAFLOW_SDK_TARBALL_FILE)
-      elif os.path.isdir(setup_options.sdk_location):
-        sdk_path = os.path.join(
-            setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
-      else:
-        sdk_path = setup_options.sdk_location
-      if os.path.isfile(sdk_path):
-        logging.info('Copying Beam SDK "%s" to staging location.', sdk_path)
-        staged_path = FileSystems.join(
-            google_cloud_options.staging_location,
-            _desired_sdk_filename_in_staging_location(
-                setup_options.sdk_location))
-        file_copy(sdk_path, staged_path)
-        _, sdk_staged_filename = FileSystems.split(staged_path)
-        resources.append(sdk_staged_filename)
-      else:
-        if setup_options.sdk_location == 'default':
-          raise RuntimeError('Cannot find default Beam SDK tar file "%s"',
-                             sdk_path)
-        elif not setup_options.sdk_location:
-          logging.info('Beam SDK will not be staged since --sdk_location '
-                       'is empty.')
-        else:
-          raise RuntimeError(
-              'The file "%s" cannot be found. Its location was specified by '
-              'the --sdk_location command-line option.' %
-              sdk_path)
-
-  # Delete all temp files created while staging job resources.
-  shutil.rmtree(temp_dir)
-  return resources
-
-
-def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
-  saved_current_directory = os.getcwd()
-  try:
-    os.chdir(os.path.dirname(setup_file))
-    if build_setup_args is None:
-      build_setup_args = [
-          _get_python_executable(), os.path.basename(setup_file),
-          'sdist', '--dist-dir', temp_dir]
-    logging.info('Executing command: %s', build_setup_args)
-    processes.check_call(build_setup_args)
-    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
-    if not output_files:
-      raise RuntimeError(
-          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
-    return output_files[0]
-  finally:
-    os.chdir(saved_current_directory)
-
-
-def _desired_sdk_filename_in_staging_location(sdk_location):
-  """Returns the name that SDK file should have file in the staging location.
-
-  Args:
-    sdk_location: Full path to SDK file.
-  """
-  if sdk_location.endswith('.whl'):
-    _, wheel_filename = FileSystems.split(sdk_location)
-    if wheel_filename.startswith('apache_beam'):
-      return wheel_filename
-    else:
-      raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
-  else:
-    return names.DATAFLOW_SDK_TARBALL_FILE
-
-
-def _stage_beam_sdk(sdk_remote_location, staging_location, temp_dir):
-  """Stages a Beam SDK file with the appropriate version.
-
-  Args:
-    sdk_remote_location: A GCS path to a SDK file or a URL from which
-      the file can be downloaded. The SDK file can be a tarball or a wheel.
-      Set to 'pypi' to download and stage a wheel and source SDK from PyPi.
-    staging_location: A GCS bucket where the SDK file should be copied.
-    temp_dir: path to temporary location where the file should be downloaded.
-
-  Returns:
-    A list of SDK files that were staged to the staging location.
-
-  Raises:
-    RuntimeError: if staging was not successful.
-  """
-  if (sdk_remote_location.startswith('http://') or
-      sdk_remote_location.startswith('https://')):
-    local_download_file = _dependency_file_download(
-        sdk_remote_location, temp_dir)
-    staged_name = _desired_sdk_filename_in_staging_location(local_download_file)
-    staged_path = FileSystems.join(staging_location, staged_name)
-    logging.info(
-        'Staging Beam SDK from %s to %s',
-        sdk_remote_location, staged_path)
-    _dependency_file_copy(local_download_file, staged_path)
-    return [staged_name]
-  elif sdk_remote_location.startswith('gs://'):
-    # Stage the file to the GCS staging area.
-    staged_name = _desired_sdk_filename_in_staging_location(sdk_remote_location)
-    staged_path = FileSystems.join(staging_location, staged_name)
-    logging.info(
-        'Staging Beam SDK from %s to %s',
-        sdk_remote_location, staged_path)
-    _dependency_file_copy(sdk_remote_location, staged_path)
-    return [staged_name]
-  elif sdk_remote_location == 'pypi':
-    sdk_local_file = _download_pypi_sdk_package(temp_dir)
-    sdk_sources_staged_name = _desired_sdk_filename_in_staging_location(
-        sdk_local_file)
-    staged_path = FileSystems.join(staging_location, sdk_sources_staged_name)
-    logging.info('Staging SDK sources from PyPI to %s', staged_path)
-    _dependency_file_copy(sdk_local_file, staged_path)
-    staged_sdk_files = [sdk_sources_staged_name]
-    try:
-      # Stage binary distribution of the SDK, for now on a best-effort basis.
-      sdk_local_file = _download_pypi_sdk_package(temp_dir, fetch_binary=True)
-      sdk_binary_staged_name = _desired_sdk_filename_in_staging_location(
-          sdk_local_file)
-      staged_path = FileSystems.join(staging_location, sdk_binary_staged_name)
-      logging.info('Staging binary distribution of the SDK from PyPI to %s',
-                   staged_path)
-      _dependency_file_copy(sdk_local_file, staged_path)
-      staged_sdk_files.append(sdk_binary_staged_name)
-    except RuntimeError as e:
-      logging.warn('Failed to download requested binary distribution '
-                   'of the SDK: %s', repr(e))
-
-    return staged_sdk_files
-  else:
-    raise RuntimeError(
-        'The --sdk_location option was used with an unsupported '
-        'type of location: %s' % sdk_remote_location)
-
-
-def get_runner_harness_container_image():
-  """For internal use only; no backwards-compatibility guarantees.
-
-   Returns:
-     str: Runner harness container image that shall be used by default
-       for current SDK version or None if the runner harness container image
-       bundled with the service shall be used.
-  """
-  # Pin runner harness for released versions of the SDK.
-  if 'dev' not in beam_version.__version__:
-    return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
-            beam_version.__version__)
-  # Don't pin runner harness for dev versions so that we can notice
-  # potential incompatibility between runner and sdk harnesses.
-  return None
-
-
-def get_default_container_image_for_current_sdk(job_type):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Args:
-    job_type (str): BEAM job type.
-
-  Returns:
-    str: Google Cloud Dataflow container image for remote execution.
-  """
-  # TODO(tvalentyn): Use enumerated type instead of strings for job types.
-  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-    image_name = DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python-fnapi'
-  else:
-    image_name = DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python'
-  image_tag = _get_required_container_version(job_type)
-  return image_name + ':' + image_tag
-
-
-def _get_required_container_version(job_type=None):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Args:
-    job_type (str, optional): BEAM job type. Defaults to None.
-
-  Returns:
-    str: The tag of worker container images in GCR that corresponds to
-      current version of the SDK.
-  """
-  if 'dev' in beam_version.__version__:
-    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-      return BEAM_FNAPI_CONTAINER_VERSION
-    else:
-      return BEAM_CONTAINER_VERSION
-  else:
-    return beam_version.__version__
-
-
-def get_sdk_name_and_version():
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Returns name and version of SDK reported to Google Cloud Dataflow."""
-  try:
-    pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME)
-    return (GOOGLE_SDK_NAME, beam_version.__version__)
-  except pkg_resources.DistributionNotFound:
-    return (BEAM_SDK_NAME, beam_version.__version__)
-
-
-def get_sdk_package_name():
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Returns the PyPI package name to be staged to Google Cloud Dataflow."""
-  sdk_name, _ = get_sdk_name_and_version()
-  if sdk_name == GOOGLE_SDK_NAME:
-    return GOOGLE_PACKAGE_NAME
-  else:
-    return BEAM_PACKAGE_NAME
-
-
-def _download_pypi_sdk_package(temp_dir, fetch_binary=False,
-                               language_version_tag='27',
-                               language_implementation_tag='cp',
-                               abi_tag='cp27mu',
-                               platform_tag='manylinux1_x86_64'):
-  """Downloads SDK package from PyPI and returns path to local path."""
-  package_name = get_sdk_package_name()
-  try:
-    version = pkg_resources.get_distribution(package_name).version
-  except pkg_resources.DistributionNotFound:
-    raise RuntimeError('Please set --sdk_location command-line option '
-                       'or install a valid {} distribution.'
-                       .format(package_name))
-  cmd_args = [
-      _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir,
-      '%s==%s' % (package_name, version), '--no-deps']
-
-  if fetch_binary:
-    logging.info('Downloading binary distribtution of the SDK from PyPi')
-    # Get a wheel distribution for the SDK from PyPI.
-    cmd_args.extend([
-        '--only-binary', ':all:', '--python-version', language_version_tag,
-        '--implementation', language_implementation_tag, '--abi', abi_tag,
-        '--platform', platform_tag])
-    # Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl
-    expected_files = [
-        os.path.join(
-            temp_dir,
-            '%s-%s-%s%s-%s-%s.whl' % (package_name.replace('-', '_'), version,
-                                      language_implementation_tag,
-                                      language_version_tag, abi_tag,
-                                      platform_tag))]
-  else:
-    logging.info('Downloading source distribtution of the SDK from PyPi')
-    cmd_args.extend(['--no-binary', ':all:'])
-    expected_files = [
-        os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)),
-        os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version))
-    ]
-
-  logging.info('Executing command: %s', cmd_args)
-  try:
-    processes.check_call(cmd_args)
-  except subprocess.CalledProcessError as e:
-    raise RuntimeError(repr(e))
-
-  for sdk_file in expected_files:
-    if os.path.exists(sdk_file):
-      return sdk_file
-
-  raise RuntimeError(
-      'Failed to download a distribution for the running SDK. '
-      'Expected either one of %s to be found in the download folder.' % (
-          expected_files))
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index c9a4e6c5c99..7e0b81e2130 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -17,6 +17,9 @@
 
 """Various names for properties, transforms, etc."""
 
+# All constants are for internal use only; no backwards-compatibility
+# guarantees.
+
 # TODO (altay): Move shared names to a common location.
 # Standard file names used for staging files.
 PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
@@ -29,6 +32,27 @@
 SOURCE_TYPE = 'CustomSourcesType'
 SERIALIZED_SOURCE_KEY = 'serialized_source'
 
+# In a released SDK, container tags are selected based on the SDK version.
+# Unreleased versions use container versions based on values of
+# BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION (see below).
+
+# Update this version to the next version whenever there is a change that will
+# require changes to legacy Dataflow worker execution environment.
+BEAM_CONTAINER_VERSION = 'beam-master-20180518'
+# Update this version to the next version whenever there is a change that
+# requires changes to SDK harness container or SDK harness launcher.
+BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20180413'
+
+# Package names for different distributions
+GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
+BEAM_PACKAGE_NAME = 'apache-beam'
+
+# SDK identifiers for different distributions
+GOOGLE_SDK_NAME = 'Google Cloud Dataflow SDK for Python'
+BEAM_SDK_NAME = 'Apache Beam SDK for Python'
+
+DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'dataflow.gcr.io/v1beta3'
+
 
 class TransformNames(object):
   """For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
new file mode 100644
index 00000000000..cf2f0ea2acf
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -0,0 +1,557 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Support for installing custom code and required dependencies.
+
+Workflows, with the exception of very simple ones, are organized in multiple
+modules and packages. Typically, these modules and packages have
+dependencies on other standard libraries. Beam relies on the Python
+setuptools package to handle these scenarios. For further details please read:
+https://pythonhosted.org/an_example_pypi_project/setuptools.html
+
+When a runner tries to run a pipeline it will check for a --requirements_file
+and a --setup_file option.
+
+If --setup_file is present then it is assumed that the folder containing the
+file specified by the option has the typical layout required by setuptools and
+it will run 'python setup.py sdist' to produce a source distribution. The
+resulting tarball (a .tar or .tar.gz file) will be staged at the staging
+location specified as job option. When a worker starts it will check for the
+presence of this file and will run 'easy_install tarball' to install the
+package in the worker.
+
+If --requirements_file is present then the file specified by the option will be
+staged in the staging location.  When a worker starts it will check for the
+presence of this file and will run 'pip install -r requirements.txt'. A
+requirements file can be easily generated by running 'pip freeze -r
+requirements.txt'. The reason a runner does not run this automatically is
+because quite often only a small fraction of the dependencies present in a
+requirements.txt file are actually needed for remote execution and therefore a
+one-time manual trimming is desirable.
+
+TODO(silviuc): Should we allow several setup packages?
+TODO(silviuc): We should allow customizing the exact command for setup build.
+"""
+import glob
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+import pkg_resources
+
+from apache_beam.internal import pickler
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_options import SetupOptions
+# TODO(angoenka): Remove reference to dataflow internal names
+from apache_beam.runners.dataflow.internal import names
+from apache_beam.utils import processes
+
+# All constants are for internal use only; no backwards-compatibility
+# guarantees.
+
+# Standard file names used for staging files.
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
+REQUIREMENTS_FILE = 'requirements.txt'
+EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+
+# Package names for distributions
+BEAM_PACKAGE_NAME = 'apache-beam'
+
+
+class Stager(object):
+  """Abstract Stager identifies and copies the appropriate artifacts to the
+  staging location.
+  Implementation of this stager has to implement :func:`stage_artifact` and
+  :func:`commit_manifest`.
+  """
+
+  def stage_artifact(self, local_path_to_artifact, artifact_name):
+    """ Stages the artifact to Stager._staging_location and adds artifact_name
+        to the manifest of artifacts that have been staged."""
+    raise NotImplementedError
+
+  def commit_manifest(self):
+    """Commits manifest."""
+    raise NotImplementedError
+
+  @staticmethod
+  def get_sdk_package_name():
+    """For internal use only; no backwards-compatibility guarantees.
+        Returns the PyPI package name to be staged."""
+    return BEAM_PACKAGE_NAME
+
+  def stage_job_resources(self,
+                          options,
+                          build_setup_args=None,
+                          temp_dir=None,
+                          populate_requirements_cache=None,
+                          staging_location=None):
+    """For internal use only; no backwards-compatibility guarantees.
+
+        Creates (if needed) and stages job resources to staging_location.
+
+        Args:
+          options: Command line options. More specifically the function will
+            expect requirements_file, setup_file, and save_main_session options
+            to be present.
+          build_setup_args: A list of command line arguments used to build a
+            setup package. Used only if options.setup_file is not None. Used
+            only for testing.
+          temp_dir: Temporary folder where the resource building can happen. If
+            None then a unique temp directory will be created. Used only for
+            testing.
+          populate_requirements_cache: Callable for populating the requirements
+            cache. Used only for testing.
+          staging_location: Location to stage the file.
+
+        Returns:
+          A list of file names (no paths) for the resources staged. All the
+          files
+          are assumed to be staged at staging_location.
+
+        Raises:
+          RuntimeError: If files specified are not found or error encountered
+          while trying to create the resources (e.g., build a setup package).
+        """
+    temp_dir = temp_dir or tempfile.mkdtemp()
+    resources = []
+
+    setup_options = options.view_as(SetupOptions)
+    # Make sure that all required options are specified.
+    if staging_location is None:
+      raise RuntimeError('The staging_location must be specified.')
+
+    # Stage a requirements file if present.
+    if setup_options.requirements_file is not None:
+      if not os.path.isfile(setup_options.requirements_file):
+        raise RuntimeError(
+            'The file %s cannot be found. It was specified in the '
+            '--requirements_file command line option.' %
+            setup_options.requirements_file)
+      staged_path = FileSystems.join(staging_location, REQUIREMENTS_FILE)
+      self.stage_artifact(setup_options.requirements_file, staged_path)
+      resources.append(REQUIREMENTS_FILE)
+      requirements_cache_path = (
+          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
+          if setup_options.requirements_cache is None else
+          setup_options.requirements_cache)
+      # Populate cache with packages from requirements and stage the files
+      # in the cache.
+      if not os.path.exists(requirements_cache_path):
+        os.makedirs(requirements_cache_path)
+      (populate_requirements_cache if populate_requirements_cache else
+       Stager._populate_requirements_cache)(setup_options.requirements_file,
+                                            requirements_cache_path)
+      for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
+        self.stage_artifact(
+            pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
+        resources.append(os.path.basename(pkg))
+
+    # Handle a setup file if present.
+    # We will build the setup package locally and then copy it to the staging
+    # location because the staging location is a remote path and the file cannot
+    # be created directly there.
+    if setup_options.setup_file is not None:
+      if not os.path.isfile(setup_options.setup_file):
+        raise RuntimeError(
+            'The file %s cannot be found. It was specified in the '
+            '--setup_file command line option.' % setup_options.setup_file)
+      if os.path.basename(setup_options.setup_file) != 'setup.py':
+        raise RuntimeError(
+            'The --setup_file option expects the full path to a file named '
+            'setup.py instead of %s' % setup_options.setup_file)
+      tarball_file = Stager._build_setup_package(setup_options.setup_file,
+                                                 temp_dir, build_setup_args)
+      staged_path = FileSystems.join(staging_location, WORKFLOW_TARBALL_FILE)
+      self.stage_artifact(tarball_file, staged_path)
+      resources.append(WORKFLOW_TARBALL_FILE)
+
+    # Handle extra local packages that should be staged.
+    if setup_options.extra_packages is not None:
+      resources.extend(
+          self._stage_extra_packages(
+              setup_options.extra_packages, staging_location,
+              temp_dir=temp_dir))
+
+    # Pickle the main session if requested.
+    # We will create the pickled main session locally and then copy it to the
+    # staging location because the staging location is a remote path and the
+    # file cannot be created directly there.
+    if setup_options.save_main_session:
+      pickled_session_file = os.path.join(temp_dir,
+                                          names.PICKLED_MAIN_SESSION_FILE)
+      pickler.dump_session(pickled_session_file)
+      staged_path = FileSystems.join(staging_location,
+                                     names.PICKLED_MAIN_SESSION_FILE)
+      self.stage_artifact(pickled_session_file, staged_path)
+      resources.append(names.PICKLED_MAIN_SESSION_FILE)
+
+    if hasattr(setup_options, 'sdk_location'):
+
+      if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
+          setup_options.sdk_location):
+        # If --sdk_location is not specified then the appropriate package
+        # will be obtained from PyPI (https://pypi.python.org) based on the
+        # version of the currently running SDK. If the option is
+        # present then no version matching is made and the exact URL or path
+        # is expected.
+        #
+        # Unit tests running in the 'python setup.py test' context will
+        # not have the sdk_location attribute present and therefore we
+        # will not stage SDK.
+        sdk_remote_location = 'pypi' if (setup_options.sdk_location == 'default'
+                                        ) else setup_options.sdk_location
+        resources.extend(
+            self._stage_beam_sdk(sdk_remote_location, staging_location,
+                                 temp_dir))
+      else:
+        # This branch is also used by internal tests running with the SDK built
+        # at head.
+        if os.path.isdir(setup_options.sdk_location):
+          # TODO(angoenka): remove reference to Dataflow
+          sdk_path = os.path.join(setup_options.sdk_location,
+                                  names.DATAFLOW_SDK_TARBALL_FILE)
+        else:
+          sdk_path = setup_options.sdk_location
+
+        if os.path.isfile(sdk_path):
+          logging.info('Copying Beam SDK "%s" to staging location.', sdk_path)
+          staged_path = FileSystems.join(
+              staging_location,
+              Stager._desired_sdk_filename_in_staging_location(
+                  setup_options.sdk_location))
+          self.stage_artifact(sdk_path, staged_path)
+          _, sdk_staged_filename = FileSystems.split(staged_path)
+          resources.append(sdk_staged_filename)
+        else:
+          if setup_options.sdk_location == 'default':
+            raise RuntimeError('Cannot find default Beam SDK tar file "%s"' %
+                               sdk_path)
+          elif not setup_options.sdk_location:
+            logging.info('Beam SDK will not be staged since --sdk_location '
+                         'is empty.')
+          else:
+            raise RuntimeError(
+                'The file "%s" cannot be found. Its location was specified by '
+                'the --sdk_location command-line option.' % sdk_path)
+
+    # Delete all temp files created while staging job resources.
+    shutil.rmtree(temp_dir)
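+    # Let the concrete Stager finalize staging (e.g. commit its manifest).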
+    self.commit_manifest()
+    return resources
+
+  @staticmethod
+  def _download_file(from_url, to_path):
+    """Downloads a file over http/https from a url or copies it from a remote
+        path to a local path."""
+    if from_url.startswith('http://') or from_url.startswith('https://'):
+      # TODO(silviuc): We should cache downloads so we do not do it for every
+      # job.
+      try:
+        # Check the response status because the request returns a body even
+        # for a 404 response; without the check, the downloaded file would
+        # contain the contents of the 404 response.
+        # TODO(angoenka): Extract and use the filename when downloading file.
+        response, content = __import__('httplib2').Http().request(from_url)
+        if int(response['status']) >= 400:
+          raise RuntimeError(
+              'Artifact not found at %s (response: %s)' % (from_url, response))
+        with open(to_path, 'w') as f:
+          f.write(content)
+      except Exception:
+        logging.info('Failed to download Artifact from %s', from_url)
+        raise
+    else:
+      if not os.path.isdir(os.path.dirname(to_path)):
+        logging.info(
+            'Creating folder %s (it does not exist yet; any errors will '
+            'surface below).', os.path.dirname(to_path))
+        os.mkdir(os.path.dirname(to_path))
+      shutil.copyfile(from_url, to_path)
+
+  @staticmethod
+  def _is_remote_path(path):
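+    # Any path containing a scheme separator ('://') is treated as remote.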
+    return path.find('://') != -1
+
+  def _stage_extra_packages(self, extra_packages, staging_location, temp_dir):
+    """Stages a list of local extra packages.
+
+      Args:
+        extra_packages: Ordered list of local paths to extra packages to be
+          staged. Only packages on the local file system and GCS are
+          supported.
+        staging_location: Staging location for the packages.
+        temp_dir: Temporary folder where the resource building can happen.
+          Caller is responsible for cleaning up this folder after this function
+          returns.
+
+      Returns:
+        A list of file names (no paths) for the resources staged. All the files
+        are assumed to be staged in staging_location.
+
+      Raises:
+        RuntimeError: If files specified are not found or do not have expected
+          name patterns.
+      """
+    resources = []
+    staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
+    local_packages = []
+    for package in extra_packages:
+      if not (os.path.basename(package).endswith('.tar') or
+              os.path.basename(package).endswith('.tar.gz') or
+              os.path.basename(package).endswith('.whl') or
+              os.path.basename(package).endswith('.zip')):
+        raise RuntimeError(
+            'The --extra_package option expects a full path ending with '
+            '".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package)
+      if os.path.basename(package).endswith('.whl'):
+        logging.warning(
+            'The .whl package "%s" is provided in --extra_package. '
+            'This functionality is not officially supported. Since wheel '
+            'packages are binary distributions, this package must be '
+            'binary-compatible with the worker environment (e.g. Python 2.7 '
+            'running on an x64 Linux host).', package)
+
+      if not os.path.isfile(package):
+        if Stager._is_remote_path(package):
+          # Download remote package.
+          logging.info('Downloading extra package: %s locally before staging',
+                       package)
+          _, last_component = FileSystems.split(package)
+          local_file_path = FileSystems.join(staging_temp_dir, last_component)
+          Stager._download_file(package, local_file_path)
+        else:
+          raise RuntimeError(
+              'The file %s cannot be found. It was specified in the '
+              '--extra_packages command line option.' % package)
+      else:
+        local_packages.append(package)
+
+    local_packages.extend([
+        FileSystems.join(staging_temp_dir, f)
+        for f in os.listdir(staging_temp_dir)
+    ])
+
+    for package in local_packages:
+      basename = os.path.basename(package)
+      staged_path = FileSystems.join(staging_location, basename)
+      self.stage_artifact(package, staged_path)
+      resources.append(basename)
+    # Create a file containing the list of extra packages and stage it.
+    # The file is important so that in the worker the packages are installed
+    # exactly in the order specified. This approach will avoid extra PyPI
+    # requests. For example if package A depends on package B and package A
+    # is installed first then the installer will try to satisfy the
+    # dependency on B by downloading the package from PyPI. If package B is
+    # installed first this is avoided.
+    with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
+      for package in local_packages:
+        f.write('%s\n' % os.path.basename(package))
+    staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE)
+    # Note that the caller of this function is responsible for deleting the
+    # temporary folder where all temp files are created, including this one.
+    self.stage_artifact(
+        os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
+    resources.append(EXTRA_PACKAGES_FILE)
+
+    return resources
+
+  @staticmethod
+  def _get_python_executable():
+    # Allow overriding the python executable to use for downloading and
+    # installing dependencies, otherwise use the python executable for
+    # the current process.
+    python_bin = os.environ.get('BEAM_PYTHON') or sys.executable
+    if not python_bin:
+      raise ValueError('Could not find Python executable.')
+    return python_bin
+
+  @staticmethod
+  def _populate_requirements_cache(requirements_file, cache_dir):
+    # The 'pip download' command will not download again if it finds the
+    # tarball with the proper version already present.
+    # It will get the packages downloaded in the order they are presented in
+    # the requirements file and will not download package dependencies.
+    cmd_args = [
+        Stager._get_python_executable(),
+        '-m',
+        'pip',
+        'download',
+        '--dest',
+        cache_dir,
+        '-r',
+        requirements_file,
+        '--exists-action',
+        'i',
+        # Download from PyPI source distributions.
+        '--no-binary',
+        ':all:'
+    ]
+    logging.info('Executing command: %s', cmd_args)
+    processes.check_call(cmd_args)
+
+  @staticmethod
+  def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
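+    # Runs the user's setup.py (or the supplied build command) to produce a
+    # source tarball in temp_dir and returns the path to that tarball.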
+    saved_current_directory = os.getcwd()
+    try:
+      os.chdir(os.path.dirname(setup_file))
+      if build_setup_args is None:
+        build_setup_args = [
+            Stager._get_python_executable(),
+            os.path.basename(setup_file), 'sdist', '--dist-dir', temp_dir
+        ]
+      logging.info('Executing command: %s', build_setup_args)
+      processes.check_call(build_setup_args)
+      output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
+      if not output_files:
+        raise RuntimeError(
+            'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
+      return output_files[0]
+    finally:
+      os.chdir(saved_current_directory)
+
+  @staticmethod
+  def _desired_sdk_filename_in_staging_location(sdk_location):
+    """Returns the name that SDK file should have in the staging location.
+      Args:
+        sdk_location: Full path to SDK file.
+      """
+    if sdk_location.endswith('.whl'):
+      _, wheel_filename = FileSystems.split(sdk_location)
+      if wheel_filename.startswith('apache_beam'):
+        return wheel_filename
+      else:
+        raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
+    else:
+      return names.DATAFLOW_SDK_TARBALL_FILE
+
+  def _stage_beam_sdk(self, sdk_remote_location, staging_location, temp_dir):
+    """Stages a Beam SDK file with the appropriate version.
+
+      Args:
+        sdk_remote_location: A URL from which the file can be downloaded or a
+          remote file location. The SDK file can be a tarball or a wheel. Set
+          to 'pypi' to download and stage a wheel and source SDK from PyPI.
+        staging_location: Location where the SDK file should be copied.
+        temp_dir: path to temporary location where the file should be
+          downloaded.
+
+      Returns:
+        A list of SDK files that were staged to the staging location.
+
+      Raises:
+        RuntimeError: if staging was not successful.
+      """
+    if sdk_remote_location == 'pypi':
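+      # Stage the SDK source distribution and, best-effort, a binary wheel.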
+      sdk_local_file = Stager._download_pypi_sdk_package(temp_dir)
+      sdk_sources_staged_name = Stager.\
+          _desired_sdk_filename_in_staging_location(sdk_local_file)
+      staged_path = FileSystems.join(staging_location, sdk_sources_staged_name)
+      logging.info('Staging SDK sources from PyPI to %s', staged_path)
+      self.stage_artifact(sdk_local_file, staged_path)
+      staged_sdk_files = [sdk_sources_staged_name]
+      try:
+        # Stage binary distribution of the SDK, for now on a best-effort basis.
+        sdk_local_file = Stager._download_pypi_sdk_package(
+            temp_dir, fetch_binary=True)
+        sdk_binary_staged_name = Stager.\
+            _desired_sdk_filename_in_staging_location(sdk_local_file)
+        staged_path = FileSystems.join(staging_location, sdk_binary_staged_name)
+        logging.info('Staging binary distribution of the SDK from PyPI to %s',
+                     staged_path)
+        self.stage_artifact(sdk_local_file, staged_path)
+        staged_sdk_files.append(sdk_binary_staged_name)
+      except RuntimeError as e:
+        logging.warn(
+            'Failed to download requested binary distribution '
+            'of the SDK: %s', repr(e))
+
+      return staged_sdk_files
+    elif Stager._is_remote_path(sdk_remote_location):
+      local_download_file = os.path.join(temp_dir, 'beam-sdk.tar.gz')
+      Stager._download_file(sdk_remote_location, local_download_file)
+      staged_name = Stager._desired_sdk_filename_in_staging_location(
+          sdk_remote_location)
+      staged_path = FileSystems.join(staging_location, staged_name)
+      logging.info('Staging Beam SDK from %s to %s', sdk_remote_location,
+                   staged_path)
+      self.stage_artifact(local_download_file, staged_path)
+      return [staged_name]
+    else:
+      raise RuntimeError(
+          'The --sdk_location option was used with an unsupported '
+          'type of location: %s' % sdk_remote_location)
+
+  @staticmethod
+  def _download_pypi_sdk_package(temp_dir,
+                                 fetch_binary=False,
+                                 language_version_tag='27',
+                                 language_implementation_tag='cp',
+                                 abi_tag='cp27mu',
+                                 platform_tag='manylinux1_x86_64'):
+    """Downloads the SDK package from PyPI and returns the local file path."""
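+    # The default tags select the Python 2.7 cp27mu manylinux1 wheel.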
+    package_name = Stager.get_sdk_package_name()
+    try:
+      version = pkg_resources.get_distribution(package_name).version
+    except pkg_resources.DistributionNotFound:
+      raise RuntimeError('Please set --sdk_location command-line option '
+                         'or install a valid {} distribution.'
+                         .format(package_name))
+    cmd_args = [
+        Stager._get_python_executable(), '-m', 'pip', 'download', '--dest',
+        temp_dir,
+        '%s==%s' % (package_name, version), '--no-deps'
+    ]
+
+    if fetch_binary:
+      logging.info('Downloading binary distribution of the SDK from PyPI')
+      # Get a wheel distribution for the SDK from PyPI.
+      cmd_args.extend([
+          '--only-binary', ':all:', '--python-version', language_version_tag,
+          '--implementation', language_implementation_tag, '--abi', abi_tag,
+          '--platform', platform_tag
+      ])
+      # Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl
+      expected_files = [
+          os.path.join(
+              temp_dir, '%s-%s-%s%s-%s-%s.whl' % (package_name.replace(
+                  '-', '_'), version, language_implementation_tag,
+                                                  language_version_tag, abi_tag,
+                                                  platform_tag))
+      ]
+    else:
+      logging.info('Downloading source distribution of the SDK from PyPI')
+      cmd_args.extend(['--no-binary', ':all:'])
+      expected_files = [
+          os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)),
+          os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version))
+      ]
+
+    logging.info('Executing command: %s', cmd_args)
+    try:
+      processes.check_call(cmd_args)
+    except subprocess.CalledProcessError as e:
+      raise RuntimeError(repr(e))
+
+    for sdk_file in expected_files:
+      if os.path.exists(sdk_file):
+        return sdk_file
+
+    raise RuntimeError(
+        'Failed to download a distribution for the running SDK. '
+        'Expected one of %s to be found in the download folder.' %
+        (expected_files))
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py
similarity index 56%
rename from sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
rename to sdks/python/apache_beam/runners/portability/stager_test.py
index a3182a28b9b..56b57d16796 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -14,8 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-"""Unit tests for the setup module."""
+"""Unit tests for the stager module."""
 
 import logging
 import os
@@ -26,42 +25,31 @@
 import mock
 
 from apache_beam.io.filesystems import FileSystems
-from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
-from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal import names
-
-# Protect against environments where GCS library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apitools.base.py.exceptions import HttpError
-except ImportError:
-  HttpError = None
-# pylint: enable=wrong-import-order, wrong-import-position
+from apache_beam.runners.portability import stager
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class SetupTest(unittest.TestCase):
+class StagerTest(unittest.TestCase):
 
   def setUp(self):
     self._temp_dir = None
+    self.stager = TestStager()
+
+  def tearDown(self):
+    if self._temp_dir:
+      shutil.rmtree(self._temp_dir)
+    self.stager = None
 
   def make_temp_dir(self):
     if self._temp_dir is None:
       self._temp_dir = tempfile.mkdtemp()
     return tempfile.mkdtemp(dir=self._temp_dir)
 
-  def tearDown(self):
-    if self._temp_dir:
-      shutil.rmtree(self._temp_dir)
-
   def update_options(self, options):
     setup_options = options.view_as(SetupOptions)
     setup_options.sdk_location = ''
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    if google_cloud_options.temp_location is None:
-      google_cloud_options.temp_location = google_cloud_options.staging_location
 
   def create_temp_file(self, path, contents):
     with open(path, 'w') as f:
@@ -73,47 +61,90 @@ def populate_requirements_cache(self, requirements_file, cache_dir):
     self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
     self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
 
-  def test_no_staging_location(self):
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(PipelineOptions())
-    self.assertEqual('The --staging_location option must be specified.',
-                     cm.exception.args[0])
+  def build_fake_pip_download_command_handler(self, has_wheels):
+    """A stub for apache_beam.utils.processes.check_call that imitates pip.
 
-  def test_no_temp_location(self):
-    staging_dir = self.make_temp_dir()
-    options = PipelineOptions()
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    google_cloud_options.staging_location = staging_dir
-    self.update_options(options)
-    google_cloud_options.temp_location = None
+      Args:
+        has_wheels: Whether pip fake should have a whl distribution of packages.
+      """
+
+    def pip_fake(args):
+      """Fakes fetching a package from pip by creating a temporary file.
+
+          Args:
+            args: a complete list of command line arguments to invoke pip.
+              The fake is sensitive to the order of the arguments.
+              Supported commands:
+
+              1) Download SDK sources file:
+              python pip -m download --dest /tmp/dir apache-beam==2.0.0 \
+                  --no-deps --no-binary :all:
+
+              2) Download SDK binary wheel file:
+              python pip -m download --dest /tmp/dir apache-beam==2.0.0 \
+                  --no-deps --no-binary :all: --python-version 27 \
+                  --implementation cp --abi cp27mu --platform manylinux1_x86_64
+          """
+      package_file = None
+      if len(args) >= 8:
+        # package_name==x.y.z
+        if '==' in args[6]:
+          distribution_name = args[6][0:args[6].find('==')]
+          distribution_version = args[6][args[6].find('==') + 2:]
+
+          if args[8] == '--no-binary':
+            package_file = '%s-%s.zip' % (distribution_name,
+                                          distribution_version)
+          elif args[8] == '--only-binary' and len(args) >= 18:
+            if not has_wheels:
+              # Imitate the case when desired wheel distribution is not in PyPI.
+              raise RuntimeError('No matching distribution.')
+
+            # Per PEP-0427 in wheel filenames non-alphanumeric characters
+            # in distribution name are replaced with underscore.
+            distribution_name = distribution_name.replace('-', '_')
+            package_file = '%s-%s-%s%s-%s-%s.whl' % (
+                distribution_name,
+                distribution_version,
+                args[13],  # implementation
+                args[11],  # python version
+                args[15],  # abi tag
+                args[17]  # platform
+            )
+
+      assert package_file, 'Pip fake does not support the command: ' + str(args)
+      self.create_temp_file(
+          FileSystems.join(args[5], package_file), 'Package content.')
+
+    return pip_fake
+
+  def test_no_staging_location(self):
     with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertEqual('The --temp_location option must be specified.',
+      self.stager.stage_job_resources(PipelineOptions(), staging_location=None)
+    self.assertEqual('The staging_location must be specified.',
                      cm.exception.args[0])
 
   def test_no_main_session(self):
     staging_dir = self.make_temp_dir()
     options = PipelineOptions()
 
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     options.view_as(SetupOptions).save_main_session = False
     self.update_options(options)
 
-    self.assertEqual(
-        [],
-        dependency.stage_job_resources(options))
+    self.assertEqual([],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
 
   def test_with_main_session(self):
     staging_dir = self.make_temp_dir()
     options = PipelineOptions()
 
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     options.view_as(SetupOptions).save_main_session = True
     self.update_options(options)
 
-    self.assertEqual(
-        [names.PICKLED_MAIN_SESSION_FILE],
-        dependency.stage_job_resources(options))
+    self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
     self.assertTrue(
         os.path.isfile(
             os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
@@ -121,12 +152,11 @@ def test_with_main_session(self):
   def test_default_resources(self):
     staging_dir = self.make_temp_dir()
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
 
-    self.assertEqual(
-        [],
-        dependency.stage_job_resources(options))
+    self.assertEqual([],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
 
   def test_with_requirements_file(self):
     staging_dir = self.make_temp_dir()
@@ -134,22 +164,21 @@ def test_with_requirements_file(self):
     source_dir = self.make_temp_dir()
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
     options.view_as(SetupOptions).requirements_file = os.path.join(
-        source_dir, dependency.REQUIREMENTS_FILE)
+        source_dir, stager.REQUIREMENTS_FILE)
     self.create_temp_file(
-        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+        os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
     self.assertEqual(
-        sorted([dependency.REQUIREMENTS_FILE,
-                'abc.txt', 'def.txt']),
-        sorted(dependency.stage_job_resources(
-            options,
-            populate_requirements_cache=self.populate_requirements_cache)))
+        sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']),
+        sorted(
+            self.stager.stage_job_resources(
+                options,
+                populate_requirements_cache=self.populate_requirements_cache,
+                staging_location=staging_dir)))
     self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+        os.path.isfile(os.path.join(staging_dir, stager.REQUIREMENTS_FILE)))
     self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
     self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
 
@@ -157,11 +186,12 @@ def test_requirements_file_not_present(self):
     staging_dir = self.make_temp_dir()
     with self.assertRaises(RuntimeError) as cm:
       options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
       self.update_options(options)
       options.view_as(SetupOptions).requirements_file = 'nosuchfile'
-      dependency.stage_job_resources(
-          options, populate_requirements_cache=self.populate_requirements_cache)
+      self.stager.stage_job_resources(
+          options,
+          populate_requirements_cache=self.populate_requirements_cache,
+          staging_location=staging_dir)
     self.assertEqual(
         cm.exception.args[0],
         'The file %s cannot be found. It was specified in the '
@@ -172,40 +202,37 @@ def test_with_requirements_file_and_cache(self):
     source_dir = self.make_temp_dir()
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).requirements_file = os.path.join(
-        source_dir, dependency.REQUIREMENTS_FILE)
+        source_dir, stager.REQUIREMENTS_FILE)
     options.view_as(SetupOptions).requirements_cache = self.make_temp_dir()
     self.create_temp_file(
-        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+        os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
     self.assertEqual(
-        sorted([dependency.REQUIREMENTS_FILE,
-                'abc.txt', 'def.txt']),
-        sorted(dependency.stage_job_resources(
-            options,
-            populate_requirements_cache=self.populate_requirements_cache)))
+        sorted([stager.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']),
+        sorted(
+            self.stager.stage_job_resources(
+                options,
+                populate_requirements_cache=self.populate_requirements_cache,
+                staging_location=staging_dir)))
     self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+        os.path.isfile(os.path.join(staging_dir, stager.REQUIREMENTS_FILE)))
     self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
     self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
 
   def test_with_setup_file(self):
     staging_dir = self.make_temp_dir()
     source_dir = self.make_temp_dir()
-    self.create_temp_file(
-        os.path.join(source_dir, 'setup.py'), 'notused')
+    self.create_temp_file(os.path.join(source_dir, 'setup.py'), 'notused')
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).setup_file = os.path.join(
         source_dir, 'setup.py')
 
     self.assertEqual(
-        [dependency.WORKFLOW_TARBALL_FILE],
-        dependency.stage_job_resources(
+        [stager.WORKFLOW_TARBALL_FILE],
+        self.stager.stage_job_resources(
             options,
             # We replace the build setup command because a realistic one would
             # require the setuptools package to be installed. Note that we can't
@@ -214,22 +241,22 @@ def test_with_setup_file(self):
             # equivalent behavior.
             build_setup_args=[
                 'python', '-c', 'open(__import__("sys").argv[1], "a")',
-                os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
-            temp_dir=source_dir))
+                os.path.join(source_dir, stager.WORKFLOW_TARBALL_FILE)
+            ],
+            temp_dir=source_dir,
+            staging_location=staging_dir))
     self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
+        os.path.isfile(os.path.join(staging_dir, stager.WORKFLOW_TARBALL_FILE)))
 
   def test_setup_file_not_present(self):
     staging_dir = self.make_temp_dir()
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).setup_file = 'nosuchfile'
 
     with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
+      self.stager.stage_job_resources(options, staging_location=staging_dir)
     self.assertEqual(
         cm.exception.args[0],
         'The file %s cannot be found. It was specified in the '
@@ -240,110 +267,46 @@ def test_setup_file_not_named_setup_dot_py(self):
     source_dir = self.make_temp_dir()
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).setup_file = (
         os.path.join(source_dir, 'xyz-setup.py'))
 
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz-setup.py'), 'notused')
+    self.create_temp_file(os.path.join(source_dir, 'xyz-setup.py'), 'notused')
     with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertTrue(
-        cm.exception.args[0].startswith(
-            'The --setup_file option expects the full path to a file named '
-            'setup.py instead of '))
-
-  def build_fake_pip_download_command_handler(self, has_wheels):
-    """A stub for apache_beam.utils.processes.check_call that imitates pip.
-
-    Args:
-      has_wheels: Whether pip fake should have a whl distribution of packages.
-    """
-
-    def pip_fake(args):
-      """Fakes fetching a package from pip by creating a temporary file.
-
-      Args:
-        args: a complete list of command line arguments to invoke pip.
-          The fake is sensitive to the order of the arguments.
-          Supported commands:
-
-          1) Download SDK sources file:
-          python pip -m download --dest /tmp/dir apache-beam==2.0.0 \
-              --no-deps --no-binary :all:
-
-          2) Download SDK binary wheel file:
-          python pip -m download --dest /tmp/dir apache-beam==2.0.0 \
-              --no-deps --no-binary :all: --python-version 27 \
-              --implementation cp --abi cp27mu --platform manylinux1_x86_64
-      """
-      package_file = None
-      if len(args) >= 8:
-        # package_name==x.y.z
-        if '==' in args[6]:
-          distribution_name = args[6][0:args[6].find('==')]
-          distribution_version = args[6][args[6].find('==')+2:]
-
-          if args[8] == '--no-binary':
-            package_file = '%s-%s.zip' % (
-                distribution_name, distribution_version)
-          elif args[8] == '--only-binary' and len(args) >= 18:
-            if not has_wheels:
-              # Imitate the case when desired wheel distribution is not in PyPI.
-              raise RuntimeError('No matching distribution.')
-
-            # Per PEP-0427 in wheel filenames non-alphanumeric characters
-            # in distribution name are replaced with underscore.
-            distribution_name = distribution_name.replace('-', '_')
-            package_file = '%s-%s-%s%s-%s-%s.whl' % (
-                distribution_name, distribution_version,
-                args[13],  # implementation
-                args[11],  # python version
-                args[15],  # abi tag
-                args[17]   # platform
-            )
-
-      assert package_file, 'Pip fake does not support the command: ' + str(args)
-      self.create_temp_file(
-          FileSystems.join(args[5], package_file), 'Package content.')
-
-    return pip_fake
+      self.stager.stage_job_resources(options, staging_location=staging_dir)
+    self.assertTrue(cm.exception.args[0].startswith(
+        'The --setup_file option expects the full path to a file named '
+        'setup.py instead of '))
 
   def test_sdk_location_default(self):
     staging_dir = self.make_temp_dir()
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = 'default'
 
-    with mock.patch('apache_beam.utils.processes.check_call',
-                    self.build_fake_pip_download_command_handler(
-                        has_wheels=False)):
-      staged_resources = dependency.stage_job_resources(
-          options, temp_dir=self.make_temp_dir())
+    with mock.patch(
+        'apache_beam.utils.processes.check_call',
+        self.build_fake_pip_download_command_handler(has_wheels=False)):
+      staged_resources = self.stager.stage_job_resources(
+          options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
 
-    self.assertEqual(
-        [names.DATAFLOW_SDK_TARBALL_FILE], staged_resources)
+    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE], staged_resources)
 
-    with open(os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)) as f:
+    with open(os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)) as f:
       self.assertEqual(f.read(), 'Package content.')
 
   def test_sdk_location_default_with_wheels(self):
     staging_dir = self.make_temp_dir()
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = 'default'
 
     with mock.patch(
         'apache_beam.utils.processes.check_call',
         self.build_fake_pip_download_command_handler(has_wheels=True)):
-      staged_resources = dependency.stage_job_resources(
-          options,
-          temp_dir=self.make_temp_dir())
+      staged_resources = self.stager.stage_job_resources(
+          options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
 
       self.assertEqual(len(staged_resources), 2)
       self.assertEqual(staged_resources[0], names.DATAFLOW_SDK_TARBALL_FILE)
@@ -357,21 +320,17 @@ def test_sdk_location_local_directory(self):
     staging_dir = self.make_temp_dir()
     sdk_location = self.make_temp_dir()
     self.create_temp_file(
-        os.path.join(
-            sdk_location,
-            names.DATAFLOW_SDK_TARBALL_FILE),
+        os.path.join(sdk_location, names.DATAFLOW_SDK_TARBALL_FILE),
         'Package content.')
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    self.assertEqual(
-        [names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(options))
-    tarball_path = os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
+    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
@@ -383,15 +342,13 @@ def test_sdk_location_local_source_file(self):
     self.create_temp_file(sdk_location, 'Package content.')
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    self.assertEqual(
-        [names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(options))
-    tarball_path = os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
+    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
@@ -403,15 +360,13 @@ def test_sdk_location_local_wheel_file(self):
     self.create_temp_file(sdk_location, 'Package content.')
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    self.assertEqual(
-        [sdk_filename],
-        dependency.stage_job_resources(options))
-    tarball_path = os.path.join(
-        staging_dir, sdk_filename)
+    self.assertEqual([sdk_filename],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
+    tarball_path = os.path.join(staging_dir, sdk_filename)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
@@ -420,134 +375,148 @@ def test_sdk_location_local_directory_not_present(self):
     sdk_location = 'nosuchdir'
     with self.assertRaises(RuntimeError) as cm:
       options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
       self.update_options(options)
       options.view_as(SetupOptions).sdk_location = sdk_location
 
-      dependency.stage_job_resources(options)
+      self.stager.stage_job_resources(options, staging_location=staging_dir)
     self.assertEqual(
         'The file "%s" cannot be found. Its '
         'location was specified by the --sdk_location command-line option.' %
-        sdk_location,
-        cm.exception.args[0])
-
-  def test_sdk_location_gcs_source_file(self):
+        sdk_location, cm.exception.args[0])
+
+  @mock.patch(
+      'apache_beam.runners.portability.stager_test.TestStager.stage_artifact')
+  @mock.patch(
+      'apache_beam.runners.portability.stager_test.stager.Stager._download_file'
+  )
+  def test_sdk_location_remote_source_file(self, *unused_mocks):
     staging_dir = self.make_temp_dir()
     sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    with mock.patch('apache_beam.runners.dataflow.internal.'
-                    'dependency._dependency_file_copy'):
-      self.assertEqual(
-          [names.DATAFLOW_SDK_TARBALL_FILE],
-          dependency.stage_job_resources(options))
+    self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+                     self.stager.stage_job_resources(
+                         options, staging_location=staging_dir))
 
-  def test_sdk_location_gcs_wheel_file(self):
+  @mock.patch(
+      'apache_beam.runners.portability.stager_test.TestStager.stage_artifact')
+  @mock.patch(
+      'apache_beam.runners.portability.stager_test.stager.Stager._download_file'
+  )
+  def test_sdk_location_remote_wheel_file(self, *unused_mocks):
     staging_dir = self.make_temp_dir()
     sdk_filename = 'apache_beam-1.0.0-cp27-cp27mu-manylinux1_x86_64.whl'
-    sdk_location = 'gs://my-gcs-bucket/' + sdk_filename
+    sdk_location = '/tmp/remote/my-bucket/' + sdk_filename
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    with mock.patch('apache_beam.runners.dataflow.internal.'
-                    'dependency._dependency_file_copy'):
-      self.assertEqual(
-          [sdk_filename],
-          dependency.stage_job_resources(options))
+    # We cannot rely on actual remote file system paths, so we treat
+    # '/tmp/remote/' as a remote path.
+    def is_remote_path(path):
+      return path.startswith('/tmp/remote/')
+
+    with mock.patch(
+        'apache_beam.runners.portability.stager_test'
+        '.stager.Stager._is_remote_path', staticmethod(is_remote_path)):
+      self.assertEqual([sdk_filename],
+                       self.stager.stage_job_resources(
+                           options, staging_location=staging_dir))
 
   def test_sdk_location_http(self):
     staging_dir = self.make_temp_dir()
     sdk_location = 'http://storage.googleapis.com/my-gcs-bucket/tarball.tar.gz'
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = sdk_location
 
-    def file_download(_, to_folder):
-      tarball_path = os.path.join(to_folder, 'sdk-tarball')
-      with open(tarball_path, 'w') as f:
+    def file_download(_, to_path):
+      with open(to_path, 'w') as f:
         f.write('Package content.')
-      return tarball_path
+      return to_path
 
-    with mock.patch('apache_beam.runners.dataflow.internal.'
-                    'dependency._dependency_file_download', file_download):
-      self.assertEqual(
-          [names.DATAFLOW_SDK_TARBALL_FILE],
-          dependency.stage_job_resources(options))
+    with mock.patch(
+        'apache_beam.runners.portability.stager_test'
+        '.stager.Stager._download_file', staticmethod(file_download)):
+      self.assertEqual([names.DATAFLOW_SDK_TARBALL_FILE],
+                       self.stager.stage_job_resources(
+                           options, staging_location=staging_dir))
 
-    tarball_path = os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    tarball_path = os.path.join(staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
     with open(tarball_path) as f:
       self.assertEqual(f.read(), 'Package content.')
 
   def test_with_extra_packages(self):
     staging_dir = self.make_temp_dir()
     source_dir = self.make_temp_dir()
+    self.create_temp_file(os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'xyz2.tar'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'whl.whl'), 'nothing')
     self.create_temp_file(
-        os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz2.tar'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'whl.whl'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
+        os.path.join(source_dir, stager.EXTRA_PACKAGES_FILE), 'nothing')
 
     options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
     self.update_options(options)
     options.view_as(SetupOptions).extra_packages = [
         os.path.join(source_dir, 'abc.tar.gz'),
         os.path.join(source_dir, 'xyz.tar.gz'),
         os.path.join(source_dir, 'xyz2.tar'),
-        os.path.join(source_dir, 'whl.whl'),
-        'gs://my-gcs-bucket/gcs.tar.gz']
+        os.path.join(source_dir, 'whl.whl'), '/tmp/remote/remote_file.tar.gz'
+    ]
 
-    gcs_copied_files = []
+    remote_copied_files = []
+
+    # We cannot rely on actual remote file system paths, so we treat
+    # '/tmp/remote/' as a remote path.
+    def is_remote_path(path):
+      return path.startswith('/tmp/remote/')
 
     def file_copy(from_path, to_path):
-      if from_path.startswith('gs://'):
-        gcs_copied_files.append(from_path)
+      if is_remote_path(from_path):
+        remote_copied_files.append(from_path)
         _, from_name = os.path.split(from_path)
         if os.path.isdir(to_path):
           to_path = os.path.join(to_path, from_name)
         self.create_temp_file(to_path, 'nothing')
-        logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
-      elif to_path.startswith('gs://'):
-        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+        logging.info('Fake copied remote file: %s to %s', from_path, to_path)
+      elif is_remote_path(to_path):
+        logging.info('Faking upload_file(%s, %s)', from_path, to_path)
       else:
         shutil.copyfile(from_path, to_path)
 
-    dependency._dependency_file_copy = file_copy
-
-    self.assertEqual(
-        ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl', 'gcs.tar.gz',
-         dependency.EXTRA_PACKAGES_FILE],
-        dependency.stage_job_resources(options))
-    with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
-      self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n',
-                        'whl.whl\n', 'gcs.tar.gz\n'], f.readlines())
-    self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
+    with mock.patch(
+        'apache_beam.runners.portability.stager_test'
+        '.stager.Stager._download_file', staticmethod(file_copy)):
+      with mock.patch(
+          'apache_beam.runners.portability.stager_test'
+          '.stager.Stager._is_remote_path', staticmethod(is_remote_path)):
+        self.assertEqual([
+            'abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'whl.whl',
+            'remote_file.tar.gz', stager.EXTRA_PACKAGES_FILE
+        ], self.stager.stage_job_resources(
+            options, staging_location=staging_dir))
+    with open(os.path.join(staging_dir, stager.EXTRA_PACKAGES_FILE)) as f:
+      self.assertEqual([
+          'abc.tar.gz\n', 'xyz.tar.gz\n', 'xyz2.tar\n', 'whl.whl\n',
+          'remote_file.tar.gz\n'
+      ], f.readlines())
+    self.assertEqual(['/tmp/remote/remote_file.tar.gz'], remote_copied_files)
 
   def test_with_extra_packages_missing_files(self):
     staging_dir = self.make_temp_dir()
     with self.assertRaises(RuntimeError) as cm:
 
       options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
       self.update_options(options)
       options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
 
-      dependency.stage_job_resources(options)
+      self.stager.stage_job_resources(options, staging_location=staging_dir)
     self.assertEqual(
         cm.exception.args[0],
         'The file %s cannot be found. It was specified in the '
@@ -556,15 +525,14 @@ def test_with_extra_packages_missing_files(self):
   def test_with_extra_packages_invalid_file_name(self):
     staging_dir = self.make_temp_dir()
     source_dir = self.make_temp_dir()
-    self.create_temp_file(
-        os.path.join(source_dir, 'abc.tgz'), 'nothing')
+    self.create_temp_file(os.path.join(source_dir, 'abc.tgz'), 'nothing')
     with self.assertRaises(RuntimeError) as cm:
       options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
       self.update_options(options)
       options.view_as(SetupOptions).extra_packages = [
-          os.path.join(source_dir, 'abc.tgz')]
-      dependency.stage_job_resources(options)
+          os.path.join(source_dir, 'abc.tgz')
+      ]
+      self.stager.stage_job_resources(options, staging_location=staging_dir)
     self.assertEqual(
         cm.exception.args[0],
         'The --extra_package option expects a full path ending with '
@@ -572,6 +540,17 @@ def test_with_extra_packages_invalid_file_name(self):
         'instead of %s' % os.path.join(source_dir, 'abc.tgz'))
 
 
+class TestStager(stager.Stager):
+
+  def stage_artifact(self, local_path_to_artifact, artifact_name):
+    logging.info('File copy from %s to %s.', local_path_to_artifact,
+                 artifact_name)
+    shutil.copyfile(local_path_to_artifact, artifact_name)
+
+  def commit_manifest(self):
+    pass
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
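
For orientation after the long diff above: as the TestStager at the end of stager_test.py shows, a concrete Stager only needs to provide stage_artifact() and commit_manifest(); stage_job_resources() drives everything else. The sketch below is illustrative only and is not part of the merged change (the class name and staging directory are invented); it simply mirrors that TestStager with a Stager that copies artifacts into a local directory:

    import logging
    import shutil

    from apache_beam.runners.portability import stager


    class LocalDirStager(stager.Stager):
      """Hypothetical Stager that copies artifacts into a local directory."""

      def stage_artifact(self, local_path_to_artifact, artifact_name):
        # artifact_name is the full destination path computed by
        # stage_job_resources() from the staging_location argument.
        logging.info('Copying %s to %s.', local_path_to_artifact, artifact_name)
        shutil.copyfile(local_path_to_artifact, artifact_name)

      def commit_manifest(self):
        # Nothing to finalize when artifacts are plain local files.
        pass

A call such as LocalDirStager().stage_job_resources(options, staging_location=staging_dir) then returns the list of staged file names, exactly as exercised by the tests above.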


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104331)
    Time Spent: 16h 40m  (was: 16.5h)

> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
>                 Key: BEAM-3883
>                 URL: https://issues.apache.org/jira/browse/BEAM-3883
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 16h 40m
>  Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or 
> dependencies when talking to the job API. Artifacts that need to be staged 
> include the user code itself, any SDK components not included in the 
> container image, and the list of Python packages that must be installed at 
> runtime.
>  
> Artifacts that are currently expected can be found in the harness boot code: 
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52.]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
