This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 934b141  [BEAM-13922] [Coverage] Make boot.go more testable and add 
tests (#16833)
934b141 is described below

commit 934b1416c633be6385ff85c8228d3fc85de04a33
Author: Danny McCormick <[email protected]>
AuthorDate: Mon Feb 14 16:56:29 2022 -0500

    [BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)
---
 sdks/go/container/boot.go      | 115 +++++++++++++----------
 sdks/go/container/boot_test.go | 205 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 270 insertions(+), 50 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 49d9165..93311e5 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -17,7 +17,9 @@ package main
 
 import (
        "context"
+       "errors"
        "flag"
+       "fmt"
        "io"
        "log"
        "os"
@@ -25,6 +27,8 @@ import (
        "strings"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -58,25 +62,9 @@ func main() {
        }
        log.Printf("Provision info:\n%v", info)
 
-       // TODO(BEAM-8201): Simplify once flags are no longer used.
-       if info.GetLoggingEndpoint().GetUrl() != "" {
-               *loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
-       }
-       if info.GetArtifactEndpoint().GetUrl() != "" {
-               *artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
-       }
-       if info.GetControlEndpoint().GetUrl() != "" {
-               *controlEndpoint = info.GetControlEndpoint().GetUrl()
-       }
-
-       if *loggingEndpoint == "" {
-               log.Fatal("No logging endpoint provided.")
-       }
-       if *artifactEndpoint == "" {
-               log.Fatal("No artifact endpoint provided.")
-       }
-       if *controlEndpoint == "" {
-               log.Fatal("No control endpoint provided.")
+       err = ensureEndpointsSet(info)
+       if err != nil {
+               log.Fatalf("Endpoint not set: %v", err)
        }
        log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))
 
@@ -99,60 +87,87 @@ func main() {
                log.Fatalf("Failed to retrieve staged files: %v", err)
        }
 
-       // TODO(BEAM-13647): Remove legacy hack once aged out.
+       name, err := getGoWorkerArtifactName(artifacts)
+       if err != nil {
+               log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
+       }
+
+       // (3) The persist dir may be on a noexec volume, so we must
+       // copy the binary to a different location to execute.
+       const prog = "/bin/worker"
+       if err := copyExe(filepath.Join(dir, name), prog); err != nil {
+               log.Fatalf("Failed to copy worker binary: %v", err)
+       }
+
+       args := []string{
+               "--worker=true",
+               "--id=" + *id,
+               "--logging_endpoint=" + *loggingEndpoint,
+               "--control_endpoint=" + *controlEndpoint,
+               "--semi_persist_dir=" + *semiPersistDir,
+               "--options=" + options,
+       }
+       if info.GetStatusEndpoint() != nil {
+               args = append(args, 
"--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+       }
+
+       log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+}
+
+func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, 
error) {
        const worker = "worker"
        name := worker
 
        switch len(artifacts) {
        case 0:
-               log.Fatal("No artifacts staged")
+               return "", errors.New(fmt.Sprintf("No artifacts staged"))
        case 1:
                name, _ = artifact.MustExtractFilePayload(artifacts[0])
+               return name, nil
        default:
-               found := false
                for _, a := range artifacts {
                        if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole {
                                name, _ = artifact.MustExtractFilePayload(a)
-                               found = true
-                               break
+                               return name, nil
                        }
                }
                // TODO(BEAM-13647): Remove legacy hack once aged out.
-               if !found {
-                       for _, a := range artifacts {
-                               n, _ := artifact.MustExtractFilePayload(a)
-                               if n == worker {
-                                       found = true
-                                       log.Printf("Go worker binary found with 
legacy name '%v' found", worker)
-                                       break
-                               }
+               for _, a := range artifacts {
+                       n, _ := artifact.MustExtractFilePayload(a)
+                       if n == worker {
+                               log.Printf("Go worker binary found with legacy 
name '%v'", worker)
+                               return n, nil
                        }
                }
-               if !found {
-                       log.Fatalf("No artifact named '%v' found", worker)
-               }
+               return "", errors.New(fmt.Sprintf("No artifact named '%v' 
found", worker))
        }
 
-       // (3) The persist dir may be on a noexec volume, so we must
-       // copy the binary to a different location to execute.
-       const prog = "/bin/worker"
-       if err := copyExe(filepath.Join(dir, name), prog); err != nil {
-               log.Fatalf("Failed to copy worker binary: %v", err)
+       return name, nil
+}
+
+func ensureEndpointsSet(info *fnpb.ProvisionInfo) error {
+       // TODO(BEAM-8201): Simplify once flags are no longer used.
+       if info.GetLoggingEndpoint().GetUrl() != "" {
+               *loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
+       }
+       if info.GetArtifactEndpoint().GetUrl() != "" {
+               *artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
+       }
+       if info.GetControlEndpoint().GetUrl() != "" {
+               *controlEndpoint = info.GetControlEndpoint().GetUrl()
        }
 
-       args := []string{
-               "--worker=true",
-               "--id=" + *id,
-               "--logging_endpoint=" + *loggingEndpoint,
-               "--control_endpoint=" + *controlEndpoint,
-               "--semi_persist_dir=" + *semiPersistDir,
-               "--options=" + options,
+       if *loggingEndpoint == "" {
+               return errors.New("No logging endpoint provided.")
        }
-       if info.GetStatusEndpoint() != nil {
-               args = append(args, 
"--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+       if *artifactEndpoint == "" {
+               return errors.New("No artifact endpoint provided.")
+       }
+       if *controlEndpoint == "" {
+               return errors.New("No control endpoint provided.")
        }
 
-       log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+       return nil
 }
 
 func copyExe(from, to string) error {
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
new file mode 100644
index 0000000..7eda752
--- /dev/null
+++ b/sdks/go/container/boot_test.go
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "io/ioutil"
+       "os"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+       "github.com/golang/protobuf/proto"
+)
+
+func TestEnsureEndpointsSet_AllSet(t *testing.T) {
+       provisionInfo := &fnpb.ProvisionInfo{
+               LoggingEndpoint:  &pipepb.ApiServiceDescriptor{Url: 
"testLoggingEndpointUrl"},
+               ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: 
"testArtifactEndpointUrl"},
+               ControlEndpoint:  &pipepb.ApiServiceDescriptor{Url: 
"testControlEndpointUrl"},
+       }
+       *loggingEndpoint = ""
+       *artifactEndpoint = ""
+       *controlEndpoint = ""
+       err := ensureEndpointsSet(provisionInfo)
+       if err != nil {
+               t.Fatalf("ensureEndpointsSet() = %q, want nil", err)
+       }
+       if got, want := *loggingEndpoint, "testLoggingEndpointUrl"; got != want 
{
+               t.Fatalf("After ensureEndpointsSet(), *loggingEndpoint = %q, 
want %q", got, want)
+       }
+       if got, want := *artifactEndpoint, "testArtifactEndpointUrl"; got != 
want {
+               t.Fatalf("After ensureEndpointsSet(), *artifactEndpoint = %q, 
want %q", got, want)
+       }
+       if got, want := *controlEndpoint, "testControlEndpointUrl"; got != want 
{
+               t.Fatalf("After ensureEndpointsSet(), *controlEndpoint = %q, 
want %q", got, want)
+       }
+}
+
+func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
+       provisionInfo := &fnpb.ProvisionInfo{
+               LoggingEndpoint:  &pipepb.ApiServiceDescriptor{Url: 
"testLoggingEndpointUrl"},
+               ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: 
"testArtifactEndpointUrl"},
+               ControlEndpoint:  &pipepb.ApiServiceDescriptor{Url: ""},
+       }
+       *loggingEndpoint = ""
+       *artifactEndpoint = ""
+       *controlEndpoint = ""
+       err := ensureEndpointsSet(provisionInfo)
+       if err == nil {
+               t.Fatalf("ensureEndpointsSet() = nil, want non-nil error")
+       }
+       if got, want := *loggingEndpoint, "testLoggingEndpointUrl"; got != want 
{
+               t.Fatalf("After ensureEndpointsSet(), *loggingEndpoint = %q, 
want %q", got, want)
+       }
+       if got, want := *artifactEndpoint, "testArtifactEndpointUrl"; got != 
want {
+               t.Fatalf("After ensureEndpointsSet(), *artifactEndpoint = %q, 
want %q", got, want)
+       }
+       if got, want := *controlEndpoint, ""; got != want {
+               t.Fatalf("After ensureEndpointsSet(), *controlEndpoint = %q, 
want %q", got, want)
+       }
+}
+
+func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {
+       _, err := getGoWorkerArtifactName([]*pipepb.ArtifactInformation{})
+       if err == nil {
+               t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
+       }
+}
+
+func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
+       artifact := constructArtifactInformation(t, 
artifact.URNGoWorkerBinaryRole, "test/path", "sha")
+       artifacts := []*pipepb.ArtifactInformation{&artifact}
+
+       val, err := getGoWorkerArtifactName(artifacts)
+       if err != nil {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+       }
+       if got, want := val, "test/path"; got != want {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+       }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
+       artifact1 := constructArtifactInformation(t, 
artifact.URNGoWorkerBinaryRole, "test/path", "sha")
+       artifact2 := constructArtifactInformation(t, "other role", 
"test/path2", "sha")
+       artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+       val, err := getGoWorkerArtifactName(artifacts)
+       if err != nil {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+       }
+       if got, want := val, "test/path"; got != want {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+       }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) 
{
+       artifact1 := constructArtifactInformation(t, "other role", "test/path", 
"sha")
+       artifact2 := constructArtifactInformation(t, 
artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
+       artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+       val, err := getGoWorkerArtifactName(artifacts)
+       if err != nil {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+       }
+       if got, want := val, "test/path2"; got != want {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+       }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
+       artifact1 := constructArtifactInformation(t, "other role", "test/path", 
"sha")
+       artifact2 := constructArtifactInformation(t, "other role", "worker", 
"sha")
+       artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+       val, err := getGoWorkerArtifactName(artifacts)
+       if err != nil {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+       }
+       if got, want := val, "worker"; got != want {
+               t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+       }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
+       artifact1 := constructArtifactInformation(t, "other role", "test/path", 
"sha")
+       artifact2 := constructArtifactInformation(t, "other role", 
"test/path2", "sha")
+       artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+       _, err := getGoWorkerArtifactName(artifacts)
+       if err == nil {
+               t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
+       }
+}
+
+func TestCopyExe(t *testing.T) {
+       testExeContent := []byte("testContent")
+
+       // Make temp directory to cleanup at the end
+       d, err := ioutil.TempDir(os.Getenv("TEST_TMPDIR"), "copyExe-*")
+       if err != nil {
+               t.Fatalf("failed to make temp directory, got %v", err)
+       }
+       t.Cleanup(func() { os.RemoveAll(d) })
+
+       // Make our source file and write to it
+       src, err := ioutil.TempFile(d, "src.exe")
+       if err != nil {
+               t.Fatalf("failed to make temp file, got %v", err)
+       }
+       if _, err := src.Write(testExeContent); err != nil {
+               t.Fatalf("failed to write to temp file, got %v", err)
+       }
+       if err := src.Close(); err != nil {
+               t.Fatalf("failed to close temp file, got %v", err)
+       }
+
+       // Make sure our destination path doesn't exist already
+       srcPath, destPath := src.Name(), filepath.Join(d, "dest.exe")
+       if _, err := os.Stat(destPath); err == nil {
+               t.Fatalf("dest file %v already exists", destPath)
+       }
+
+       err = copyExe(srcPath, destPath)
+       if err != nil {
+               t.Fatalf("copyExe() = %v, want nil", err)
+       }
+       if _, err := os.Stat(destPath); err != nil {
+               t.Fatalf("After running copyExe, os.Stat() = %v, want nil", err)
+       }
+       destContents, err := ioutil.ReadFile(destPath)
+       if err != nil {
+               t.Fatalf("After running copyExe, ioutil.ReadFile() = %v, want 
nil", err)
+       }
+       if got, want := string(destContents), string(testExeContent); got != 
want {
+               t.Fatalf("After running copyExe, ioutil.ReadFile() = %v, want 
%v", got, want)
+       }
+}
+
+func constructArtifactInformation(t *testing.T, roleUrn string, path string, 
sha string) pipepb.ArtifactInformation {
+       t.Helper()
+
+       typePayload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{Path: path, 
Sha256: sha})
+
+       return pipepb.ArtifactInformation{
+               RoleUrn:     roleUrn,
+               TypeUrn:     artifact.URNFileArtifact,
+               TypePayload: typePayload,
+       }
+}

Reply via email to