Hello Beam! == tl;dr; ==
I wrote a local, portable Beam runner in Go to replace the Go direct runner. I'd like to contribute it to the Beam Repo. The Big PR with everything is here: https://github.com/apache/beam/pull/25391 I'll be sending smaller PRs out for review to get it into the repo. Take a look at the big one, don't mind the mess, but do ask questions, or offer constructive suggestions to make it clearer. There are ample TODOs that could be added. This thread will be kept up to date with the progress. Highlights: Avoids false positive issues the Go Direct runner has, especially around serialization issues. Single transform at a time execution. Watermark propagation through Graph for GBKs and Side Input windowing. Will be capable of testing the whole Go SDK, in time. Will be capable of being a stand alone single binary runner, in time. ++Many opportunities for contribution after getting into the repo!++ Lowlights: Only for Go SDK, for now. ~~Many unimplemented features~~ Where to start reading? Vision README: https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/README.md Code Structure README: https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/README.md executePipeline entrypoint: https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/execute.go#L41 == The long version == Since last year, I was puttering away at making a Portable Beam Runner authored in Go. Partly because I wanted to learn the "runner" half of beam, and partly because the Go Direct Runner (and most other direct runners), are not good at testing. I managed to get it roughly ready for basic batch execution by end of February 2022 , and then 2022 got away from me. And I couldn't pick it up until the end of the year. I gave a talk about this at Beam Summit 2022 https://2022.beamsummit.org/sessions/portable-go-beam-runner/ that covers my motivation for it. Loosely, Beam has a Testing Problem. There are large parts of Beam execution that matter for real world performance and correctness, but the facilities to test these don't exist. For example, take Combiner Lifting, if a combiner is unlifted, but implements AddInput... then Merge is never called, leaving it untested. And the user has no control over this, or may not even be aware of it. How a DoFn is executed matters for coverage, and user confidence. In particular for Streaming jobs, users will tend to try things out on their Prod runner, but that doesn't help if one is testing on local Flink, but executing on Google Cloud Dataflow, which behave very differently. Regardless of whether you agree with that thesis... I wanted to fill that gap. I wanted a runner that could be configured to test those situations, and in particular, make it easier to develop SDKs and all the features of Beam that don't get their own blog posts. Especially for the Go SDK. Java, being the oldest, has arguably the only "correct" beam runner, in the form of the Java Direct Runner. But one can't execute Go pipelines on that. Python has a portable execution of its runner, but the current state of python is Parallelism hostile at best. It supports a great many things, like Cross Language, but can't support streaming execution (ProcessContinations etc) at present. Also, being a large Python program, it's harder to follow. The Java Direct runner, while being slightly easier to follow, doesn't have a clear execution flow. Neither of them are particularly easy for Non Language Experts to stand up and use, especially outside of the Beam repo. The Go SDK's Direct Runner has many flaws, most of which are due to Direct execution, rather than Portable Execution. Implementing features largely meant hacking certain things in, so they would be able to be executed. This also made supporting and testing Cross Language Transforms, State and Timers in Go pipelines a non-starter for users. And that's just the tip. So I wanted something better. I mentioned it a few times to others, but I kept hearing the same refrain: "I want something that does that". Or at least they wanted something simpler to understand to hack against themselves. I added more tests, and implemented more features, filed a tracking issue ( https://github.com/apache/beam/issues/24789), re-wrote the whole engine from scratch to actually support watermarks, and terminating Process Continuations. In the process I found and fixed some bugs in the Go SDK too. Many conversations were happening so I could understand how things are supposed to work. At some point, chatting with Jack McCluskey (@jrmccluskey) we came up with the name Prism (among many many other contenders). I wanted the runner to be able to split up and examine the different components of Beam and combine them in different ways, and Prisms do that for Beams of light. The name stuck as the best one. Formally, it will be the Apache Beam Prism Runner. It can always be argued about And then I had to stop and clean it up. If I kept going. eventually it becomes too big to review. I've spent the last month, reorganizing the code ~6600+ lines of code and comments. I hope it's clear enough for others to follow at this point, and the initial review PRs will help keep things small. My expectations for now are to send out this email and have people take a first look at the BIG has everything PR: https://github.com/apache/beam/pull/25391. That branch will be canonical for other changes until everything is in the beam repo. It will be kept up to date with changes in the smaller PRs. If you'd like a place to start, I recommend the main README.md which has the rationale and goals. https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/README.md Follow that with the structure README.md in the internal directory. https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/README.md Finally, see how the (post job submission) part of how a job executes, starting with the executePipeline function: https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/execute.go#L41 In the meantime, as the code is going out for review, I'll be improving Unit Test coverage of the sub packages in the structure. When it was still a mono-package though, ~85% coverage was achieved via the test pipelines. In the medium term, I'd like to get it working standalone, so any user with a Go install can get a working job runner with a quick `go install github.com/apache/beam/sdks/go/cmd/prism; prism`, and have it work indefinitely with tiny scale streaming pipelines. Longer term, I'd like to get the Java Validates Runner tests executing on it, which will properly validate correctness of details I'm not aware of, and are not covered in the Go SDK integration tests. As stated, the primary purpose is to simplify testing of Beam pipelines (especially for the Go SDK) and SDK development. As long as it can be used for that, it can also be expanded to do more in time. It's not expected to become "the best" runner, let alone a distributed runner. I look forward to your thoughts! Robert Burke Beam Go Busy Body