FileIO requires an explicit numShards in unbounded mode for a number of reasons - one being that a trigger has to happen on a GroupByKey, and we need something to group on.
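To make the setup under discussion concrete, here is a minimal sketch, assuming Beam's Java SDK: unbounded Pubsub input in GlobalWindows with a compound trigger, written via FileIO with a hard-coded shard count (the code path reported in this thread as avoiding the problematic Flatten). The subscription, output path, and shard count are placeholder values, not taken from the thread.

```java
// Sketch only -- assumes the Beam Java SDK on the classpath
// (org.apache.beam.sdk.io.FileIO, TextIO, PubsubIO, windowing classes).
// "subscription" and the output path are hypothetical placeholders.
PCollection<String> records = pipeline
    .apply(PubsubIO.readStrings().fromSubscription(subscription))
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(10000),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10)))))
        .discardingFiredPanes());

records.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://bucket/output/")
    // A literal int here takes the code path that does not flatten a
    // sharded and an unsharded branch; the ValueProvider<Integer>
    // overload of withNumShards is the variant that provoked the
    // incompatible-trigger exception discussed below.
    .withNumShards(10));
```

This is a fragment of a larger pipeline, so it is not runnable on its own; it only shows the shape of the two `withNumShards` call sites the thread contrasts.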
It is extremely surprising that behavior would change between using a ValueProvider or not. The exact same code path should be exercised regardless of whether a ValueProvider is used.

Reuven

On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <[email protected]> wrote:

> Definitely sounds like a bug, but I also want to caution you (or anyone
> reading this archive) that there are known problems with continuation
> triggers. A spec on continuation triggers that we missed is that they
> really must be "compatible" (an arbitrary concept, having only to do with
> flattening two PCollections together) with their original trigger. Without
> this, you can also have three PCollections with identical triggering that
> you can CoGroupByKey together, yet you cannot do that three-way join as a
> sequence of binary joins.
>
> Kenn
>
> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <[email protected]> wrote:
>
>> Thanks for the response, Chamikara. I filed
>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>> around the problem in my case by not using a ValueProvider for numShards.
>>
>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> I'm not too familiar with the exact underlying issue here, but writing
>>> unbounded input to files when using GlobalWindows for unsharded output
>>> is a valid use case, so this sounds like a bug. Feel free to create a
>>> JIRA.
>>>
>>> - Cham
>>>
>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <[email protected]> wrote:
>>>
>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>> input. In that case, it creates both a sharded and an unsharded
>>>> collection; the first goes through one GroupByKey while the other goes
>>>> through two. These two collections are then flattened together, and
>>>> they have incompatible triggers because the double-grouped collection
>>>> uses a continuation trigger.
>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>> WriteFiles uses a different code path that doesn't flatten collections,
>>>> and no exception is thrown.
>>>>
>>>> So, this might really be considered a bug in WriteFiles (and thus
>>>> FileIO). But I'd love to hear other interpretations.
>>>>
>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <[email protected]>
>>>> wrote:
>>>>
>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>> I'm using FileIO's dynamic destinations to place elements into
>>>>> different directories according to date, and I really don't care
>>>>> about ordering of elements beyond the date buckets.
>>>>>
>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>> trigger based on number of elements and/or processing time so that
>>>>> Beam actually writes out files periodically?
>>>>> I tried the following:
>>>>>
>>>>>     Window.into(new GlobalWindows())
>>>>>         .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>             AfterPane.elementCountAtLeast(10000),
>>>>>             AfterProcessingTime.pastFirstElementInPane()
>>>>>                 .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>         .discardingFiredPanes()
>>>>>
>>>>> But it raises an exception about incompatible triggers:
>>>>>
>>>>>     Inputs to Flatten had incompatible triggers:
>>>>>     Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>     Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>     AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>
>>>>> I believe that what's happening is that FileIO with an explicit
>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>> GroupByKey, which activates continuation triggers that are
>>>>> incompatible with my stated triggers. It's the internals of
>>>>> WriteFiles that are trying to flatten the incompatible PCollections
>>>>> together.
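To make the quoted exception concrete: after each GroupByKey, Beam substitutes a trigger's "continuation" for the trigger itself, which is why one branch of the Flatten ends up with elementCountAtLeast(1) and AfterSynchronizedProcessingTime while the other keeps the user's original trigger. A sketch, assuming Beam's Java SDK and that the public Trigger.getContinuationTrigger() method behaves as in the SDK source:

```java
// Sketch only -- assumes the Beam Java SDK
// (org.apache.beam.sdk.transforms.windowing.*).
Trigger original = Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(10000),
    AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(10))));

// One GroupByKey later, Beam applies the continuation trigger:
// elementCountAtLeast(10000) becomes elementCountAtLeast(1), and
// AfterProcessingTime becomes AfterSynchronizedProcessingTime --
// matching the second trigger printed in the exception quoted above.
Trigger afterOneGroupByKey = original.getContinuationTrigger();
```

Since WriteFiles routes the sharded branch through one GroupByKey and the unsharded branch through two, the two branches carry different continuations of the same original trigger when they meet at the Flatten, hence the incompatibility error.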