lostluck commented on a change in pull request #16908:
URL: https://github.com/apache/beam/pull/16908#discussion_r814389427



##########
File path: sdks/go/test/integration/internal/jars/jars.go
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jars contains functionality for running jars for integration tests. 
The main entry point
+// for running a jar is the Run function. The Process interface is used to 
interact with the running
+// jars, and most importantly for shutting down the jars once finished with 
them.
+package jars
+
+import (
+       "fmt"
+       "os/exec"
+)
+
+type runCallback func(duration, jar string, args ...string) (Process, error)
+
+var runner runCallback // Saves which behavior to use when Run is called.
+
+func init() {
+       runner = getRunner()
+}
+
+// getRunner is used to determine the appropriate behavior for run during 
initialization time,
+// based on the OS and installed binaries of the system. This is returned as a 
runCallback which
+// can be called whenever Run is called. If an error prevents Run from being 
used at all (for
+// example, Java is not installed), then the runCallback will return that 
error.
+func getRunner() runCallback {
+       // First check if we can even run jars.
+       _, err := exec.LookPath("java")
+       if err != nil {
+               err := fmt.Errorf("cannot run jar: 'java' command not 
installed: %w", err)
+               return func(_, _ string, _ ...string) (Process, error) {
+                       return nil, err
+               }
+       }
+
+       // Defer to OS-specific logic for checking for presence of timeout 
command.
+       return getTimeoutRunner()
+}
+
+// Run runs a jar given an optional duration, a path to the jar, and any 
desired arguments to the
+// jar. It returns a Process object which can be used to shut down the jar 
once finished.
+//
+// The duration parameter is a duration for the timeout command which can be 
used to automatically
+// kill the jar after a set duration, in order to avoid resource leakage. It 
is described here:
+// https://man7.org/linux/man-pages/man1/timeout.1.html. If a duration is 
provided but the system
+// is unable to use the timeout command, this function will return an error. 
To indicate that a
+// duration isn't needed, pass in an empty string.
+func Run(duration, jar string, args ...string) (Process, error) {

Review comment:
       I don't love the string "duration" parameter. Consider accepting a 
`time.Duration` instead and handling the format conversion. 
   
   `time.Duration` even has a `Seconds` method, which returns the duration in 
seconds as a floating-point value that you can then simply format with `fmt.Sprintf("%fs", d.Seconds())`
   https://pkg.go.dev/time#Duration.Seconds
   
   To avoid the additional plumbing change, you could even do that at this 
early stage: add a `RunForDuration` that accepts the `time.Duration` 
parameter, while this function, `Run`, drops the parameter and passes the empty 
string downstream.
   
   This approach also eventually allows graceful support for non-Unix OSes, 
which might specify their timeout mechanisms differently; we could then 
take advantage of those with alternative formats.

##########
File path: sdks/go/test/integration/expansions_test.go
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+       "fmt"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
+       "github.com/google/go-cmp/cmp"
+)
+
+type testProcess struct {
+       killed bool
+       jar    string
+}
+
+func (p *testProcess) Kill() error {
+       p.killed = true
+       return nil
+}
+
+func failRun(_, _ string, _ ...string) (jars.Process, error) {
+       return nil, fmt.Errorf("unexpectedly running a jar, failing")
+}
+
+func succeedRun(_, jar string, _ ...string) (jars.Process, error) {
+       return &testProcess{jar: jar}, nil
+}
+
+// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided 
addresses.
+func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
+       addrsMap := map[string]string{
+               "label1": "testAddr1",
+               "label2": "testAddr2",
+               "label3": "testAddr3",
+       }
+       jarsMap := map[string]string{
+               "label2": "jarFilepath2",
+       }
+       es := &ExpansionServices{
+               addrs: addrsMap,
+               jars:  jarsMap,
+               procs: make([]jars.Process, 0),
+               run:   failRun,
+       }
+
+       // Ensure we get the same map we put in, and that addresses take 
priority over jars if
+       // both are given for the same label.
+       for label, wantAddr := range addrsMap {
+               gotAddr, err := es.GetAddr(label)
+               if err != nil {
+                       t.Errorf("unexpected error when getting address for 
\"%v\": %v", label, err)
+                       continue
+               }
+               if gotAddr != wantAddr {
+                       t.Errorf("incorrect address for \"%v\", want %v, got 
%v", label, wantAddr, gotAddr)
+               }
+       }
+       // Check that nonexistent labels fail.
+       if _, err := es.GetAddr("nonexistent_label"); err == nil {
+               t.Errorf("did not receive error when calling GetAddr with 
nonexistent label")
+       }
+}
+
+// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars.
+func TestExpansionServices_GetAddr_Jars(t *testing.T) {
+       addrsMap := map[string]string{}
+       jarsMap := map[string]string{
+               "label1": "jarFilepath1",
+               "label2": "jarFilepath2",
+               "label3": "jarFilepath3",
+       }
+       es := &ExpansionServices{
+               addrs: addrsMap,
+               jars:  jarsMap,
+               procs: make([]jars.Process, 0),
+               run:   succeedRun,
+       }
+
+       // Call GetAddr on each jar twice, checking that the addresses remain 
consistent.
+       gotMap := make(map[string]string)
+       for label := range jarsMap {
+               gotAddr, err := es.GetAddr(label)
+               if err != nil {
+                       t.Errorf("unexpected error when getting address for 
\"%v\": %v", label, err)
+                       continue
+               }
+               gotMap[label] = gotAddr
+       }
+       for label, gotAddr := range gotMap {
+               secondAddr, err := es.GetAddr(label)
+               if err != nil {
+                       t.Errorf("unexpected error when getting address for 
\"%v\": %v", label, err)
+                       continue
+               }
+               if secondAddr != gotAddr {
+                       t.Errorf("getAddr returned different address when 
called twice for \"%v\", "+
+                               "attempt 1: %v, attempt 2: %v", label, gotAddr, 
secondAddr)
+               }
+       }
+       // Check that all jars were run.
+       gotJars := make([]string, 0)
+       for _, proc := range es.procs {
+               testProc := proc.(*testProcess)
+               gotJars = append(gotJars, testProc.jar)
+       }
+       wantJars := make([]string, 0)
+       for _, jar := range jarsMap {
+               wantJars = append(wantJars, jar)
+       }
+       if diff := cmp.Diff(wantJars, gotJars); diff != "" {

Review comment:
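       You'll need to add the unordered compare here, *or* sort the slices 
first: both slices are built by ranging over a map, and map iteration order is 
not deterministic, so they can come out in different orders. TBH I'd just sort the slices.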

##########
File path: sdks/go/test/integration/expansions.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+       "fmt"
+       "strconv"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
+       "github.com/apache/beam/sdks/v2/go/test/integration/internal/ports"
+)
+
+// ExpansionServices is a struct used for getting addresses and starting 
expansion services, based
+// on the --expansion_jar and --expansion_addr flags in this package. The main 
reason to use this
+// instead of accessing the flags directly is to let it handle jar startup and 
shutdown.
+//
+// Usage
+//
+// Create an ExpansionServices object in TestMain with NewExpansionServices. 
Then use GetAddr for
+// every expansion service needed for the test. Call Shutdown on it before 
finishing TestMain (or
+// simply defer a call to it).
+//
+// ExpansionServices is not concurrency safe, and so a single instance should 
not be used within
+// multiple individual tests, due to the possibility of those tests being run 
concurrently. It is
+// recommended to only use ExpansionServices in TestMain to avoid this.
+//
+// Example:
+//   var retCode int
+//   defer func() { os.Exit(retCode) }()  // Defer os.Exit so it happens after 
other defers.
+//   services := integration.NewExpansionServices()
+//   defer func() { services.Shutdown() }()
+//   addr, err := services.GetAddr("example")
+//   if err != nil {
+//     panic(err)
+//   }

Review comment:
       Note that panic unwinds the stack and executes defers, so the deferred 
`os.Exit(retCode)` will still run with `retCode` at its zero value. You may 
want to explicitly set retCode = 1 before the panic to make sure it exits with 
an error state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to