So the problem here is that windowFn is a property of the PCollection, not
the element, and the result of Flatten is a single PCollection.

In various cases, there is a notion of "compatible" windows. Basically
given window functions W1 and W2, provide a W3 that "works" with both.
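
As a rough sketch of what such a resolution could look like (plain Java, not
Beam API; the WindowFnSpec type and the rule that the global window wins over
any other windowFn are assumptions made purely for illustration, loosely
following the global-window case raised in the quoted message):

```java
import java.util.Objects;
import java.util.Optional;

public class CompatibleWindowsSketch {
    /** Hypothetical stand-in for a windowFn: a kind plus a size. Not a Beam type. */
    static final class WindowFnSpec {
        final String kind;      // "global", "fixed" or "sliding"
        final long sizeMillis;  // 0 for the global window

        WindowFnSpec(String kind, long sizeMillis) {
            this.kind = kind;
            this.sizeMillis = sizeMillis;
        }

        boolean isGlobal() {
            return kind.equals("global");
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof WindowFnSpec)) {
                return false;
            }
            WindowFnSpec other = (WindowFnSpec) o;
            return kind.equals(other.kind) && sizeMillis == other.sizeMillis;
        }

        @Override
        public int hashCode() {
            return Objects.hash(kind, sizeMillis);
        }

        @Override
        public String toString() {
            return kind + "(" + sizeMillis + "ms)";
        }
    }

    /** Returns a W3 that "works" with both inputs, or empty if this rule finds none. */
    static Optional<WindowFnSpec> resolve(WindowFnSpec w1, WindowFnSpec w2) {
        if (w1.equals(w2)) {
            return Optional.of(w1);  // identical windowFns are trivially compatible
        }
        if (w1.isGlobal()) {
            return Optional.of(w1);  // assumed rule: the global window wins
        }
        if (w2.isGlobal()) {
            return Optional.of(w2);
        }
        return Optional.empty();     // no obvious common windowFn
    }

    public static void main(String[] args) {
        WindowFnSpec global = new WindowFnSpec("global", 0);
        WindowFnSpec sliding = new WindowFnSpec("sliding", 3_600_000);
        WindowFnSpec fixed = new WindowFnSpec("fixed", 60_000);
        System.out.println(resolve(global, sliding));
        System.out.println(resolve(fixed, sliding));
    }
}
```

Returning an empty Optional for the fixed-vs-sliding case keeps the ambiguous
combinations an explicit error rather than silently picking one side.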

Note that Beam already has something similar with side inputs, since the
side input often is in a different window than the main input. However, main
input elements are supposed to see side input elements in the same window
(and in fact main inputs are blocked until the side-input window is ready),
so we must do a mapping. If for example (and very commonly!) the side input
is in the global window and the main input is in a fixed window, by default
we will remap the global-window elements into the main-input's fixed window.

For side inputs we also allow the user to control this mapping, so for
example side input elements could always map to the previous fixed window
(e.g. while processing window 12-1, you want to see summary data of all
records in the previous window 11-12). Users can do this by providing a
WindowMappingFn to the View - essentially a function from window to
window. Unfortunately this is hard to use (one must create their own
PCollectionView class) and very poorly documented, so I doubt many users
know about this!
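
To make the "function from window to window" idea concrete, here is a minimal
model in plain Java. It deliberately does not use any Beam classes: the Window
type and the previousWindow helper are hypothetical names for illustration,
and the date is arbitrary. It only shows the shape of the "previous fixed
window" mapping from the example above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.function.UnaryOperator;

public class WindowMappingSketch {
    /** Half-open interval [start, end) standing in for a fixed window. Not a Beam type. */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) {
                return false;
            }
            Window other = (Window) o;
            return start.equals(other.start) && end.equals(other.end);
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Maps a main-input window to the fixed window of the same size just before it. */
    static UnaryOperator<Window> previousWindow(Duration size) {
        return w -> new Window(w.start.minus(size), w.end.minus(size));
    }

    public static void main(String[] args) {
        // Main input is being processed in the 12:00-13:00 window.
        Window mainWindow = new Window(
            Instant.parse("2024-04-06T12:00:00Z"),
            Instant.parse("2024-04-06T13:00:00Z"));
        // The side input should be read from the 11:00-12:00 window.
        Window sideWindow = previousWindow(Duration.ofHours(1)).apply(mainWindow);
        System.out.println(mainWindow + " -> " + sideWindow);
    }
}
```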

Reuven

On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský <je...@seznam.cz> wrote:

> Immediate self-correction: although setting the strategy directly via
> setWindowingStrategyInternal() *seemed* to be working during Pipeline
> construction time, during runtime it obviously does not work, because
> the PCollection was still windowed using the old windowFn. Makes sense to
> me, but there remains the other question if we can make flattening
> PCollections with incompatible windowFns more user-friendly. The current
> approach where we require the same windowFn for all input PCollections
> creates some unnecessary boilerplate code needed on the user side.
>
>   Jan
>
> On 4/6/24 15:45, Jan Lukavský wrote:
> > Hi,
> >
> > I came across a case where using
> > PCollection#setWindowingStrategyInternal seems legit in user code.
> > The case is roughly as follows:
> >
> >  a) compute some streaming statistics
> >
> >  b) apply the same transform (say ComputeWindowedAggregation) with
> > different parameters on these statistics yielding two windowed
> > PCollections - first is global with early trigger, the other is
> > sliding window, the specific parameters of the windowFns are
> > encapsulated in the ComputeWindowedAggregation transform
> >
> >  c) apply the same transform on both of the above PCollections,
> > yielding two PCollections with the same types, but different windowFns
> >
> >  d) flatten these PCollections into a single one (e.g. for downstream
> > processing - joining - or flushing to sink)
> >
> > Now, the flatten will not work, because these PCollections have
> > different windowFns. It would be possible to restore the windowing for
> > either of them, but that requires somewhat breaking the encapsulation of
> > the transforms that produce the windowed outputs. A more natural
> > solution is to take the WindowingStrategy from the global aggregation
> > and set it via setWindowingStrategyInternal() to the other
> > PCollection. This works, but it uses an API that is marked as @Internal
> > (and obviously the name also suggests it is not intended for
> > client-code usage).
> >
> > The question is, should we make a legitimate version of this call? Or
> > should we introduce a way for Flatten.pCollections() to re-window the
> > input PCollections appropriately? In the case of conflicting
> > WindowFns, where one of them is GlobalWindowing strategy, it seems to
> > me that the user's intention is quite well-defined (this might extend
> > to some 'flatten windowFn resolution strategy', maybe).
> >
> > WDYT?
> >
> >  Jan
> >
>
