This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d37c6f45d8 [#28187] Add standalone prism validates runner precommit 
(#28487)
0d37c6f45d8 is described below

commit 0d37c6f45d810f01907bdbcc424b621185a0033f
Author: Robert Burke <[email protected]>
AuthorDate: Mon Sep 18 11:18:36 2023 -0700

    [#28187] Add standalone prism validates runner precommit (#28487)
    
    * Add container using standalone prism go precommit.
    
    * Add GoPrism precommit action.
    
    * Don't serve prism UI by default.
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
---
 .github/workflows/beam_PreCommit_GoPrism.yml       | 89 ++++++++++++++++++++++
 build.gradle.kts                                   |  4 +
 sdks/go/README.md                                  |  1 +
 sdks/go/cmd/prism/prism.go                         | 10 ++-
 .../beam/runners/prism/internal/environments.go    | 16 ++--
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  4 +
 .../beam/runners/prism/internal/jobservices/job.go |  4 +
 .../beam/runners/prism/internal/worker/worker.go   | 11 +--
 sdks/go/pkg/beam/runners/prism/prism.go            |  6 +-
 sdks/go/test/build.gradle                          | 24 ++++++
 sdks/go/test/integration/integration.go            |  4 +-
 sdks/go/test/run_validatesrunner_tests.sh          | 16 ++--
 12 files changed, 164 insertions(+), 25 deletions(-)

diff --git a/.github/workflows/beam_PreCommit_GoPrism.yml 
b/.github/workflows/beam_PreCommit_GoPrism.yml
new file mode 100644
index 00000000000..8091c23792c
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_GoPrism.yml
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit GoPrism
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths: ['model/**', 'sdks/go/**', 'sdks/go.**',
'release/**','.github/workflows/beam_PreCommit_GoPrism.yml']
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths: ['model/**', 'sdks/go/**', 'sdks/go.**', 'release/**']  # sdks/go.** alone misses the sdks/go/ tree
+  issue_comment:
+    types: [created]
+  schedule:
+    - cron: '0 */6 * * *'
+  workflow_dispatch:
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || 
github.event.pull_request.head.label || github.sha || github.head_ref || 
github.ref }}-${{ github.event.schedule || github.event.comment.body || 
github.event.sender.login}}'
+  cancel-in-progress: true
+
+env:
+  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+# Setting explicit permissions for the action to avoid the default permissions 
which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: read
+  checks: read
+  contents: read
+  deployments: read
+  id-token: none
+  issues: read
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+jobs:
+  beam_PreCommit_GoPrism:
+    name: ${{matrix.job_name}} (${{ matrix.job_phrase }})
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    strategy:
+      matrix:
+        job_name: [beam_PreCommit_GoPrism]
+        job_phrase: [Run GoPrism PreCommit]
+    timeout-minutes: 120
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'pull_request_target' ||
+      github.event_name == 'schedule' ||
+      github.event_name == 'workflow_dispatch' ||
+      github.event.comment.body == 'Run GoPrism PreCommit'
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup self-hosted
+        uses: ./.github/actions/setup-self-hosted-action
+        with:
+          requires-py-39: false
+          requires-go: false
+      - name: Run goPrismPreCommit script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :goPrismPreCommit
\ No newline at end of file
diff --git a/build.gradle.kts b/build.gradle.kts
index 0d23861a495..7bd84789529 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -443,6 +443,10 @@ tasks.register("goPortablePreCommit") {
   dependsOn(":sdks:go:test:ulrValidatesRunner")
 }
 
+tasks.register("goPrismPreCommit") {
+  dependsOn(":sdks:go:test:prismValidatesRunner")
+}
+
 tasks.register("goPostCommitDataflowARM") {
   dependsOn(":sdks:go:test:dataflowValidatesRunnerARM64")
 }
diff --git a/sdks/go/README.md b/sdks/go/README.md
index a3b03c2e618..7734d58d9eb 100644
--- a/sdks/go/README.md
+++ b/sdks/go/README.md
@@ -131,6 +131,7 @@ Executing all unit tests for the SDK is possible from the 
`<beam root>\sdks\go`
 To test your change as Jenkins would execute it from a PR, from the
 beam root directory, run:
  * `./gradlew :sdks:go:goTest` executes the unit tests.
+ * `./gradlew :sdks:go:test:prismValidatesRunner` validates the SDK against 
the Go Prism runner as a standalone binary, with SDK workers in containers.
  * `./gradlew :sdks:go:test:ulrValidatesRunner` validates the SDK against the 
Portable Python runner.
  * `./gradlew :sdks:go:test:flinkValidatesRunner` validates the SDK against 
the Flink runner.
 
diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go
index f00a16c9b2d..804ae0c2ab2 100644
--- a/sdks/go/cmd/prism/prism.go
+++ b/sdks/go/cmd/prism/prism.go
@@ -30,6 +30,8 @@ import (
 )
 
 var (
+       jobPort            = flag.Int("job_port", 8073, "specify the job 
management service port")
+       webPort            = flag.Int("web_port", 8074, "specify the web ui 
port")
        jobManagerEndpoint = flag.String("jm_override", "", "set to only stand 
up a web ui that refers to a seperate JobManagement endpoint")
        serveHTTP          = flag.Bool("serve_http", true, "enable or disable 
the web ui")
 )
@@ -37,12 +39,12 @@ var (
 func main() {
        flag.Parse()
        ctx := context.Background()
-       cli, err := makeJobClient(ctx, *jobManagerEndpoint)
+       cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort}, 
*jobManagerEndpoint)
        if err != nil {
                log.Fatalf("error creating job server: %v", err)
        }
        if *serveHTTP {
-               if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 
8074}); err != nil {
+               if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 
*webPort}); err != nil {
                        log.Fatalf("error creating web server: %v", err)
                }
        } else {
@@ -51,7 +53,7 @@ func main() {
        }
 }
 
-func makeJobClient(ctx context.Context, endpoint string) 
(jobpb.JobServiceClient, error) {
+func makeJobClient(ctx context.Context, opts prism.Options, endpoint string) 
(jobpb.JobServiceClient, error) {
        if endpoint != "" {
                clientConn, err := grpc.DialContext(ctx, endpoint, 
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
                if err != nil {
@@ -59,7 +61,7 @@ func makeJobClient(ctx context.Context, endpoint string) 
(jobpb.JobServiceClient
                }
                return jobpb.NewJobServiceClient(clientConn), nil
        }
-       cli, err := prism.CreateJobServer(ctx, prism.Options{Port: 8073})
+       cli, err := prism.CreateJobServer(ctx, opts)
        if err != nil {
                return nil, fmt.Errorf("error creating local job server: %v", 
err)
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go 
b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 5830325bd05..d4fb6ad5b3e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -129,13 +129,15 @@ func dockerEnvironment(ctx context.Context, logger 
*slog.Logger, dp *pipepb.Dock
                        envs = append(envs, credEnv)
                }
        }
-
-       if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), 
dtyp.ImagePullOptions{}); err == nil {
-               // Copy the output, but discard it so we can wait until the 
image pull is finished.
-               io.Copy(io.Discard, rc)
-               rc.Close()
-       } else {
-               logger.Warn("unable to pull image", "error", err)
+       if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); 
err != nil {
+               // We don't have a local image, so we should pull it.
+               if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), 
dtyp.ImagePullOptions{}); err == nil {
+                       // Copy the output, but discard it so we can wait until 
the image pull is finished.
+                       io.Copy(io.Discard, rc)
+                       rc.Close()
+               } else {
+                       logger.Warn("unable to pull image and it's not local", 
"error", err)
+               }
        }
 
        ccr, err := cli.ContainerCreate(ctx, &container.Config{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index e0c67105d45..cf04381b9cb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -81,10 +81,14 @@ func RunPipeline(j *jobservices.Job) {
 // makeWorker creates a worker for that environment.
 func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
        wk := worker.New(j.String()+"_"+env, env)
+
        wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
+       wk.PipelineOptions = j.PipelineOptions()
        wk.JobKey = j.JobKey()
        wk.ArtifactEndpoint = j.ArtifactEndpoint()
+
        go wk.Serve()
+
        if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
                return nil, fmt.Errorf("failed to start environment %v for job 
%v: %w", env, j, err)
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 87b0ec007bf..cd302a70fcc 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -94,6 +94,10 @@ func (j *Job) ArtifactEndpoint() string {
        return j.artifactEndpoint
 }
 
+func (j *Job) PipelineOptions() *structpb.Struct {
+       return j.options
+}
+
 // ContributeTentativeMetrics returns the datachannel read index, and any 
unknown monitoring short ids.
 func (j *Job) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
        return j.metrics.ContributeTentativeMetrics(payloads)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 3a862a143b7..f33ff178c46 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -43,6 +43,7 @@ import (
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
        "google.golang.org/protobuf/encoding/prototext"
+       "google.golang.org/protobuf/types/known/structpb"
 )
 
 // A W manages worker environments, sending them work
@@ -59,6 +60,7 @@ type W struct {
 
        JobKey, ArtifactEndpoint string
        EnvPb                    *pipepb.Environment
+       PipelineOptions          *structpb.Struct
 
        // Server management
        lis    net.Listener
@@ -163,7 +165,6 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ 
*fnpb.GetProvisionInfoRequest
        }
        resp := &fnpb.GetProvisionInfoResponse{
                Info: &fnpb.ProvisionInfo{
-                       // TODO: Add the job's Pipeline options
                        // TODO: Include runner capabilities with the per job 
configuration.
                        RunnerCapabilities: []string{
                                urns.CapabilityMonitoringInfoShortIDs,
@@ -174,14 +175,14 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ 
*fnpb.GetProvisionInfoRequest
                                Url: wk.ArtifactEndpoint,
                        },
 
-                       RetrievalToken: wk.JobKey,
-                       Dependencies:   wk.EnvPb.GetDependencies(),
-
-                       // TODO add this job's artifact Dependencies
+                       RetrievalToken:  wk.JobKey,
+                       Dependencies:    wk.EnvPb.GetDependencies(),
+                       PipelineOptions: wk.PipelineOptions,
 
                        Metadata: map[string]string{
                                "runner":         "prism",
                                "runner_version": core.SdkVersion,
+                               "variant":        "test",
                        },
                },
        }
diff --git a/sdks/go/pkg/beam/runners/prism/prism.go 
b/sdks/go/pkg/beam/runners/prism/prism.go
index 0be35ad5cc3..bcb7a3fb689 100644
--- a/sdks/go/pkg/beam/runners/prism/prism.go
+++ b/sdks/go/pkg/beam/runners/prism/prism.go
@@ -49,9 +49,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                s := jobservices.NewServer(0, internal.RunPipeline)
                *jobopts.Endpoint = s.Endpoint()
                go s.Serve()
-       }
-       if !jobopts.IsLoopback() {
-               *jobopts.EnvironmentType = "loopback"
+               if !jobopts.IsLoopback() {
+                       *jobopts.EnvironmentType = "loopback"
+               }
        }
        return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index d5349119475..5b39cf81400 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -173,6 +173,30 @@ tasks.register("ulrValidatesRunner") {
   }
 }
 
+// ValidatesRunner tests for Prism. Runs tests in the integration directory
+// with prism in docker mode to validate that the runner behaves as expected.
+tasks.register("prismValidatesRunner") {
+  group = "Verification"
+
+  dependsOn ":sdks:go:test:goBuild"
+  dependsOn ":sdks:go:container:docker"
+  dependsOn ":sdks:java:container:java8:docker"  // NOTE(review): prism's filters currently skip TestXLang.*; confirm Java deps are still needed.
+  dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
+  doLast {
+    def pipelineOptions = [  // Pipeline options piped directly to Go SDK 
flags.
+        
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
+    ]
+    def options = [
+        "--runner prism",
+        "--pipeline_opts \"${pipelineOptions.join(' ')}\"",
+    ]
+    exec {
+      executable "sh"
+      args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
+    }
+  }
+}
+
+
 // A method for configuring a cross-language validates runner test task,
 // intended to be used in calls to createCrossLanguageValidatesRunnerTask.
 ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index dee161dcb2a..bb7f5275a16 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -38,11 +38,11 @@ package integration
 import (
        "fmt"
        "math/rand"
+       "os"
        "regexp"
        "strings"
        "testing"
        "time"
-       "os"
 
        // common runner flag.
        "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
@@ -140,6 +140,8 @@ var portableFilters = []string{
 }
 
 var prismFilters = []string{
+       // The prism runner does not yet support cross-language.
+       "TestXLang.*",
        // The prism runner does not support the TestStream primitive
        "TestTestStream.*",
        // The trigger and pane tests uses TestStream
diff --git a/sdks/go/test/run_validatesrunner_tests.sh 
b/sdks/go/test/run_validatesrunner_tests.sh
index c25d59bd57b..fd4856f25a0 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -257,8 +257,10 @@ print(s.getsockname()[1])
 s.close()
 "
 
+TMPDIR=$(mktemp -d)
+
 # Set up environment based on runner.
-if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || 
"$RUNNER" == "portable" ]]; then
+if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || 
"$RUNNER" == "portable" || "$RUNNER" == "prism" ]]; then
   if [[ -z "$ENDPOINT" ]]; then
     JOB_PORT=$(python3 -c "$SOCKET_SCRIPT")
     ENDPOINT="localhost:$JOB_PORT"
@@ -288,6 +290,10 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || 
"$RUNNER" == "samza" || "$
       python3 \
           -m apache_beam.runners.portability.local_job_service_main \
           --port $JOB_PORT &
+    elif [[ "$RUNNER" == "prism" ]]; then
+      PRISMBIN="$TMPDIR/prismbin"
+      ./sdks/go/run_with_go_version.sh build -o "$PRISMBIN" 
sdks/go/cmd/prism/*.go
+      "$PRISMBIN" --job_port $JOB_PORT --serve_http=false &  # No web UI: its default port is fixed (8074) and would clash across concurrent runs.
     else
       echo "Unknown runner: $RUNNER"
       exit 1;
@@ -340,7 +346,6 @@ if [[ "$RUNNER" == "dataflow" ]]; then
   gcloud --version
 
   # ensure gcloud is version 186 or above
-  TMPDIR=$(mktemp -d)
   gcloud_ver=$(gcloud -v | head -1 | awk '{print $4}')
   if [[ "$gcloud_ver" < "186" ]]
   then
@@ -402,6 +407,7 @@ fi
 ARGS="$ARGS -p $SIMULTANEOUS"
 
 # Assemble test arguments and pipeline options.
+ARGS="$ARGS -v"
 ARGS="$ARGS -timeout $TIMEOUT"
 ARGS="$ARGS --runner=$RUNNER"
 ARGS="$ARGS --project=$DATAFLOW_PROJECT"
@@ -449,9 +455,9 @@ if [[ "$RUNNER" == "dataflow" ]]; then
     docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container"
     gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo 
"Failed to delete container"
   fi
-
-  # Clean up tempdir
-  rm -rf $TMPDIR
 fi
 
+# Clean up tempdir
+rm -rf $TMPDIR
+
 exit $TEST_EXIT_CODE

Reply via email to