I had to scratch the itch this weekend, and now have Prism able to pay attention to Estimated Output Watermarks, and send elements downstream if the transform is not blocked by watermarks somewhere. Woohoo! Is it fully correct? Probably not, but it's a start.
Transforms are still executed one at a time, but this is currently a matter of implementation, rather than inability. Changing that implementation will occur with progress tracking and split request handling, which will be required by the separation harness tests. All data is presently cached and stored indefinitely in memory which prevents indefinite runs of a fully unbounded pipeline. Primary inputs consume & garbage collect data as the pipeline advances, but since side inputs are re-used they use their own thing, and the system isn't aware when it can be safely garbage collected. Big Everything PR is at https://github.com/apache/beam/pull/25391 Next up PRs: https://github.com/apache/beam/pull/25556 - Remaining initial job services. https://github.com/apache/beam/pull/25557 - Test DoFns for later use in pipelines. https://github.com/apache/beam/pull/25558 - Handling graph transformations for Combine & SDF composites, executing Flattens and GBKs On Thu, 16 Feb 2023 at 15:02, Robert Burke <lostl...@apache.org> wrote: > Next up: > > https://github.com/apache/beam/pull/25478 - Large PR for initial handling > of worker FnAPI surfaces. > https://github.com/apache/beam/pull/25518 - Tiny PR for handling basic > windowing strategies. > https://github.com/apache/beam/pull/25520 - Medium PR for adding the > graph preprocessor scaffolding > > > On 2023/02/15 05:41:51 Robert Burke via dev wrote: > > Here are the next two chunks! > > > > https://github.com/apache/beam/pull/25476 - Coder / element / bytes > > handling internally for prism. > > https://github.com/apache/beam/pull/25478 - Worker fnAPI handling. > > > > Took a bit to get a baseline of unit testing in for these, since they > were > > covered by whole pipeline runs. > > Coders in particular, since they currently live in the package with the > > pipeline tests, so it was harder to ensure > > coverage in a vacuum. > > > > But they did force a bit of documentation improvements, and a neglected > > inefficiency I had in the original coder structure. > > > > So small pain now, but will make sure future development is a bit easier, > > as convenient as "just write a pipeline" is for testing. > > Sometimes you just want to ensure the protocol works. > > > > On Thu, Feb 9, 2023 at 2:50 PM Kenneth Knowles <k...@apache.org> wrote: > > > > > Just a +100 to the idea of this runner. Having an easy-to-read, > > > portable-execution, batch & streaming, parallel, local runner, that > > > exercises plenty of advanced model features... solid gold! > > > > > > On Thu, Feb 9, 2023 at 12:01 PM Robert Burke via dev < > dev@beam.apache.org> > > > wrote: > > > > > >> Here are the first of the smaller PRs: > > >> > > >> https://github.com/apache/beam/pull/25404 -> Adds READMEs and updates > > >> go.mod so later changes don't collide there. > > >> https://github.com/apache/beam/pull/25405 -> Adds internal/urns > package > > >> for extracting URNs from the protos. > > >> https://github.com/apache/beam/pull/25406 -> Adds internal/config > > >> package for parsing and accessing the configuration of variants and > > >> handlers in the runner. > > >> > > >> These are independant changes, and small enough for quicker review. > The > > >> remaining larger packages can be submitted more piecemeal once these > are in. > > >> > > >> > > >> > > >> On Wed, Feb 8, 2023 at 3:23 PM Robert Burke <lostl...@apache.org> > wrote: > > >> > > >>> Hello Beam! > > >>> > > >>> == tl;dr; == > > >>> > > >>> I wrote a local, portable Beam runner in Go to replace the Go direct > > >>> runner. I'd like to contribute it to the Beam Repo. The Big PR with > > >>> everything is here: https://github.com/apache/beam/pull/25391 > > >>> > > >>> I'll be sending smaller PRs out for review to get it into the repo. > Take > > >>> a look at the big one, don't mind the mess, but do ask questions, or > offer > > >>> constructive suggestions to make it clearer. There are ample TODOs > that > > >>> could be added. This thread will be kept up to date with the > progress. > > >>> > > >>> Highlights: > > >>> Avoids false positive issues the Go Direct runner has, especially > around > > >>> serialization issues. > > >>> Single transform at a time execution. > > >>> Watermark propagation through Graph for GBKs and Side Input > windowing. > > >>> Will be capable of testing the whole Go SDK, in time. > > >>> Will be capable of being a stand alone single binary runner, in time. > > >>> ++Many opportunities for contribution after getting into the repo!++ > > >>> > > >>> Lowlights: > > >>> Only for Go SDK, for now. > > >>> ~~Many unimplemented features~~ > > >>> > > >>> Where to start reading? > > >>> > > >>> Vision README: > > >>> > https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/README.md > > >>> > > >>> > > >>> Code Structure README: > > >>> > https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/README.md > > >>> > > >>> > > >>> executePipeline entrypoint: > > >>> > https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/execute.go#L41 > > >>> > > >>> > > >>> > > >>> == The long version == > > >>> > > >>> Since last year, I was puttering away at making a Portable Beam > Runner > > >>> authored in Go. Partly because I wanted to learn the "runner" half > of beam, > > >>> and partly because the Go Direct Runner (and most other direct > runners), > > >>> are not good at testing. > > >>> > > >>> I managed to get it roughly ready for basic batch execution by end of > > >>> February 2022 , and then 2022 got away from me. And I couldn't pick > it up > > >>> until the end of the year. > > >>> > > >>> I gave a talk about this at Beam Summit 2022 > > >>> https://2022.beamsummit.org/sessions/portable-go-beam-runner/ that > > >>> covers my motivation for it. Loosely, Beam has a Testing Problem. > There are > > >>> large parts of Beam execution that matter for real world performance > and > > >>> correctness, but the facilities to test these don't exist. For > example, > > >>> take Combiner Lifting, if a combiner is unlifted, but implements > > >>> AddInput... then Merge is never called, leaving it untested. And the > user > > >>> has no control over this, or may not even be aware of it. How a DoFn > is > > >>> executed matters for coverage, and user confidence. In particular > for > > >>> Streaming jobs, users will tend to try things out on their Prod > runner, but > > >>> that doesn't help if one is testing on local Flink, but executing on > Google > > >>> Cloud Dataflow, which behave very differently. > > >>> > > >>> Regardless of whether you agree with that thesis... I wanted to fill > > >>> that gap. I wanted a runner that could be configured to test those > > >>> situations, and in particular, make it easier to develop SDKs and > all the > > >>> features of Beam that don't get their own blog posts. > > >>> > > >>> Especially for the Go SDK. Java, being the oldest, has arguably the > only > > >>> "correct" beam runner, in the form of the Java Direct Runner. But > one can't > > >>> execute Go pipelines on that. Python has a portable execution of its > > >>> runner, but the current state of python is Parallelism hostile at > best. It > > >>> supports a great many things, like Cross Language, but can't support > > >>> streaming execution (ProcessContinations etc) at present. Also, > being a > > >>> large Python program, it's harder to follow. The Java Direct > runner, while > > >>> being slightly easier to follow, doesn't have a clear execution flow. > > >>> Neither of them are particularly easy for Non Language Experts to > stand up > > >>> and use, especially outside of the Beam repo. > > >>> > > >>> The Go SDK's Direct Runner has many flaws, most of which are due to > > >>> Direct execution, rather than Portable Execution. Implementing > features > > >>> largely meant hacking certain things in, so they would be able to be > > >>> executed. This also made supporting and testing Cross Language > Transforms, > > >>> State and Timers in Go pipelines a non-starter for users. And that's > just > > >>> the tip. > > >>> > > >>> So I wanted something better. I mentioned it a few times to others, > but > > >>> I kept hearing the same refrain: "I want something that does that". > Or at > > >>> least they wanted something simpler to understand to hack against > > >>> themselves. > > >>> > > >>> I added more tests, and implemented more features, filed a tracking > > >>> issue (https://github.com/apache/beam/issues/24789), re-wrote the > whole > > >>> engine from scratch to actually support watermarks, and terminating > Process > > >>> Continuations. In the process I found and fixed some bugs in the Go > SDK > > >>> too. Many conversations were happening so I could understand how > things are > > >>> supposed to work. > > >>> > > >>> At some point, chatting with Jack McCluskey (@jrmccluskey) we came up > > >>> with the name Prism (among many many other contenders). I wanted the > runner > > >>> to be able to split up and examine the different components of Beam > and > > >>> combine them in different ways, and Prisms do that for Beams of > light. The > > >>> name stuck as the best one. Formally, it will be the Apache Beam > Prism > > >>> Runner. It can always be argued about > > >>> > > >>> And then I had to stop and clean it up. If I kept going. eventually > it > > >>> becomes too big to review. I've spent the last month, reorganizing > the code > > >>> ~6600+ lines of code and comments. I hope it's clear enough for > others to > > >>> follow at this point, and the initial review PRs will help keep > things > > >>> small. > > >>> > > >>> My expectations for now are to send out this email and have people > take > > >>> a first look at the BIG has everything PR: > > >>> https://github.com/apache/beam/pull/25391. That branch will be > > >>> canonical for other changes until everything is in the beam repo. It > will > > >>> be kept up to date with changes in the smaller PRs. > > >>> > > >>> If you'd like a place to start, I recommend the main README.md which > has > > >>> the rationale and goals. > > >>> > > >>> > https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/README.md > > >>> > > >>> > > >>> Follow that with the structure README.md in the internal directory. > > >>> > > >>> > https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/README.md > > >>> > > >>> Finally, see how the (post job submission) part of how a job > executes, > > >>> starting with the executePipeline function: > > >>> > https://github.com/apache/beam/blob/9044f2d4ae151f4222a2f3e0a3264c1198040181/sdks/go/pkg/beam/runners/prism/internal/execute.go#L41 > > >>> > > >>> > > >>> In the meantime, as the code is going out for review, I'll be > improving > > >>> Unit Test coverage of the sub packages in the structure. When it was > still > > >>> a mono-package though, ~85% coverage was achieved via the test > pipelines. > > >>> > > >>> In the medium term, I'd like to get it working standalone, so any > user > > >>> with a Go install can get a working job runner with a quick `go > install > > >>> github.com/apache/beam/sdks/go/cmd/prism; prism`, and have it work > > >>> indefinitely with tiny scale streaming pipelines. > > >>> > > >>> Longer term, I'd like to get the Java Validates Runner tests > executing > > >>> on it, which will properly validate correctness of details I'm not > aware > > >>> of, and are not covered in the Go SDK integration tests. > > >>> > > >>> As stated, the primary purpose is to simplify testing of Beam > pipelines > > >>> (especially for the Go SDK) and SDK development. As long as it can > be used > > >>> for that, it can also be expanded to do more in time. It's not > expected to > > >>> become "the best" runner, let alone a distributed runner. > > >>> > > >>> I look forward to your thoughts! > > >>> > > >>> Robert Burke > > >>> Beam Go Busy Body > > >>> > > >>> > > >