This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1abeee9  Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)
1abeee9 is described below

commit 1abeee9ee0da16d3a9ab713d9b0c4198ddf766f0
Author: tvalentyn <[email protected]>
AuthorDate: Tue May 29 17:06:50 2018 -0700

    Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)
    
    Support installing Beam SDK from a wheel file.
---
 sdks/python/container/boot.go    |  12 ++-
 sdks/python/container/piputil.go | 214 ++++++++++++++++++++++-----------------
 2 files changed, 130 insertions(+), 96 deletions(-)

diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 14e2619..9001868 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -27,8 +27,8 @@ import (
        "strings"
 
        "github.com/apache/beam/sdks/go/pkg/beam/artifact"
-       pbpipeline "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        pbjob "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pbpipeline "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/provision"
        "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -36,6 +36,8 @@ import (
 )
 
 var (
+       acceptableWhlSpecs = []string{"cp27-cp27mu-manylinux1_x86_64.whl"}
+
        // Contract: https://s.apache.org/beam-fn-api-container-contract.
 
       id                = flag.String("id", "", "Local identifier (required).")
@@ -48,10 +50,10 @@ var (
 
 const (
        sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main"
-       // Please keep these names in sync with setup dependency.py
+       // Please keep these names in sync with stager.py
        workflowFile      = "workflow.tar.gz"
        requirementsFile  = "requirements.txt"
-       sdkFile           = "dataflow_python_sdk.tar"
+       sdkSrcFile        = "dataflow_python_sdk.tar"
        extraPackagesFile = "extra_packages.txt"
 )
 
@@ -100,7 +102,7 @@ func main() {
        // TODO(herohde): the packages to install should be specified explicitly. It
        // would also be possible to install the SDK in the Dockerfile.
        if setupErr := installSetupPackages(files, dir); setupErr != nil {
-               log.Fatalf("Failed to install SDK: %v", setupErr)
+               log.Fatalf("Failed to install required packages: %v", setupErr)
        }
 
        // (3) Invoke python
@@ -133,7 +135,7 @@ func installSetupPackages(mds []*pbjob.ArtifactMetadata, workDir string) error {
        // Install the Dataflow Python SDK and worker packages.
        // We install the extra requirements in case of using the beam sdk. These are ignored by pip
        // if the user is using an SDK that does not provide these.
-       if err := pipInstallPackage(files, workDir, sdkFile, false, false, []string{"gcp"}); err != nil {
+       if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs); err != nil {
                return fmt.Errorf("failed to install SDK: %v", err)
        }
        // The staged files will not disappear due to restarts because workDir is a
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index b227774..b8d837a 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -16,116 +16,148 @@
 package main
 
 import (
-  "bufio"
-  "bytes"
-  "errors"
-  "fmt"
-  "io/ioutil"
-  "log"
-  "path/filepath"
-  "strings"
+       "bufio"
+       "bytes"
+       "errors"
+       "fmt"
+       "io/ioutil"
+       "log"
+       "path/filepath"
+       "strings"
 
-  "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
+       "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 )
 
 const (
-  pip = "/usr/local/bin/pip"
+       pip = "/usr/local/bin/pip"
 )
 
 // pipInstallRequirements installs the given requirement, if present.
 func pipInstallRequirements(files []string, dir, name string) error {
-  for _, file := range files {
-    if file == name {
-      // We run the install process in two rounds in order to avoid as much
-      // as possible PyPI downloads. In the first round the --find-links
-      // option will make sure that only things staged in the worker will be
-      // used without following their dependencies.
-      args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
-      if err := execx.Execute(pip, args...); err != nil {
-        return err
-      }
-      // The second install round opens up the search for packages on PyPI and
-      // also installs dependencies. The key is that if all the packages have
-      // been installed in the first round then this command will be a no-op.
-      args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
-      return execx.Execute(pip, args...)
-    }
-  }
-  return nil
+       for _, file := range files {
+               if file == name {
+                       // We run the install process in two rounds in order to avoid as much
+                       // as possible PyPI downloads. In the first round the --find-links
+                       // option will make sure that only things staged in the worker will be
+                       // used without following their dependencies.
+                       args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
+                       if err := execx.Execute(pip, args...); err != nil {
+                               return err
+                       }
+                       // The second install round opens up the search for packages on PyPI and
+                       // also installs dependencies. The key is that if all the packages have
+                       // been installed in the first round then this command will be a no-op.
+                       args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
+                       return execx.Execute(pip, args...)
+               }
+       }
+       return nil
 }
 
 // pipInstallPackage installs the given package, if present.
 func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
-  for _, file := range files {
-    if file == name {
-      var packageSpec = name
-      if extras != nil {
-        packageSpec += "[" + strings.Join(extras, ",") + "]"
-      }
-      if force {
-        // We only use force reinstallation for packages specified using the
-        // --extra_package flag.  In this case, we always want to use the
-        // user-specified package, overwriting any existing package already
-        // installed.  At the same time, we want to avoid reinstalling any
-        // dependencies.  The "pip install" command doesn't have a clean way to do
-        // this, so we do this in two steps.
-        //
-        // First, we use the three flags "--upgrade --force-reinstall --no-deps"
-        // to "pip install" so as to force the package to be reinstalled, while
-        // avoiding reinstallation of dependencies.  Note now that if any needed
-        // dependencies were not installed, they will still be missing.
-        //
-        // Next, we run "pip install" on the package without any flags.  Since the
-        // installed version will match the package specified, the package itself
-        // will not be reinstalled, but its dependencies will now be resolved and
-        // installed if necessary.  This achieves our goal outlined above.
-        args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
-          filepath.Join(dir, packageSpec)}
-        err := execx.Execute(pip, args...)
-        if err != nil {
-          return err
-        }
-        args = []string{"install", filepath.Join(dir, packageSpec)}
-        return execx.Execute(pip, args...)
-      }
+       for _, file := range files {
+               if file == name {
+                       var packageSpec = name
+                       if extras != nil {
+                               packageSpec += "[" + strings.Join(extras, ",") + "]"
+                       }
+                       if force {
+                               // We only use force reinstallation for packages specified using the
+                               // --extra_package flag.  In this case, we always want to use the
+                               // user-specified package, overwriting any existing package already
+                               // installed.  At the same time, we want to avoid reinstalling any
+                               // dependencies.  The "pip install" command doesn't have a clean way to do
+                               // this, so we do this in two steps.
+                               //
+                               // First, we use the three flags "--upgrade --force-reinstall --no-deps"
+                               // to "pip install" so as to force the package to be reinstalled, while
+                               // avoiding reinstallation of dependencies.  Note now that if any needed
+                               // dependencies were not installed, they will still be missing.
+                               //
+                               // Next, we run "pip install" on the package without any flags.  Since the
+                               // installed version will match the package specified, the package itself
+                               // will not be reinstalled, but its dependencies will now be resolved and
+                               // installed if necessary.  This achieves our goal outlined above.
+                               args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
+                                       filepath.Join(dir, packageSpec)}
+                               err := execx.Execute(pip, args...)
+                               if err != nil {
+                                       return err
+                               }
+                               args = []string{"install", filepath.Join(dir, packageSpec)}
+                               return execx.Execute(pip, args...)
+                       }
 
-      // Case when we do not perform a forced reinstall.
-      args := []string{"install", filepath.Join(dir, packageSpec)}
-      return execx.Execute(pip, args...)
-    }
-  }
-  if optional {
-    return nil
-  }
-  return errors.New("package '" + name + "' not found")
+                       // Case when we do not perform a forced reinstall.
+                       args := []string{"install", filepath.Join(dir, packageSpec)}
+                       return execx.Execute(pip, args...)
+               }
+       }
+       if optional {
+               return nil
+       }
+       return errors.New("package '" + name + "' not found")
 }
 
 // installExtraPackages installs all the packages declared in the extra
 // packages manifest file.
 func installExtraPackages(files []string, extraPackagesFile, dir string) error {
-  // First check that extra packages manifest file is present.
-  for _, file := range files {
-    if file != extraPackagesFile {
-      continue
-    }
+       // First check that extra packages manifest file is present.
+       for _, file := range files {
+               if file != extraPackagesFile {
+                       continue
+               }
 
-    // Found the manifest. Install extra packages.
-    manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
-    if err != nil {
-      return fmt.Errorf("failed to read extra packages manifest file: %v", err)
-    }
+               // Found the manifest. Install extra packages.
+               manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
+               if err != nil {
+                       return fmt.Errorf("failed to read extra packages manifest file: %v", err)
+               }
 
-    s := bufio.NewScanner(bytes.NewReader(manifest))
-    s.Split(bufio.ScanLines)
+               s := bufio.NewScanner(bytes.NewReader(manifest))
+               s.Split(bufio.ScanLines)
 
-    for s.Scan() {
-      extraPackage := s.Text()
-      log.Printf("Installing extra package: %s", extraPackage)
-      if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
-        return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
-      }
-    }
-    return nil
-  }
-  return nil
+               for s.Scan() {
+                       extraPackage := s.Text()
+                       log.Printf("Installing extra package: %s", extraPackage)
+                       if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
+                               return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
+                       }
+               }
+               return nil
+       }
+       return nil
+}
+
+func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
+       for _, file := range files {
+               if strings.HasPrefix(file, "apache_beam") {
+                       for _, s := range acceptableWhlSpecs {
+                               if strings.HasSuffix(file, s) {
+                                       log.Printf("Found Apache Beam SDK wheel: %v", file)
+                                       return file
+                               }
+                       }
+               }
+       }
+       return ""
+}
+
+// InstallSdk installs Beam SDK: First, we try to find a compiled
+// wheel distribution of Apache Beam among staged files. If we find it, we
+// assume that the pipleine was started with the Beam SDK found in the wheel
+// file, and we try to install it. If not successful, we fall back to installing
+// SDK from source tarball provided in sdkSrcFile.
+func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string) error {
+       sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
+       if sdkWhlFile != "" {
+               err := pipInstallPackage(files, workDir, sdkWhlFile, false, false, []string{"gcp"})
+               if err == nil {
+                       return nil
+               }
+               log.Printf("Could not install Apache Beam SDK from a wheel: %v, proceeding to install SDK from source tarball.", err)
+       }
+       err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
+       return err
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to