[
https://issues.apache.org/jira/browse/BEAM-13857?focusedWorklogId=732790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-732790
]
ASF GitHub Bot logged work on BEAM-13857:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Feb/22 00:54
Start Date: 25/Feb/22 00:54
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #16908:
URL: https://github.com/apache/beam/pull/16908#discussion_r814389427
##########
File path: sdks/go/test/integration/internal/jars/jars.go
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jars contains functionality for running jars for integration tests. The main entry point
+// for running a jar is the Run function. The Process interface is used to interact with the running
+// jars, and most importantly for shutting down the jars once finished with them.
+package jars
+
+import (
+ "fmt"
+ "os/exec"
+)
+
+type runCallback func(duration, jar string, args ...string) (Process, error)
+
+var runner runCallback // Saves which behavior to use when Run is called.
+
+func init() {
+ runner = getRunner()
+}
+
+// getRunner is used to determine the appropriate behavior for run during initialization time,
+// based on the OS and installed binaries of the system. This is returned as a runCallback which
+// can be called whenever Run is called. If an error prevents Run from being used at all (for
+// example, Java is not installed), then the runCallback will return that error.
+func getRunner() runCallback {
+ // First check if we can even run jars.
+ _, err := exec.LookPath("java")
+ if err != nil {
+ err := fmt.Errorf("cannot run jar: 'java' command not installed: %w", err)
+ return func(_, _ string, _ ...string) (Process, error) {
+ return nil, err
+ }
+ }
+
+ // Defer to OS-specific logic for checking for presence of timeout command.
+ return getTimeoutRunner()
+}
+
+// Run runs a jar given an optional duration, a path to the jar, and any desired arguments to the
+// jar. It returns a Process object which can be used to shut down the jar once finished.
+//
+// The duration parameter is a duration for the timeout command which can be used to automatically
+// kill the jar after a set duration, in order to avoid resource leakage. It is described here:
+// https://man7.org/linux/man-pages/man1/timeout.1.html. If a duration is provided but the system
+// is unable to use the timeout command, this function will return an error. To indicate that a
+// duration isn't needed, pass in an empty string.
+func Run(duration, jar string, args ...string) (Process, error) {
Review comment:
I don't love the string "duration" parameter. Consider accepting a
`time.Duration` instead and handling the format conversion here.
`time.Duration` has a `Seconds` method which returns the duration in seconds
as a float64, which you can then format with `fmt.Sprintf("%fs", d.Seconds())`:
https://pkg.go.dev/time#Duration.Seconds
To avoid the additional plumbing change, you could even do that at this
early stage: add a `RunForDuration` that accepts the `time.Duration`
parameter, and have this one, `Run`, drop the parameter and pass the empty
string downstream.
This approach also eventually allows graceful support for non-Unix OSs,
which might specify timeouts differently; we could then handle those with
alternative formats.
##########
File path: sdks/go/test/integration/expansions_test.go
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
+ "github.com/google/go-cmp/cmp"
+)
+
+type testProcess struct {
+ killed bool
+ jar string
+}
+
+func (p *testProcess) Kill() error {
+ p.killed = true
+ return nil
+}
+
+func failRun(_, _ string, _ ...string) (jars.Process, error) {
+ return nil, fmt.Errorf("unexpectedly running a jar, failing")
+}
+
+func succeedRun(_, jar string, _ ...string) (jars.Process, error) {
+ return &testProcess{jar: jar}, nil
+}
+
+// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses.
+func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
+ addrsMap := map[string]string{
+ "label1": "testAddr1",
+ "label2": "testAddr2",
+ "label3": "testAddr3",
+ }
+ jarsMap := map[string]string{
+ "label2": "jarFilepath2",
+ }
+ es := &ExpansionServices{
+ addrs: addrsMap,
+ jars: jarsMap,
+ procs: make([]jars.Process, 0),
+ run: failRun,
+ }
+
+ // Ensure we get the same map we put in, and that addresses take priority over jars if
+ // both are given for the same label.
+ for label, wantAddr := range addrsMap {
+ gotAddr, err := es.GetAddr(label)
+ if err != nil {
+ t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
+ continue
+ }
+ if gotAddr != wantAddr {
+ t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr)
+ }
+ }
+ // Check that nonexistent labels fail.
+ if _, err := es.GetAddr("nonexistent_label"); err == nil {
+ t.Errorf("did not receive error when calling GetAddr with nonexistent label")
+ }
+}
+
+// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars.
+func TestExpansionServices_GetAddr_Jars(t *testing.T) {
+ addrsMap := map[string]string{}
+ jarsMap := map[string]string{
+ "label1": "jarFilepath1",
+ "label2": "jarFilepath2",
+ "label3": "jarFilepath3",
+ }
+ es := &ExpansionServices{
+ addrs: addrsMap,
+ jars: jarsMap,
+ procs: make([]jars.Process, 0),
+ run: succeedRun,
+ }
+
+ // Call GetAddr on each jar twice, checking that the addresses remain consistent.
+ gotMap := make(map[string]string)
+ for label := range jarsMap {
+ gotAddr, err := es.GetAddr(label)
+ if err != nil {
+ t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
+ continue
+ }
+ gotMap[label] = gotAddr
+ }
+ for label, gotAddr := range gotMap {
+ secondAddr, err := es.GetAddr(label)
+ if err != nil {
+ t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
+ continue
+ }
+ if secondAddr != gotAddr {
+ t.Errorf("getAddr returned different address when called twice for \"%v\", "+
+ "attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr)
+ }
+ }
+ // Check that all jars were run.
+ gotJars := make([]string, 0)
+ for _, proc := range es.procs {
+ testProc := proc.(*testProcess)
+ gotJars = append(gotJars, testProc.jar)
+ }
+ wantJars := make([]string, 0)
+ for _, jar := range jarsMap {
+ wantJars = append(wantJars, jar)
+ }
+ if diff := cmp.Diff(wantJars, gotJars); diff != "" {
Review comment:
You'll need to add the unordered compare here, *or* sort the slices
first. TBH I'd just sort the slices.
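For context: Go does not guarantee map iteration order, so `wantJars` (built by
ranging over `jarsMap`) and `gotJars` (built from processes started during a
separate range over the same map) can come out in different orders. A minimal
stdlib-only sketch of the sort-first option; `sortedCopy` and `sameElements`
are hypothetical helper names, not part of this PR:

```go
package main

import "sort"

// sortedCopy returns a sorted copy of in, leaving the input untouched.
func sortedCopy(in []string) []string {
	out := append([]string(nil), in...)
	sort.Strings(out)
	return out
}

// sameElements reports whether a and b hold the same strings, ignoring order.
func sameElements(a, b []string) bool {
	a, b = sortedCopy(a), sortedCopy(b)
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}
```

In the test itself it is probably enough to call `sort.Strings` on both slices
right before the existing `cmp.Diff`. The unordered-compare route would be
passing `cmpopts.SortSlices` with a less function to `cmp.Diff`.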
##########
File path: sdks/go/test/integration/expansions.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package integration
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
+ "github.com/apache/beam/sdks/v2/go/test/integration/internal/ports"
+)
+
+// ExpansionServices is a struct used for getting addresses and starting expansion services, based
+// on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this
+// instead of accessing the flags directly is to let it handle jar startup and shutdown.
+//
+// Usage
+//
+// Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for
+// every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or
+// simply defer a call to it).
+//
+// ExpansionServices is not concurrency safe, and so a single instance should not be used within
+// multiple individual tests, due to the possibility of those tests being run concurrently. It is
+// recommended to only use ExpansionServices in TestMain to avoid this.
+//
+// Example:
+// var retCode int
+// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers.
+// services := integration.NewExpansionServices()
+// defer func() { services.Shutdown() }()
+// addr, err := services.GetAddr("example")
+// if err != nil {
+// panic(err)
+// }
Review comment:
Note that panic unwinds the stack and runs defers, so the deferred
`os.Exit(retCode)` still runs, with retCode at its zero value. You may want to
explicitly set retCode = 1 before the panic to make sure it exits with an
error state.
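A small sketch of the behavior, under the assumption that the example keeps its
deferred-exit shape. `runWithExit`, `failingGetAddr`, and the injected `exit`
parameter are hypothetical; `exit` stands in for `os.Exit` so the exit code can
be observed instead of ending the process:

```go
package main

import "errors"

// failingGetAddr stands in for services.GetAddr returning an error.
func failingGetAddr() (string, error) {
	return "", errors.New("no address for label")
}

// runWithExit mirrors the TestMain pattern in the doc comment. The deferred
// exit call runs even while a panic is unwinding the stack, so retCode must
// be set before panicking or the process would report success.
func runWithExit(exit func(code int), getAddr func() (string, error)) {
	var retCode int
	defer func() { exit(retCode) }()
	if _, err := getAddr(); err != nil {
		retCode = 1 // Without this line the deferred exit would receive 0.
		panic(err)
	}
}
```

With the real `os.Exit` the process ends inside the deferred call, so the panic
message is never printed. Logging the error before panicking may be worth
considering too.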
Issue Time Tracking
-------------------
Worklog Id: (was: 732790)
Time Spent: 1h (was: 50m)
> Add expansion service startup to Go integration test flags.
> -----------------------------------------------------------
>
> Key: BEAM-13857
> URL: https://issues.apache.org/jira/browse/BEAM-13857
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Reporter: Ritesh Ghorse
> Assignee: Daniel Oliveira
> Priority: P2
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Currently a separate debezium io expansion address flag needs to be passed to
> the runner when running cross-language debezium IO pipelines from Go SDK.
> Find a way to do this in a better way so that we could have it started along
> with java io expansion service while spinning up the test without bulking
> :sdks:java:io:expansion-service.
> In particular, needing to add a flag per expansion service jar to our
> integration tests will eventually become quite cluttered, so we may wish to
> settle on some kind of KV map flag approach instead to reduce copypasta code
> overhead.
> Edit: Decided on going with the KV map flag approach within the Go SDK
> instead of in a bash script, and moving expansion service startup into the
> codebase as well.
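For illustration only, a KV map flag can be sketched in Go with a custom
`flag.Value`. Everything here is an assumption rather than the implementation
in the PR: the `kvFlag` type, the `parseExpansionJars` helper, and especially
the `label=value` separator are hypothetical. Only the `--expansion_jar` flag
name comes from the code under review.

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"sort"
	"strings"
)

// kvFlag is a hypothetical flag.Value that collects repeated label=value pairs.
type kvFlag map[string]string

// String renders the pairs in sorted order so the output is deterministic.
func (f kvFlag) String() string {
	pairs := make([]string, 0, len(f))
	for k, v := range f {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ",")
}

// Set is called once per occurrence of the flag on the command line.
func (f kvFlag) Set(s string) error {
	parts := strings.SplitN(s, "=", 2)
	if len(parts) != 2 || parts[0] == "" {
		return fmt.Errorf("invalid entry %q, want label=value", s)
	}
	f[parts[0]] = parts[1]
	return nil
}

// parseExpansionJars parses args with a single repeatable map-valued flag.
func parseExpansionJars(args []string) (kvFlag, error) {
	jars := kvFlag{}
	fs := flag.NewFlagSet("integration", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // Keep usage text out of test output.
	fs.Var(jars, "expansion_jar", "label=path pair; may be repeated")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return jars, nil
}
```

One repeatable flag then covers any number of expansion services, which is the
clutter reduction the description asks for.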