lostluck commented on a change in pull request #16908: URL: https://github.com/apache/beam/pull/16908#discussion_r814389427
########## File path: sdks/go/test/integration/internal/jars/jars.go ########## @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jars contains functionality for running jars for integration tests. The main entry point +// for running a jar is the Run function. The Process interface is used to interact with the running +// jars, and most importantly for shutting down the jars once finished with them. +package jars + +import ( + "fmt" + "os/exec" +) + +type runCallback func(duration, jar string, args ...string) (Process, error) + +var runner runCallback // Saves which behavior to use when Run is called. + +func init() { + runner = getRunner() +} + +// getRunner is used to determine the appropriate behavior for run during initialization time, +// based on the OS and installed binaries of the system. This is returned as a runCallback which +// can be called whenever Run is called. If an error prevents Run from being used at all (for +// example, Java is not installed), then the runCallback will return that error. +func getRunner() runCallback { + // First check if we can even run jars. 
+ _, err := exec.LookPath("java") + if err != nil { + err := fmt.Errorf("cannot run jar: 'java' command not installed: %w", err) + return func(_, _ string, _ ...string) (Process, error) { + return nil, err + } + } + + // Defer to OS-specific logic for checking for presence of timeout command. + return getTimeoutRunner() +} + +// Run runs a jar given an optional duration, a path to the jar, and any desired arguments to the +// jar. It returns a Process object which can be used to shut down the jar once finished. +// +// The duration parameter is a duration for the timeout command which can be used to automatically +// kill the jar after a set duration, in order to avoid resource leakage. It is described here: +// https://man7.org/linux/man-pages/man1/timeout.1.html. If a duration is provided but the system +// is unable to use the timeout command, this function will return an error. To indicate that a +// duration isn't needed, pass in an empty string. +func Run(duration, jar string, args ...string) (Process, error) { Review comment: I don't love the string "duration" field. Consider accepting a `time.Duration` instead and handling the format conversion. `time.Duration` even has a `Seconds` method which returns the duration in seconds as a floating-point value, which you can then simply format with `fmt.Sprintf("%fs", d.Seconds())` https://pkg.go.dev/time#Duration.Seconds To avoid the additional plumbing change, you could even do that at this early stage, and have a 'RunForDuration' that accepts the time.Duration parameter, while this one, `Run`, can drop the parameter and pass the empty string downstream. This approach also eventually allows graceful support for the non-unix OSs which might have differently specified timeout mechanisms, that we could then take advantage of with alternative formats. ########## File path: sdks/go/test/integration/expansions_test.go ########## @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements.
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "testing" + + "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars" + "github.com/google/go-cmp/cmp" +) + +type testProcess struct { + killed bool + jar string +} + +func (p *testProcess) Kill() error { + p.killed = true + return nil +} + +func failRun(_, _ string, _ ...string) (jars.Process, error) { + return nil, fmt.Errorf("unexpectedly running a jar, failing") +} + +func succeedRun(_, jar string, _ ...string) (jars.Process, error) { + return &testProcess{jar: jar}, nil +} + +// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses. +func TestExpansionServices_GetAddr_Addresses(t *testing.T) { + addrsMap := map[string]string{ + "label1": "testAddr1", + "label2": "testAddr2", + "label3": "testAddr3", + } + jarsMap := map[string]string{ + "label2": "jarFilepath2", + } + es := &ExpansionServices{ + addrs: addrsMap, + jars: jarsMap, + procs: make([]jars.Process, 0), + run: failRun, + } + + // Ensure we get the same map we put in, and that addresses take priority over jars if + // both are given for the same label. 
+ for label, wantAddr := range addrsMap { + gotAddr, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + if gotAddr != wantAddr { + t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr) + } + } + // Check that nonexistent labels fail. + if _, err := es.GetAddr("nonexistent_label"); err == nil { + t.Errorf("did not receive error when calling GetAddr with nonexistent label") + } +} + +// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars. +func TestExpansionServices_GetAddr_Jars(t *testing.T) { + addrsMap := map[string]string{} + jarsMap := map[string]string{ + "label1": "jarFilepath1", + "label2": "jarFilepath2", + "label3": "jarFilepath3", + } + es := &ExpansionServices{ + addrs: addrsMap, + jars: jarsMap, + procs: make([]jars.Process, 0), + run: succeedRun, + } + + // Call GetAddr on each jar twice, checking that the addresses remain consistent. + gotMap := make(map[string]string) + for label := range jarsMap { + gotAddr, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + gotMap[label] = gotAddr + } + for label, gotAddr := range gotMap { + secondAddr, err := es.GetAddr(label) + if err != nil { + t.Errorf("unexpected error when getting address for \"%v\": %v", label, err) + continue + } + if secondAddr != gotAddr { + t.Errorf("getAddr returned different address when called twice for \"%v\", "+ + "attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr) + } + } + // Check that all jars were run. 
+ gotJars := make([]string, 0) + for _, proc := range es.procs { + testProc := proc.(*testProcess) + gotJars = append(gotJars, testProc.jar) + } + wantJars := make([]string, 0) + for _, jar := range jarsMap { + wantJars = append(wantJars, jar) + } + if diff := cmp.Diff(wantJars, gotJars); diff != "" { Review comment: You'll need to add the unordered compare here, *or* sort the slices first. TBH I'd just sort the slices. ########## File path: sdks/go/test/integration/expansions.go ########## @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "strconv" + "time" + + "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars" + "github.com/apache/beam/sdks/v2/go/test/integration/internal/ports" +) + +// ExpansionServices is a struct used for getting addresses and starting expansion services, based +// on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this +// instead of accessing the flags directly is to let it handle jar startup and shutdown. +// +// Usage +// +// Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for +// every expansion service needed for the test. 
Call Shutdown on it before finishing TestMain (or +// simply defer a call to it). +// +// ExpansionServices is not concurrency safe, and so a single instance should not be used within +// multiple individual tests, due to the possibility of those tests being run concurrently. It is +// recommended to only use ExpansionServices in TestMain to avoid this. +// +// Example: +// var retCode int +// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers. +// services := integration.NewExpansionServices() +// defer func() { services.Shutdown() }() +// addr, err := services.GetAddr("example") +// if err != nil { +// panic(err) +// } Review comment: Note that panic unwinds the stack and executes defers, so the deferred os.Exit(retCode) will still run; you may want to explicitly set retCode = 1 before the panic to make sure the process exits with an error status. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
