This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prismContainerPrecommit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit dd3c554beb5aa7e7503125afda63663ca430d6ee
Author: lostluck <[email protected]>
AuthorDate: Fri Sep 15 14:18:16 2023 -0700

    Add container using standalone prism go precommit.
---
 build.gradle.kts                                   |  4 ++++
 sdks/go/README.md                                  |  1 +
 sdks/go/cmd/prism/prism.go                         | 10 +++++----
 .../beam/runners/prism/internal/environments.go    | 16 ++++++++-------
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  4 ++++
 .../beam/runners/prism/internal/jobservices/job.go |  4 ++++
 .../beam/runners/prism/internal/worker/worker.go   | 11 +++++-----
 sdks/go/pkg/beam/runners/prism/prism.go            |  6 +++---
 sdks/go/test/build.gradle                          | 24 ++++++++++++++++++++++
 sdks/go/test/integration/integration.go            |  4 +++-
 sdks/go/test/run_validatesrunner_tests.sh          | 18 +++++++++++-----
 11 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index 0d23861a495..7bd84789529 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -443,6 +443,10 @@ tasks.register("goPortablePreCommit") {
   dependsOn(":sdks:go:test:ulrValidatesRunner")
 }
 
+tasks.register("goPrismPreCommit") {
+  dependsOn(":sdks:go:test:prismValidatesRunner")
+}
+
 tasks.register("goPostCommitDataflowARM") {
   dependsOn(":sdks:go:test:dataflowValidatesRunnerARM64")
 }
diff --git a/sdks/go/README.md b/sdks/go/README.md
index a3b03c2e618..7734d58d9eb 100644
--- a/sdks/go/README.md
+++ b/sdks/go/README.md
@@ -131,6 +131,7 @@ Executing all unit tests for the SDK is possible from the 
`<beam root>\sdks\go`
 To test your change as Jenkins would execute it from a PR, from the
 beam root directory, run:
  * `./gradlew :sdks:go:goTest` executes the unit tests.
+ * `./gradlew :sdks:go:test:prismValidatesRunner` validates the SDK against 
the Go Prism runner as a stand alone binary, with containers.
  * `./gradlew :sdks:go:test:ulrValidatesRunner` validates the SDK against the 
Portable Python runner.
  * `./gradlew :sdks:go:test:flinkValidatesRunner` validates the SDK against 
the Flink runner.
 
diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go
index f00a16c9b2d..804ae0c2ab2 100644
--- a/sdks/go/cmd/prism/prism.go
+++ b/sdks/go/cmd/prism/prism.go
@@ -30,6 +30,8 @@ import (
 )
 
 var (
+       jobPort            = flag.Int("job_port", 8073, "specify the job 
management service port")
+       webPort            = flag.Int("web_port", 8074, "specify the web ui 
port")
        jobManagerEndpoint = flag.String("jm_override", "", "set to only stand 
up a web ui that refers to a seperate JobManagement endpoint")
        serveHTTP          = flag.Bool("serve_http", true, "enable or disable 
the web ui")
 )
@@ -37,12 +39,12 @@ var (
 func main() {
        flag.Parse()
        ctx := context.Background()
-       cli, err := makeJobClient(ctx, *jobManagerEndpoint)
+       cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort}, 
*jobManagerEndpoint)
        if err != nil {
                log.Fatalf("error creating job server: %v", err)
        }
        if *serveHTTP {
-               if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 
8074}); err != nil {
+               if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 
*webPort}); err != nil {
                        log.Fatalf("error creating web server: %v", err)
                }
        } else {
@@ -51,7 +53,7 @@ func main() {
        }
 }
 
-func makeJobClient(ctx context.Context, endpoint string) 
(jobpb.JobServiceClient, error) {
+func makeJobClient(ctx context.Context, opts prism.Options, endpoint string) 
(jobpb.JobServiceClient, error) {
        if endpoint != "" {
                clientConn, err := grpc.DialContext(ctx, endpoint, 
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
                if err != nil {
@@ -59,7 +61,7 @@ func makeJobClient(ctx context.Context, endpoint string) 
(jobpb.JobServiceClient
                }
                return jobpb.NewJobServiceClient(clientConn), nil
        }
-       cli, err := prism.CreateJobServer(ctx, prism.Options{Port: 8073})
+       cli, err := prism.CreateJobServer(ctx, opts)
        if err != nil {
                return nil, fmt.Errorf("error creating local job server: %v", 
err)
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go 
b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 5830325bd05..d4fb6ad5b3e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -129,13 +129,15 @@ func dockerEnvironment(ctx context.Context, logger 
*slog.Logger, dp *pipepb.Dock
                        envs = append(envs, credEnv)
                }
        }
-
-       if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), 
dtyp.ImagePullOptions{}); err == nil {
-               // Copy the output, but discard it so we can wait until the 
image pull is finished.
-               io.Copy(io.Discard, rc)
-               rc.Close()
-       } else {
-               logger.Warn("unable to pull image", "error", err)
+       if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); 
err != nil {
+               // We don't have a local image, so we should pull it.
+               if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), 
dtyp.ImagePullOptions{}); err == nil {
+                       // Copy the output, but discard it so we can wait until 
the image pull is finished.
+                       io.Copy(io.Discard, rc)
+                       rc.Close()
+               } else {
+                       logger.Warn("unable to pull image and it's not local", 
"error", err)
+               }
        }
 
        ccr, err := cli.ContainerCreate(ctx, &container.Config{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index e0c67105d45..cf04381b9cb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -81,10 +81,14 @@ func RunPipeline(j *jobservices.Job) {
 // makeWorker creates a worker for that environment.
 func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
        wk := worker.New(j.String()+"_"+env, env)
+
        wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
+       wk.PipelineOptions = j.PipelineOptions()
        wk.JobKey = j.JobKey()
        wk.ArtifactEndpoint = j.ArtifactEndpoint()
+
        go wk.Serve()
+
        if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
                return nil, fmt.Errorf("failed to start environment %v for job 
%v: %w", env, j, err)
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 87b0ec007bf..cd302a70fcc 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -94,6 +94,10 @@ func (j *Job) ArtifactEndpoint() string {
        return j.artifactEndpoint
 }
 
+func (j *Job) PipelineOptions() *structpb.Struct {
+       return j.options
+}
+
 // ContributeTentativeMetrics returns the datachannel read index, and any 
unknown monitoring short ids.
 func (j *Job) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
        return j.metrics.ContributeTentativeMetrics(payloads)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 3a862a143b7..f33ff178c46 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -43,6 +43,7 @@ import (
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
        "google.golang.org/protobuf/encoding/prototext"
+       "google.golang.org/protobuf/types/known/structpb"
 )
 
 // A W manages worker environments, sending them work
@@ -59,6 +60,7 @@ type W struct {
 
        JobKey, ArtifactEndpoint string
        EnvPb                    *pipepb.Environment
+       PipelineOptions          *structpb.Struct
 
        // Server management
        lis    net.Listener
@@ -163,7 +165,6 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ 
*fnpb.GetProvisionInfoRequest
        }
        resp := &fnpb.GetProvisionInfoResponse{
                Info: &fnpb.ProvisionInfo{
-                       // TODO: Add the job's Pipeline options
                        // TODO: Include runner capabilities with the per job 
configuration.
                        RunnerCapabilities: []string{
                                urns.CapabilityMonitoringInfoShortIDs,
@@ -174,14 +175,14 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ 
*fnpb.GetProvisionInfoRequest
                                Url: wk.ArtifactEndpoint,
                        },
 
-                       RetrievalToken: wk.JobKey,
-                       Dependencies:   wk.EnvPb.GetDependencies(),
-
-                       // TODO add this job's artifact Dependencies
+                       RetrievalToken:  wk.JobKey,
+                       Dependencies:    wk.EnvPb.GetDependencies(),
+                       PipelineOptions: wk.PipelineOptions,
 
                        Metadata: map[string]string{
                                "runner":         "prism",
                                "runner_version": core.SdkVersion,
+                               "variant":        "test",
                        },
                },
        }
diff --git a/sdks/go/pkg/beam/runners/prism/prism.go 
b/sdks/go/pkg/beam/runners/prism/prism.go
index 0be35ad5cc3..bcb7a3fb689 100644
--- a/sdks/go/pkg/beam/runners/prism/prism.go
+++ b/sdks/go/pkg/beam/runners/prism/prism.go
@@ -49,9 +49,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                s := jobservices.NewServer(0, internal.RunPipeline)
                *jobopts.Endpoint = s.Endpoint()
                go s.Serve()
-       }
-       if !jobopts.IsLoopback() {
-               *jobopts.EnvironmentType = "loopback"
+               if !jobopts.IsLoopback() {
+                       *jobopts.EnvironmentType = "loopback"
+               }
        }
        return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index d5349119475..5b39cf81400 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -173,6 +173,30 @@ tasks.register("ulrValidatesRunner") {
   }
 }
 
+// ValidatesRunner tests for Prism. Runs tests in the integration directory
+// with prism in docker mod to validate that the runner behaves as expected.
+task prismValidatesRunner {
+  group = "Verification"
+
+  dependsOn ":sdks:go:test:goBuild"
+  dependsOn ":sdks:go:container:docker"
+  dependsOn ":sdks:java:container:java8:docker"
+  dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
+  doLast {
+    def pipelineOptions = [  // Pipeline options piped directly to Go SDK 
flags.
+        
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
+    ]
+    def options = [
+        "--runner prism",
+        "--pipeline_opts \"${pipelineOptions.join(' ')}\"",
+    ]
+    exec {
+      executable "sh"
+      args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
+    }
+  }
+}
+
 // A method for configuring a cross-language validates runner test task,
 // intended to be used in calls to createCrossLanguageValidatesRunnerTask.
 ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index dee161dcb2a..bb7f5275a16 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -38,11 +38,11 @@ package integration
 import (
        "fmt"
        "math/rand"
+       "os"
        "regexp"
        "strings"
        "testing"
        "time"
-       "os"
 
        // common runner flag.
        "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
@@ -140,6 +140,8 @@ var portableFilters = []string{
 }
 
 var prismFilters = []string{
+       // The prism runner does not yet support cross-language.
+       "TestXLang.*",
        // The prism runner does not support the TestStream primitive
        "TestTestStream.*",
        // The trigger and pane tests uses TestStream
diff --git a/sdks/go/test/run_validatesrunner_tests.sh 
b/sdks/go/test/run_validatesrunner_tests.sh
index c25d59bd57b..6f4c5c62182 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -257,8 +257,10 @@ print(s.getsockname()[1])
 s.close()
 "
 
+TMPDIR=$(mktemp -d)
+
 # Set up environment based on runner.
-if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || 
"$RUNNER" == "portable" ]]; then
+if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || 
"$RUNNER" == "portable" || "$RUNNER" == "prism" ]]; then
   if [[ -z "$ENDPOINT" ]]; then
     JOB_PORT=$(python3 -c "$SOCKET_SCRIPT")
     ENDPOINT="localhost:$JOB_PORT"
@@ -288,6 +290,12 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || 
"$RUNNER" == "samza" || "$
       python3 \
           -m apache_beam.runners.portability.local_job_service_main \
           --port $JOB_PORT &
+    elif [[ "$RUNNER" == "prism" ]]; then
+      PRISMBIN=$TMPDIR/prismbin
+      ./sdks/go/run_with_go_version.sh build -o $PRISMBIN 
sdks/go/cmd/prism/*.go
+      $PRISMBIN \
+          --serve_http=true \
+          --job_port $JOB_PORT &
     else
       echo "Unknown runner: $RUNNER"
       exit 1;
@@ -340,7 +348,6 @@ if [[ "$RUNNER" == "dataflow" ]]; then
   gcloud --version
 
   # ensure gcloud is version 186 or above
-  TMPDIR=$(mktemp -d)
   gcloud_ver=$(gcloud -v | head -1 | awk '{print $4}')
   if [[ "$gcloud_ver" < "186" ]]
   then
@@ -402,6 +409,7 @@ fi
 ARGS="$ARGS -p $SIMULTANEOUS"
 
 # Assemble test arguments and pipeline options.
+ARGS="$ARGS -v"
 ARGS="$ARGS -timeout $TIMEOUT"
 ARGS="$ARGS --runner=$RUNNER"
 ARGS="$ARGS --project=$DATAFLOW_PROJECT"
@@ -449,9 +457,9 @@ if [[ "$RUNNER" == "dataflow" ]]; then
     docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container"
     gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo 
"Failed to delete container"
   fi
-
-  # Clean up tempdir
-  rm -rf $TMPDIR
 fi
 
+# Clean up tempdir
+rm -rf $TMPDIR
+
 exit $TEST_EXIT_CODE

Reply via email to