Really, the main difference that makes it a little more complicated (but not terribly so) is that Flume isn't windowed, so its elements don't carry a window or the associated metadata. Beam would need to make that information available up front.
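A rough sketch of what "windowed" means for a PObject-like result, assuming fixed 60-second windows and a sum combine. This is plain JDK code, not Beam's API, and `Timestamped`, `windowStart`, and `combinePerWindow` are hypothetical names. Because every element carries its timestamp, a combine produces one value per window rather than one global value, which is why the Beam counterpart ends up being "an object per window." The last check mirrors the invariant Kenn describes in the quoted thread: each window's result is what you would get by combining that window's elements as if they were the entire input.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

public class PerWindowCombineSketch {
  /** Hypothetical element carrying the timestamp metadata needed to assign it to a window. */
  public record Timestamped(long timestampMillis, long value) {}

  /** Start of the fixed window containing the timestamp (a simplified fixed-windows assignment). */
  public static long windowStart(long timestampMillis, long sizeMillis) {
    return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
  }

  /** Combines elements separately within each window: one result per window, not one global result. */
  public static Map<Long, Long> combinePerWindow(
      List<Timestamped> input, long sizeMillis, BinaryOperator<Long> combineFn) {
    Map<Long, Long> result = new TreeMap<>();
    for (Timestamped element : input) {
      result.merge(windowStart(element.timestampMillis(), sizeMillis), element.value(), combineFn);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Timestamped> input = List.of(
        new Timestamped(1_000, 3), new Timestamped(59_000, 4),     // window [0s, 60s)
        new Timestamped(61_000, 10), new Timestamped(119_000, 5)); // window [60s, 120s)
    Map<Long, Long> sums = combinePerWindow(input, 60_000, Long::sum);
    System.out.println(sums); // prints {0=7, 60000=15}

    // The invariant: a window's result equals combining only that window's elements.
    long windowZeroAlone = combinePerWindow(input.subList(0, 2), 60_000, Long::sum).get(0L);
    System.out.println(sums.get(0L) == windowZeroAlone); // prints true
  }
}
```

Since the window is derived from each element's own metadata, the combine never mixes elements across windows; that is the information Flume elements don't carry.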
On Fri, Jan 31, 2025, 9:50 AM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> On Fri, Jan 31, 2025 at 8:19 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Is there an equivalent to `PCollectionView` in python?
>>
>> > ... as there's not much novel one can do with a PObject vs. a singleton PCollection.
>>
>> Ah maybe I misunderstood how PObjects work. From the FlumeJava paper:
>>
>>> These features can be used to express a computation that needs to iterate until the computed data converges:
>>>
>>> PCollection<Data> results = computeInitialApproximation();
>>> for (;;) {
>>>   results = computeNextApproximation(results);
>>>   PCollection<Boolean> haveConverged =
>>>       results.parallelDo(checkIfConvergedFn(),
>>>                          collectionOf(booleans()));
>>>   PObject<Boolean> allHaveConverged =
>>>       haveConverged.combine(AND_BOOLS);
>>>   FlumeJava.run();
>>>   if (allHaveConverged.getValue()) break;
>>> }
>>> ... continue working with converged results ...
>>
>> I had understood this to mean that the `PObject` will materialize the pcollection into in-memory values, which is maybe a little novel? At least, in the python SDK, I've always been writing elements to disk by transform and then reading it manually outside of the pipeline back into the original python objects.
>>
>
> FWIW, this wasn't unique to PObjects in FlumeJava, one could do the same for PCollections. While this is useful on the one hand, this notion of "materialization" (especially combined with further execution) has complexities in that keeping the main program alive now is essential to pipeline completion (when this feature is used) as opposed to the "fire and forget" model of Dataflow. (There were thoughts about doing runner-side lazy graph expansion, but they never got fleshed out.)
>
> Note that one can easily write a PTransform that internally executes a write and exposes an API to return the set of elements as in-memory objects (using the coder that was used for write) post pipeline completion. One would probably need to supply a distributed filestore to use, as there's not really a good "default." This doesn't have to be provided as part of Beam (though arguably it's a common enough use case that maybe it should be). There's also some exploration in this area with Python's interactive utilities. One could conceivably even support this in streaming (though not without providing manual details of a backing store, e.g. a Kafka instance or Pub/Sub topic to use).
>
>
>> On Fri, Jan 31, 2025 at 9:50 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Fun fact, this was one of my onboarding projects :-)
>>>
>>> It is the way it is because the invariant we need in order to apply windowing independent of core business logic is: if you apply a transform to windowed input, each window should contain the same output it would if it were the entirety of the data. (Composites are permitted to break this rule, but the core compute primitives never do.)
>>>
>>> And I guess it seemed wrong to call something an "Object" when it was really an object per window. TBH the "P" was probably always a misnomer...
>>>
>>> Kenn
>>>
>>> On Thu, Jan 30, 2025 at 7:26 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>
>>>> On Thu, Jan 30, 2025 at 4:25 PM Reuven Lax via dev <dev@beam.apache.org> wrote:
>>>>
>>>>> PCollectionView is the equivalent of PObject. Given that the Beam API needed to work with the windowing model, we needed something like a PObject that could be windowed. This is what PCollectionView provides.
>>>>>
>>>>
>>>> +1, I'd forgotten about the windowing complexities as well.
>>>>
>>>>
>>>>> On Thu, Jan 30, 2025 at 4:20 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>>> I read the FlumeJava paper and I was just curious what happened to PObjects. They seem like a useful construct. Do they exist in the java SDK in some version still? Or were they done away with because they made pipeline optimization more difficult?
>>>>>>
>>>>>
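Robert's suggestion in the thread, a PTransform that writes its input and later hands the elements back as in-memory objects using the same coder, boils down to an encode/write step during the pipeline and a decode/read step in the main program once the pipeline has finished. The sketch below shows only that round trip with plain JDK I/O. It is not a Beam transform: `ElementCodec`, `writeShard`, and `readAll` are hypothetical names, `ElementCodec` is a stand-in rather than Beam's `Coder`, and a local temporary directory substitutes for the distributed filestore he mentions. As he points out, `readAll` only works if the main program stays alive until the pipeline completes.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class MaterializeSketch {
  /** Hypothetical stand-in for a coder: encodes and decodes one element on a stream. */
  public interface ElementCodec<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
  }

  /** Hypothetical codec for strings, using the JDK's length-prefixed modified UTF-8 encoding. */
  public static final ElementCodec<String> UTF8 =
      new ElementCodec<>() {
        @Override
        public void encode(String value, DataOutputStream out) throws IOException {
          out.writeUTF(value);
        }

        @Override
        public String decode(DataInputStream in) throws IOException {
          return in.readUTF();
        }
      };

  /** Write side: encodes one shard of elements into a file under the shared directory. */
  public static <T> void writeShard(Path dir, String shardName, List<T> elements, ElementCodec<T> codec)
      throws IOException {
    try (DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(Files.newOutputStream(dir.resolve(shardName))))) {
      for (T element : elements) {
        codec.encode(element, out);
      }
    }
  }

  /** Read side: runs in the main program after the pipeline has finished, decoding every shard. */
  public static <T> List<T> readAll(Path dir, ElementCodec<T> codec) throws IOException {
    List<Path> shards;
    try (Stream<Path> listing = Files.list(dir)) {
      shards = listing.sorted().toList(); // sorted only to make the demo deterministic
    }
    List<T> result = new ArrayList<>();
    for (Path shard : shards) {
      try (DataInputStream in = new DataInputStream(
          new BufferedInputStream(Files.newInputStream(shard)))) {
        while (true) {
          try {
            result.add(codec.decode(in));
          } catch (EOFException endOfShard) {
            break; // no more elements in this shard
          }
        }
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    // A local temp directory stands in for the distributed filestore.
    Path dir = Files.createTempDirectory("materialized");
    writeShard(dir, "shard-0", List.of("a", "b"), UTF8);
    writeShard(dir, "shard-1", List.of("c"), UTF8);
    System.out.println(readAll(dir, UTF8)); // prints [a, b, c]
  }
}
```

Reading shards in sorted file-name order is an assumption made here for a deterministic result; Beam itself gives no ordering guarantee for the elements of a PCollection.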