Get current SDK package from PyPI instead of GitHub

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bda677d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bda677d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bda677d

Branch: refs/heads/python-sdk
Commit: 0bda677d47d5bd5d9c45b74e00e5c3fd113a4f81
Parents: bff9801
Author: Silviu Calinoiu <[email protected]>
Authored: Thu Jun 30 13:04:23 2016 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Thu Jun 30 17:00:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   |  4 +-
 sdks/python/apache_beam/utils/dependency.py     | 66 +++++++++++++++++---
 .../python/apache_beam/utils/dependency_test.py | 14 ++++-
 3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 7dfb035..0bb30ac 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -38,6 +38,7 @@ from apache_beam.transforms import cy_combiners
 from apache_beam.utils import dependency
 from apache_beam.utils import names
 from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
@@ -260,7 +261,8 @@ class Environment(object):
       # Default to using the worker harness container image for the current SDK
       # version.
       pool.workerHarnessContainerImage = (
-          'dataflow.gcr.io/v1beta3/python:%s' % version.__version__)
+          'dataflow.gcr.io/v1beta3/python:%s' %
+          get_required_container_version())
       if self.worker_options.teardown_policy:
         if self.worker_options.teardown_policy == 'TEARDOWN_NEVER':
           pool.teardownPolicy = (

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index 1c6ad9c..be7cd03 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,9 +72,6 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
-PACKAGES_URL_PREFIX = (
-    'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -327,8 +324,9 @@ def stage_job_resources(
     staged_path = utils.path.join(google_cloud_options.staging_location,
                                   names.DATAFLOW_SDK_TARBALL_FILE)
     if stage_tarball_from_remote_location:
-      # If --sdk_location is not specified then the appropriate URL is built
-      # based on the version of the currently running SDK. If the option is
+      # If --sdk_location is not specified then the appropriate package
+      # will be obtained from PyPI (https://pypi.python.org) based on the
+      # version of the currently running SDK. If the option is
       # present then no version matching is made and the exact URL or path
       # is expected.
       #
@@ -336,8 +334,7 @@
       # not have the sdk_location attribute present and therefore we
       # will not stage a tarball.
       if setup_options.sdk_location == 'default':
-        sdk_remote_location = '%s/v%s.tar.gz' % (
-            PACKAGES_URL_PREFIX, __version__)
+        sdk_remote_location = 'pypi'
       else:
         sdk_remote_location = setup_options.sdk_location
       _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
@@ -423,7 +420,62 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
         'Staging Dataflow SDK tarball from %s to %s',
         sdk_remote_location, staged_path)
     _dependency_file_copy(sdk_remote_location, staged_path)
+  elif sdk_remote_location == 'pypi':
+    logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
+    _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
   else:
     raise RuntimeError(
         'The --sdk_location option was used with an unsupported '
         'type of location: %s' % sdk_remote_location)
+
+
+def get_required_container_version():
+  """Returns the Google Cloud Dataflow container version for remote execution.
+
+  Raises:
+    pkg_resources.DistributionNotFound: if one of the expected package names
+      are not found: 'google-cloud-dataflow' (right now) and 'apache-beam'
+      (in the future).
+  """
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  import pkg_resources as pkg
+  try:
+    version = pkg.get_distribution('google-cloud-dataflow').version
+    # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
+    # For instance the 0.3.0rc2 SDK version translates into 0.3.0.
+    return '%s.%s.%s' % pkg.parse_version(version)._version.release
+  except pkg.DistributionNotFound:
+    # This case covers Apache Beam end-to-end testing scenarios. All these tests
+    # will run with the latest container version.
+    return 'latest'
+
+
+def _download_pypi_sdk_package(temp_dir):
+  """Downloads SDK package from PyPI and returns path to local path."""
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  PACKAGE_NAME = 'google-cloud-dataflow'
+  import pkg_resources as pkg
+  version = pkg.get_distribution('google-cloud-dataflow').version
+  # Get a source distribution for the SDK package from PyPI.
+  cmd_args = [
+      'pip', 'install', '--download', temp_dir,
+      '%s==%s' % (PACKAGE_NAME, version),
+      '--no-binary', ':all:', '--no-deps']
+  logging.info('Executing command: %s', cmd_args)
+  result = processes.call(cmd_args)
+  if result != 0:
+    raise RuntimeError(
+        'Failed to execute command: %s. Exit code %d',
+        cmd_args, result)
+  zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+  if os.path.exists(zip_expected):
+    return zip_expected
+  tgz_expected = os.path.join(
+      temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+  if os.path.exists(tgz_expected):
+    return tgz_expected
+  raise RuntimeError(
+      'Failed to download a source distribution for the running SDK. Expected '
+      'either %s or %s to be found in the download folder.' % (
+          zip_expected, tgz_expected))
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 8a97f4b..ab6446d 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -243,11 +243,19 @@ class SetupTest(unittest.TestCase):
     dependency._dependency_file_download = file_download
     return os.path.join(expected_to_folder, 'sdk-tarball')
 
+  def override_pypi_download(self, expected_from_url, expected_to_folder):
+    def pypi_download(_):
+      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+      with open(tarball_path, 'w') as f:
+        f.write('Some contents.')
+      return tarball_path
+    dependency._download_pypi_sdk_package = pypi_download
+    return os.path.join(expected_to_folder, 'sdk-tarball')
+
   def test_sdk_location_default(self):
     staging_dir = tempfile.mkdtemp()
-    expected_from_url = '%s/v%s.tar.gz' % (
-        dependency.PACKAGES_URL_PREFIX, __version__)
-    expected_from_path = self.override_file_download(
+    expected_from_url = 'pypi'
+    expected_from_path = self.override_pypi_download(
         expected_from_url, staging_dir)
     self.override_file_copy(expected_from_path, staging_dir)
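
Note on the version normalization above: get_required_container_version() reaches
into pkg_resources' private _version attribute to keep only the X.Y.Z release
components. A minimal standalone sketch of the same idea, using the public API of
the packaging library instead (the helper name container_version_for is
illustrative, not part of this commit):

# Sketch only; assumes the 'packaging' distribution is installed.
from packaging.version import Version

def container_version_for(sdk_version):
  # Keep only the release components, dropping any pre/post/dev suffix,
  # e.g. '0.3.0rc2' -> '0.3.0'. Like the commit's code, this assumes the
  # release tuple has exactly three components (X.Y.Z).
  return '%s.%s.%s' % Version(sdk_version).release

print(container_version_for('0.3.0rc2'))  # prints: 0.3.0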

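Note on the PyPI download above: _download_pypi_sdk_package shells out to
'pip install --download', a flag that newer pip releases removed in favor of the
'pip download' subcommand. A hedged sketch of the equivalent call on a modern
pip (the function name, package name, and version below are illustrative):

import logging
import subprocess
import tempfile

def download_sdk_sdist(package='google-cloud-dataflow', version='0.4.0'):
  # 'pip download' replaces 'pip install --download'; '--no-binary :all:'
  # forces a source distribution and '--no-deps' skips dependencies,
  # mirroring the flags used in this commit.
  temp_dir = tempfile.mkdtemp()
  cmd_args = [
      'pip', 'download', '--dest', temp_dir,
      '%s==%s' % (package, version),
      '--no-binary', ':all:', '--no-deps']
  logging.info('Executing command: %s', cmd_args)
  subprocess.check_call(cmd_args)
  return temp_dir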