Hello Beam!

== tl;dr ==

I've written a local, portable Beam runner in Go to replace the Go Direct
Runner, and I'd like to contribute it to the Beam repo. The big PR with
everything is here: https://github.com/apache/beam/pull/25391

I'll be sending smaller PRs out for review to get it into the repo. Take a
look at the big one, and don't mind the mess, but do ask questions or offer
constructive suggestions to make it clearer. There are ample TODOs that
could be added. This thread will be kept up to date with the progress.

Highlights:
Avoids the false positives the Go Direct Runner has, especially around
serialization.
Single-transform-at-a-time execution.
Watermark propagation through the graph for GBKs and side input windowing.
Will be capable of testing the whole Go SDK, in time.
Will be capable of being a standalone, single-binary runner, in time.
Many opportunities for contribution after getting into the repo!

Lowlights:
Only for Go SDK, for now.
Many unimplemented features.

Where to start reading?

Vision README:
https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/README.md


Code Structure README:
https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/README.md


executePipeline entrypoint:
https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/execute.go#L41



== The long version ==

Since last year, I've been puttering away at making a portable Beam runner
authored in Go, partly because I wanted to learn the "runner" half of Beam,
and partly because the Go Direct Runner (like most other direct runners) is
not good at testing.

I managed to get it roughly ready for basic batch execution by the end of
February 2022, and then 2022 got away from me; I couldn't pick it up again
until the end of the year.

I gave a talk about this at Beam Summit 2022
https://2022.beamsummit.org/sessions/portable-go-beam-runner/ that covers
my motivation for it. Loosely, Beam has a testing problem. There are large
parts of Beam execution that matter for real-world performance and
correctness, but the facilities to test them don't exist. Take combiner
lifting: if a combiner is not lifted but implements AddInput, then
MergeAccumulators is never called, leaving it untested. The user has no
control over this, and may not even be aware of it. How a DoFn is executed
matters for coverage and for user confidence. This is especially true for
streaming jobs: users tend to try things out on their production runner,
but that doesn't help if one tests on a local Flink instance but executes
on Google Cloud Dataflow, since the two behave very differently.
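To make the combiner lifting point concrete, here is a minimal, self-contained Go sketch. It does not import the Beam SDK: sumFn only borrows the method names a Go SDK combiner uses (CreateAccumulator, AddInput, MergeAccumulators, ExtractOutput), and runUnlifted and runLifted are hypothetical stand-ins for the two execution strategies a runner might pick, not Prism or SDK code. Counting calls shows that the unlifted path never reaches MergeAccumulators.

```go
package main

import "fmt"

// sumFn mimics the method names of a Beam Go SDK combiner, but is a plain
// Go type with call counters so we can see which methods actually run.
type sumFn struct {
	addCalls, mergeCalls int
}

func (f *sumFn) CreateAccumulator() int { return 0 }

func (f *sumFn) AddInput(acc, v int) int {
	f.addCalls++
	return acc + v
}

func (f *sumFn) MergeAccumulators(a, b int) int {
	f.mergeCalls++
	return a + b
}

func (f *sumFn) ExtractOutput(acc int) int { return acc }

// runUnlifted (hypothetical) models combining after a GroupByKey: all values
// for a key arrive together, so one accumulator is built with AddInput and
// MergeAccumulators is never needed.
func runUnlifted(f *sumFn, values []int) int {
	acc := f.CreateAccumulator()
	for _, v := range values {
		acc = f.AddInput(acc, v)
	}
	return f.ExtractOutput(acc)
}

// runLifted (hypothetical) models combiner lifting: each bundle builds a
// partial accumulator before the shuffle, and the partials are merged after.
func runLifted(f *sumFn, bundles [][]int) int {
	var partials []int
	for _, b := range bundles {
		acc := f.CreateAccumulator()
		for _, v := range b {
			acc = f.AddInput(acc, v)
		}
		partials = append(partials, acc)
	}
	merged := f.CreateAccumulator()
	for _, p := range partials {
		merged = f.MergeAccumulators(merged, p)
	}
	return f.ExtractOutput(merged)
}

func main() {
	unlifted := &sumFn{}
	fmt.Println(runUnlifted(unlifted, []int{1, 2, 3, 4}), unlifted.mergeCalls) // 10 0
	lifted := &sumFn{}
	fmt.Println(runLifted(lifted, [][]int{{1, 2}, {3, 4}}), lifted.mergeCalls) // 10 2
}
```

Both strategies produce the same sum, but only the lifted one exercises MergeAccumulators, which is why a test runner that can force either strategy would cover both code paths.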

Regardless of whether you agree with that thesis, I wanted to fill that
gap. I wanted a runner that could be configured to test those situations,
and in particular, make it easier to develop SDKs and all the features of
Beam that don't get their own blog posts.

Especially for the Go SDK. Java, being the oldest, has arguably the only
"correct" Beam runner, in the form of the Java Direct Runner, but one can't
execute Go pipelines on it. Python has a portable runner of its own, but
the current state of Python is hostile to parallelism at best. It supports
a great many things, like cross-language transforms, but can't support
streaming execution (ProcessContinuations, etc.) at present. Also, being a
large Python program, it's harder to follow. The Java Direct Runner, while
slightly easier to follow, doesn't have a clear execution flow. Neither of
them is particularly easy for people who aren't experts in that language to
stand up and use, especially outside of the Beam repo.

The Go SDK's Direct Runner has many flaws, most of which stem from direct
execution rather than portable execution. Implementing features largely
meant hacking certain things in so that they could be executed. This also
made supporting and testing cross-language transforms, state, and timers in
Go pipelines a non-starter for users. And that's just the tip of the iceberg.

So I wanted something better. I mentioned it a few times to others, and I
kept hearing the same refrain: "I want something that does that." Or at
least they wanted something simple enough to understand and hack against
themselves.

I added more tests, implemented more features, filed a tracking issue
(https://github.com/apache/beam/issues/24789), and rewrote the whole engine
from scratch to actually support watermarks and terminating Process
Continuations. In the process, I found and fixed some bugs in the Go SDK
too. I also had many conversations to understand how things are supposed to
work.
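For readers less familiar with watermarks, the core bookkeeping can be sketched in a few lines of Go. This is a simplified illustration under stated assumptions, not Prism's engine: stage, propagate, and readyToFire are hypothetical names, stages are assumed to be listed in topological order, and allowed lateness and per-key state are ignored. Each stage's output watermark is the minimum of its own hold and its upstream output watermarks, and a GBK may only emit a window once its input watermark reaches the end of that window.

```go
package main

import (
	"fmt"
	"math"
)

// noHold marks a stage with no pending elements holding back its watermark.
const noHold = math.MaxInt64

// stage is a hypothetical, simplified pipeline node, not Prism's actual type.
type stage struct {
	name   string
	inputs []string // names of upstream stages
	hold   int64    // earliest timestamp (ms) still pending in this stage
}

// propagate walks stages in topological order and sets each stage's output
// watermark to the minimum of its own hold and its upstream watermarks.
func propagate(stages []stage) map[string]int64 {
	out := make(map[string]int64, len(stages))
	for _, s := range stages {
		wm := s.hold
		for _, in := range s.inputs {
			if up := out[in]; up < wm {
				wm = up
			}
		}
		out[s.name] = wm
	}
	return out
}

// readyToFire reports whether a GBK may emit the window [start, windowEnd):
// only once its input watermark has reached the end of that window.
// Allowed lateness is ignored in this sketch.
func readyToFire(inputWatermark, windowEnd int64) bool {
	return inputWatermark >= windowEnd
}

func main() {
	stages := []stage{
		{name: "readA", hold: 120},
		{name: "readB", hold: 45},
		{name: "flatten", inputs: []string{"readA", "readB"}, hold: noHold},
		{name: "gbk", inputs: []string{"flatten"}, hold: noHold},
	}
	wms := propagate(stages)
	fmt.Println(wms["flatten"], wms["gbk"])      // 45 45
	fmt.Println(readyToFire(wms["flatten"], 60)) // false
}
```

The slowest upstream source (readB) holds back everything downstream of it, so the GBK cannot yet close a window ending at 60.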

At some point, while chatting with Jack McCluskey (@jrmccluskey), we came up
with the name Prism (among many, many other contenders). I wanted the runner
to be able to split up and examine the different components of Beam and
combine them in different ways, and prisms do that for beams of light. The
name stuck as the best one. Formally, it will be the Apache Beam Prism
Runner. It can always be argued about.

And then I had to stop and clean it up. If I had kept going, it would
eventually have become too big to review. I've spent the last month
reorganizing the code, 6600+ lines of code and comments. I hope it's clear
enough for others to follow at this point, and the initial review PRs will
help keep things small.

My plan for now is to send out this email and have people take a first
look at the big, has-everything PR:
https://github.com/apache/beam/pull/25391. That branch will be canonical
for other changes until everything is in the Beam repo. It will be kept up
to date with changes from the smaller PRs.

If you'd like a place to start, I recommend the main README.md which has
the rationale and goals.
https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/README.md


Follow that with the structure README.md in the internal directory.
https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/README.md

Finally, see how a job executes (after job submission), starting with the
executePipeline function:
https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/execute.go#L41


In the meantime, as the code goes out for review, I'll be improving unit
test coverage of the subpackages. When it was still a single package,
though, the test pipelines alone achieved ~85% coverage.

In the medium term, I'd like to get it working standalone, so any user with
a Go installation can get a working job runner with a quick `go install
github.com/apache/beam/sdks/go/cmd/prism; prism`, and have it run tiny-scale
streaming pipelines indefinitely.

Longer term, I'd like to get the Java ValidatesRunner tests executing on
it, which would properly validate the correctness of details I'm not aware
of and that aren't covered by the Go SDK integration tests.

As stated, the primary purpose is to simplify testing of Beam pipelines
(especially for the Go SDK) and SDK development. As long as it can be used
for that, it can also be expanded to do more in time. It's not expected to
become "the best" runner, let alone a distributed runner.

I look forward to your thoughts!

Robert Burke
Beam Go Busy Body
