This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0d37c6f45d8 [#28187] Add standalone prism validates runner precommit
(#28487)
0d37c6f45d8 is described below
commit 0d37c6f45d810f01907bdbcc424b621185a0033f
Author: Robert Burke <[email protected]>
AuthorDate: Mon Sep 18 11:18:36 2023 -0700
[#28187] Add standalone prism validates runner precommit (#28487)
* Add container using standalone prism go precommit.
* Add GoPrism precommit action.
* Don't serve prism UI by default.
---------
Co-authored-by: lostluck <[email protected]>
---
.github/workflows/beam_PreCommit_GoPrism.yml | 89 ++++++++++++++++++++++
build.gradle.kts | 4 +
sdks/go/README.md | 1 +
sdks/go/cmd/prism/prism.go | 10 ++-
.../beam/runners/prism/internal/environments.go | 16 ++--
sdks/go/pkg/beam/runners/prism/internal/execute.go | 4 +
.../beam/runners/prism/internal/jobservices/job.go | 4 +
.../beam/runners/prism/internal/worker/worker.go | 11 +--
sdks/go/pkg/beam/runners/prism/prism.go | 6 +-
sdks/go/test/build.gradle | 24 ++++++
sdks/go/test/integration/integration.go | 4 +-
sdks/go/test/run_validatesrunner_tests.sh | 16 ++--
12 files changed, 164 insertions(+), 25 deletions(-)
diff --git a/.github/workflows/beam_PreCommit_GoPrism.yml
b/.github/workflows/beam_PreCommit_GoPrism.yml
new file mode 100644
index 00000000000..8091c23792c
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_GoPrism.yml
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit GoPrism
+
+on:
+ push:
+ tags: ['v*']
+ branches: ['master', 'release-*']
+ paths: ['model/**', 'sdks/go.**', 'sdks/go/**',
'release/**','.github/workflows/beam_PreCommit_GoPrism.yml']
+ pull_request_target:
+ branches: ['master', 'release-*']
+ paths: ['model/**', 'sdks/go.**', 'sdks/go/**', 'release/**'] # 'sdks/go.**' only matches sdks/go.mod / sdks/go.sum; 'sdks/go/**' covers the SDK tree
+ issue_comment:
+ types: [created]
+ schedule:
+ - cron: '0 */6 * * *'
+ workflow_dispatch:
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.issue.number ||
github.event.pull_request.head.label || github.sha || github.head_ref ||
github.ref }}-${{ github.event.schedule || github.event.comment.body ||
github.event.sender.login}}'
+ cancel-in-progress: true
+
+env:
+ GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+# Setting explicit permissions for the action to avoid the default permissions
which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write # the only write scope granted; every other scope below is read or none
+ pull-requests: read
+ checks: read
+ contents: read
+ deployments: read
+ id-token: none
+ issues: read
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+jobs:
+ beam_PreCommit_GoPrism:
+ name: ${{matrix.job_name}} (${{ matrix.job_phrase }})
+ runs-on: [self-hosted, ubuntu-20.04, main]
+ strategy:
+ matrix:
+ job_name: [beam_PreCommit_GoPrism]
+ job_phrase: [Run GoPrism PreCommit] # keep in sync with the literal comment body checked in `if:` below
+ timeout-minutes: 120
+ if: |
+ github.event_name == 'push' ||
+ github.event_name == 'pull_request_target' ||
+ github.event_name == 'schedule' ||
+ github.event_name == 'workflow_dispatch' ||
+ github.event.comment.body == 'Run GoPrism PreCommit'
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup repository
+ uses: ./.github/actions/setup-action
+ with:
+ comment_phrase: ${{ matrix.job_phrase }}
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ - name: Setup self-hosted
+ uses: ./.github/actions/setup-self-hosted-action
+ with:
+ requires-py-39: false
+ requires-go: false # NOTE(review): presumably Go is fetched by sdks/go/run_with_go_version.sh, which the VR script calls — confirm
+ - name: Run goPrismPreCommit script
+ uses: ./.github/actions/gradle-command-self-hosted-action
+ with:
+ gradle-command: :goPrismPreCommit # aggregate task registered in build.gradle.kts
\ No newline at end of file
diff --git a/build.gradle.kts b/build.gradle.kts
index 0d23861a495..7bd84789529 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -443,6 +443,10 @@ tasks.register("goPortablePreCommit") {
dependsOn(":sdks:go:test:ulrValidatesRunner")
}
+tasks.register("goPrismPreCommit") { // entry point run by the beam_PreCommit_GoPrism workflow (gradle-command :goPrismPreCommit)
+ dependsOn(":sdks:go:test:prismValidatesRunner")
+}
+
tasks.register("goPostCommitDataflowARM") {
dependsOn(":sdks:go:test:dataflowValidatesRunnerARM64")
}
diff --git a/sdks/go/README.md b/sdks/go/README.md
index a3b03c2e618..7734d58d9eb 100644
--- a/sdks/go/README.md
+++ b/sdks/go/README.md
@@ -131,6 +131,7 @@ Executing all unit tests for the SDK is possible from the
`<beam root>\sdks\go`
To test your change as Jenkins would execute it from a PR, from the
beam root directory, run:
* `./gradlew :sdks:go:goTest` executes the unit tests.
+ * `./gradlew :sdks:go:test:prismValidatesRunner` validates the SDK against
the Go Prism runner as a standalone binary, with containers.
* `./gradlew :sdks:go:test:ulrValidatesRunner` validates the SDK against the
Portable Python runner.
* `./gradlew :sdks:go:test:flinkValidatesRunner` validates the SDK against
the Flink runner.
diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go
index f00a16c9b2d..804ae0c2ab2 100644
--- a/sdks/go/cmd/prism/prism.go
+++ b/sdks/go/cmd/prism/prism.go
@@ -30,6 +30,8 @@ import (
)
var (
+ jobPort = flag.Int("job_port", 8073, "specify the job
management service port") // ignored when -jm_override is set: makeJobClient dials that endpoint instead
+ webPort = flag.Int("web_port", 8074, "specify the web ui
port") // only used when -serve_http is true
jobManagerEndpoint = flag.String("jm_override", "", "set to only stand
up a web ui that refers to a seperate JobManagement endpoint")
serveHTTP = flag.Bool("serve_http", true, "enable or disable
the web ui")
)
@@ -37,12 +39,12 @@ var (
func main() {
flag.Parse()
ctx := context.Background()
- cli, err := makeJobClient(ctx, *jobManagerEndpoint)
+ cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort},
*jobManagerEndpoint)
if err != nil {
log.Fatalf("error creating job server: %v", err)
}
if *serveHTTP {
- if err := prism.CreateWebServer(ctx, cli, prism.Options{Port:
8074}); err != nil {
+ if err := prism.CreateWebServer(ctx, cli, prism.Options{Port:
*webPort}); err != nil {
log.Fatalf("error creating web server: %v", err)
}
} else {
@@ -51,7 +53,7 @@ func main() {
}
}
-func makeJobClient(ctx context.Context, endpoint string)
(jobpb.JobServiceClient, error) {
+func makeJobClient(ctx context.Context, opts prism.Options, endpoint string)
(jobpb.JobServiceClient, error) {
if endpoint != "" {
clientConn, err := grpc.DialContext(ctx, endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
@@ -59,7 +61,7 @@ func makeJobClient(ctx context.Context, endpoint string)
(jobpb.JobServiceClient
}
return jobpb.NewJobServiceClient(clientConn), nil
}
- cli, err := prism.CreateJobServer(ctx, prism.Options{Port: 8073})
+ cli, err := prism.CreateJobServer(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error creating local job server: %v",
err)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go
b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 5830325bd05..d4fb6ad5b3e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -129,13 +129,15 @@ func dockerEnvironment(ctx context.Context, logger
*slog.Logger, dp *pipepb.Dock
envs = append(envs, credEnv)
}
}
-
- if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(),
dtyp.ImagePullOptions{}); err == nil {
- // Copy the output, but discard it so we can wait until the
image pull is finished.
- io.Copy(io.Discard, rc)
- rc.Close()
- } else {
- logger.Warn("unable to pull image", "error", err)
+ if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage());
err != nil {
+ // We don't have a local image, so we should pull it.
+ if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(),
dtyp.ImagePullOptions{}); err == nil {
+ // Copy the output, but discard it so we can wait until
the image pull is finished.
+ io.Copy(io.Discard, rc)
+ rc.Close()
+ } else {
+ logger.Warn("unable to pull image and it's not local",
"error", err)
+ }
}
ccr, err := cli.ContainerCreate(ctx, &container.Config{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index e0c67105d45..cf04381b9cb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -81,10 +81,14 @@ func RunPipeline(j *jobservices.Job) {
// makeWorker creates a worker for that environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
wk := worker.New(j.String()+"_"+env, env)
+
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
+ wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()
+
go wk.Serve()
+
if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job
%v: %w", env, j, err)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 87b0ec007bf..cd302a70fcc 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -94,6 +94,10 @@ func (j *Job) ArtifactEndpoint() string {
return j.artifactEndpoint
}
+func (j *Job) PipelineOptions() *structpb.Struct {
+ return j.options // the job's own *structpb.Struct, not a copy; makeWorker forwards it into each worker's ProvisionInfo
+}
+
// ContributeTentativeMetrics returns the datachannel read index, and any
unknown monitoring short ids.
func (j *Job) ContributeTentativeMetrics(payloads
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
return j.metrics.ContributeTentativeMetrics(payloads)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 3a862a143b7..f33ff178c46 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -43,6 +43,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
+ "google.golang.org/protobuf/types/known/structpb"
)
// A W manages worker environments, sending them work
@@ -59,6 +60,7 @@ type W struct {
JobKey, ArtifactEndpoint string
EnvPb *pipepb.Environment
+ PipelineOptions *structpb.Struct
// Server management
lis net.Listener
@@ -163,7 +165,6 @@ func (wk *W) GetProvisionInfo(_ context.Context, _
*fnpb.GetProvisionInfoRequest
}
resp := &fnpb.GetProvisionInfoResponse{
Info: &fnpb.ProvisionInfo{
- // TODO: Add the job's Pipeline options
// TODO: Include runner capabilities with the per job
configuration.
RunnerCapabilities: []string{
urns.CapabilityMonitoringInfoShortIDs,
@@ -174,14 +175,14 @@ func (wk *W) GetProvisionInfo(_ context.Context, _
*fnpb.GetProvisionInfoRequest
Url: wk.ArtifactEndpoint,
},
- RetrievalToken: wk.JobKey,
- Dependencies: wk.EnvPb.GetDependencies(),
-
- // TODO add this job's artifact Dependencies
+ RetrievalToken: wk.JobKey,
+ Dependencies: wk.EnvPb.GetDependencies(),
+ PipelineOptions: wk.PipelineOptions,
Metadata: map[string]string{
"runner": "prism",
"runner_version": core.SdkVersion,
+ "variant": "test",
},
},
}
diff --git a/sdks/go/pkg/beam/runners/prism/prism.go
b/sdks/go/pkg/beam/runners/prism/prism.go
index 0be35ad5cc3..bcb7a3fb689 100644
--- a/sdks/go/pkg/beam/runners/prism/prism.go
+++ b/sdks/go/pkg/beam/runners/prism/prism.go
@@ -49,9 +49,9 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
s := jobservices.NewServer(0, internal.RunPipeline)
*jobopts.Endpoint = s.Endpoint()
go s.Serve()
- }
- if !jobopts.IsLoopback() {
- *jobopts.EnvironmentType = "loopback"
+ if !jobopts.IsLoopback() {
+ *jobopts.EnvironmentType = "loopback"
+ }
}
return universal.Execute(ctx, p)
}
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index d5349119475..5b39cf81400 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -173,6 +173,30 @@ tasks.register("ulrValidatesRunner") {
}
}
+// ValidatesRunner tests for Prism. Runs tests in the integration directory
+// with prism in docker mode to validate that the runner behaves as expected.
+task prismValidatesRunner { // depended on by the root :goPrismPreCommit task
+ group = "Verification"
+
+ dependsOn ":sdks:go:test:goBuild"
+ dependsOn ":sdks:go:container:docker" // NOTE(review): presumably builds the Go SDK image that prism's docker environments run — confirm
+ dependsOn ":sdks:java:container:java8:docker"
+ dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" // NOTE(review): the expansion jar (and presumably the Java image) feed cross-language tests, which prismFilters skips via TestXLang.* — confirm still needed
+ doLast {
+ def pipelineOptions = [ // Pipeline options piped directly to Go SDK
flags.
+
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
+ ]
+ def options = [
+ "--runner prism",
+ "--pipeline_opts \"${pipelineOptions.join(' ')}\"",
+ ]
+ exec {
+ executable "sh"
+ args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}" // with --runner prism the script builds and starts the prism job server itself
+ }
+ }
+}
+
// A method for configuring a cross-language validates runner test task,
// intended to be used in calls to createCrossLanguageValidatesRunnerTask.
ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index dee161dcb2a..bb7f5275a16 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -38,11 +38,11 @@ package integration
import (
"fmt"
"math/rand"
+ "os"
"regexp"
"strings"
"testing"
"time"
- "os"
// common runner flag.
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
@@ -140,6 +140,8 @@ var portableFilters = []string{
}
var prismFilters = []string{
+ // The prism runner does not yet support cross-language.
+ "TestXLang.*",
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
// The trigger and pane tests uses TestStream
diff --git a/sdks/go/test/run_validatesrunner_tests.sh
b/sdks/go/test/run_validatesrunner_tests.sh
index c25d59bd57b..fd4856f25a0 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -257,8 +257,10 @@ print(s.getsockname()[1])
s.close()
"
+TMPDIR=$(mktemp -d)
+
# Set up environment based on runner.
-if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" ||
"$RUNNER" == "portable" ]]; then
+if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" ||
"$RUNNER" == "portable" || "$RUNNER" == "prism" ]]; then
if [[ -z "$ENDPOINT" ]]; then
JOB_PORT=$(python3 -c "$SOCKET_SCRIPT")
ENDPOINT="localhost:$JOB_PORT"
@@ -288,6 +290,10 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" ||
"$RUNNER" == "samza" || "$
python3 \
-m apache_beam.runners.portability.local_job_service_main \
--port $JOB_PORT &
+ elif [[ "$RUNNER" == "prism" ]]; then
+ PRISMBIN="$TMPDIR/prismbin"
+ (cd sdks && ./go/run_with_go_version.sh build -o "$PRISMBIN" ./go/cmd/prism) ||
{ echo "Failed to build prism"; rm -rf "$TMPDIR"; exit 1; } # build inside sdks/, the Go module root (module github.com/apache/beam/sdks/v2)
+ "$PRISMBIN" --serve_http=false --job_port "$JOB_PORT" & # no web UI: avoids clashing on the fixed default web port (8074) across concurrent runs
else
echo "Unknown runner: $RUNNER"
exit 1;
@@ -340,7 +346,6 @@ if [[ "$RUNNER" == "dataflow" ]]; then
gcloud --version
# ensure gcloud is version 186 or above
- TMPDIR=$(mktemp -d)
gcloud_ver=$(gcloud -v | head -1 | awk '{print $4}')
if [[ "$gcloud_ver" < "186" ]]
then
@@ -402,6 +407,7 @@ fi
ARGS="$ARGS -p $SIMULTANEOUS"
# Assemble test arguments and pipeline options.
+ARGS="$ARGS -v"
ARGS="$ARGS -timeout $TIMEOUT"
ARGS="$ARGS --runner=$RUNNER"
ARGS="$ARGS --project=$DATAFLOW_PROJECT"
@@ -449,9 +455,9 @@ if [[ "$RUNNER" == "dataflow" ]]; then
docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container"
gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo
"Failed to delete container"
fi
-
- # Clean up tempdir
- rm -rf $TMPDIR
fi
+# Clean up tempdir
+rm -rf "$TMPDIR"
+
exit $TEST_EXIT_CODE