The main thing stopping a low impedance version of collection
materialization is the lack of a robust notion of common storage across the
submission program and different runners.

We almost have this with runner specific staging directories, but we don't
make them a canonical facet of Beam execution, or part of the existing
contracts.

Ultimately, the moment you want something in memory, you just need to
write it out somewhere known and read it back in. That could mean
expanding Job Management with a way to request such objects, or hiding
the read & write mechanisms and tagging, at job submission, a collection
whose data we'd like to get back after the job completes.
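
As a rough sketch of that write-then-read-back pattern, assuming plain
Python and a local directory standing in for the shared staging location
(no Beam APIs are used here; `write_shards`, `read_materialized`,
`materialize`, and the `part-*.jsonl` file naming are all hypothetical):

```python
import json
import tempfile
from pathlib import Path


def write_shards(elements, out_dir, num_shards=2):
    """Stand-in for the job's write step: one JSON-lines file per shard."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    shards = [[] for _ in range(num_shards)]
    for i, element in enumerate(elements):
        shards[i % num_shards].append(element)
    for n, shard in enumerate(shards):
        path = out_dir / f"part-{n:05d}-of-{num_shards:05d}.jsonl"
        path.write_text("".join(json.dumps(e) + "\n" for e in shard))


def read_materialized(out_dir):
    """Driver-side read: gather every shard back into Python objects."""
    values = []
    for path in sorted(Path(out_dir).glob("part-*.jsonl")):
        with path.open() as f:
            values.extend(json.loads(line) for line in f if line.strip())
    return values


def materialize(elements, staging_root, tag):
    """Write a tagged collection under the staging root, then read it back."""
    out_dir = Path(staging_root) / tag
    write_shards(elements, out_dir)    # would happen inside the job
    return read_materialized(out_dir)  # would happen after the job completes


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as staging:
        results = materialize([1, 2, 3], staging, "results")
        assert set(results) == {1, 2, 3}
```

The only thing the two halves share is the staging root and the tag, which
is the piece that would need to become part of the contract between the
submission program and the runner.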

On Fri, Jan 31, 2025, 9:06 AM Joey Tran <joey.t...@schrodinger.com> wrote:

> Yeah I briefly considered reimplementing PObject in python to make unit
> testing asserts a little more ergonomic (`results_pobject.getvalue() ==
> set([1, 2, 3])` feels more natural than `assert_that(results, equal_to([1,
> 2, 3]))`) but when I proposed it to some beam users at my company someone
> excitedly asked if they could start adding conditional forks in their
> pipeline and that made me a lot more hesitant.
>
> I suppose it wouldn't be that hard to have our internal distributed runner
> just check to see if there are any PObject transforms and reject a pipeline
> if it does.
>
> Anyways, I asked here because I was wondering if the same line of thought
> might've led to the removal of PObject (just out of curiosity)
>
> On Fri, Jan 31, 2025 at 11:57 AM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Fri, Jan 31, 2025 at 11:20 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> Is there an equivalent to `PCollectionView` in python?
>>>
>>> > ... as there's not much novel one can do with a PObject vs. a
>>> > singleton PCollection.
>>>
>>> Ah maybe I misunderstood how PObjects work. From the FlumeJava paper:
>>>
>>>
>>>> These features can be used to express a computation that needs
>>>> to iterate until the computed data converges:
>>>>
>>>> PCollection<Data> results = computeInitialApproximation();
>>>> for (;;) {
>>>>     results = computeNextApproximation(results);
>>>>     PCollection<Boolean> haveConverged =
>>>>         results.parallelDo(checkIfConvergedFn(),
>>>>                            collectionOf(booleans()));
>>>>     PObject<Boolean> allHaveConverged =
>>>>         haveConverged.combine(AND_BOOLS);
>>>>     FlumeJava.run();
>>>>     if (allHaveConverged.getValue()) break;
>>>> }
>>>> ... continue working with converged results ...
>>>
>>>
>>> I had understood this to mean that the `PObject` will materialize the
>>> pcollection into in-memory values, which is maybe a little novel?
>>>
>>> At least, in the python SDK, I've always been writing elements to disk by
>>> transform and then reading it manually outside of the pipeline back into
>>> the original python objects.
>>>
>>
>> Ah, yes, the magic is more in the `FlumeJava.run()` which is equivalent
>> to doing a whole pipeline.run(), with the PObject written to a place where
>> it can then be read by the main driver program. Our interactive runner and SQL
>> shell do this for in-memory pipelines, but we don't have anything like that
>> for our distributed runners. If I understand correctly, scio does something
>> along these lines. I think having some sort of "standard for storage
>> between driver main program and distributed pipeline execution" is
>> compatible with Beam, in theory. I also suspect it is very easy to
>> accidentally write a program that is weirdly slow in confusing ways,
>> because of the blurring of the different execution environments/phases.
>>
>> Kenn
>>
>>
>>> On Fri, Jan 31, 2025 at 9:50 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Fun fact, this was one of my onboarding projects :-)
>>>>
>>>> It is the way it is because the invariant we need in order to apply
>>>> windowing independent of core business logic is: if you apply a transform
>>>> to windowed input, each window should contain the same output it would if
>>>> it were the entirety of the data. (composites are permitted to break
>>>> this rule, but the core compute primitives never do)
>>>>
>>>> And I guess it seemed wrong to call something an "Object" when it was
>>>> really an object per window. TBH the "P" was probably always a misnomer...
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Jan 30, 2025 at 7:26 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Thu, Jan 30, 2025 at 4:25 PM Reuven Lax via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> PCollectionView is the equivalent of PObject. Given that the Beam API
>>>>>> needed to work with the windowing model, we needed something like a
>>>>>> PObject that could be windowed. This is what PCollectionView provides.
>>>>>>
>>>>>
>>>>> +1, I'd forgotten about the windowing complexities as well.
>>>>>
>>>>>
>>>>>> On Thu, Jan 30, 2025 at 4:20 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I read the FlumeJava paper and I was just curious what happened to
>>>>>>> PObjects. They seem like a useful construct. Do they exist in the java
>>>>>>> SDK in some version still? Or were they done away with because they made
>>>>>>> pipeline optimization more difficult?
>>>>>>>
>>>>>>
