FileIO requires an explicit numShards in unbounded mode for a number of
reasons - one being that a trigger has to happen on a GroupByKey, and we
need something to group on.
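
A minimal sketch of what that requirement looks like at the API level (the pipeline, output path, and shard count below are illustrative, not from this thread):

```java
// Illustrative only: an unbounded FileIO write must set an explicit shard
// count, since the trigger fires on a GroupByKey that needs shard keys to
// group on.
events.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://my-bucket/output")  // placeholder path
    .withNumShards(5));           // explicit numShards, required for unbounded input
```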

It is extremely surprising that behavior would change between using a
ValueProvider or not. The exact same codepath should be triggered
regardless of whether a ValueProvider is used.

Reuven

On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <[email protected]> wrote:

> Definitely sounds like a bug, but I also want to caution you (or anyone
> reading this archive later) that there are known problems with continuation
> triggers. A part of the spec on continuation triggers that we missed is
> that they really must be "compatible" with their original trigger (this is
> an arbitrary concept, having only to do with Flattening two PCollections
> together). Without this guarantee, you can have three PCollections with
> identical triggering that you can CoGroupByKey together, yet you cannot
> express that three-way join as a sequence of binary joins.
>
> Kenn
>
> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <[email protected]> wrote:
>
>> Thanks for the response, Chamikara. I filed
>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>> around the problem in my case by not using a ValueProvider for numShards.
>>
>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> I'm not too familiar with the exact underlying issue here, but writing
>>> unbounded input to files with GlobalWindows and unsharded output is a
>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>
>>> - Cham
>>>
>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <[email protected]> wrote:
>>>
>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>> input. In that case, it creates both a sharded and an unsharded
>>>> collection; the first goes through one GroupByKey while the other goes
>>>> through two. These two collections are then flattened together, and
>>>> they have incompatible triggers because the doubly-grouped collection
>>>> uses a continuation trigger.
>>>>
>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>> no exception is thrown.
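
For anyone hitting the same issue, the difference described above comes down to roughly these two calls (the `write` transform and option accessor are hypothetical examples):

```java
// Raises the incompatible-triggers exception in this thread:
// numShards supplied as a ValueProvider<Integer>
write.withNumShards(options.getNumShards());

// Works around it: a hard-coded integer takes the WriteFiles
// code path that never flattens the two collections
write.withNumShards(5);
```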
>>>>
>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>> FileIO). But I'd love to hear other interpretations.
>>>>
>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <[email protected]>
>>>> wrote:
>>>>
>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>> I'm using FileIO's dynamic destinations to place elements into different
>>>>> directories according to date and I really don't care about ordering of
>>>>> elements beyond the date buckets.
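
As a rough sketch, date-bucketed dynamic destinations like the ones described above might look as follows (all names, the date-extraction helper, and the shard count are illustrative assumptions, not the actual pipeline):

```java
// Illustrative only: route each element into a directory named for its date.
input.apply(FileIO.<String, String>writeDynamic()
    .by(element -> dateOf(element))  // hypothetical helper, e.g. returns "2019-01-09"
    .withDestinationCoder(StringUtf8Coder.of())
    .via(TextIO.sink())
    .to("gs://my-bucket/output")     // placeholder path
    .withNaming(date -> FileIO.Write.defaultNaming(date + "/part", ".txt"))
    .withNumShards(10));             // explicit shards, required for unbounded input
```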
>>>>>
>>>>> So, I think GlobalWindows is appropriate in this case, even though the
>>>>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>>>>> based on number of elements and/or processing time so that beam actually
>>>>> writes out files periodically?
>>>>>
>>>>> I tried the following:
>>>>>
>>>>> Window.into(new GlobalWindows())
>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>       .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>   .discardingFiredPanes()
>>>>>
>>>>> But it raises an exception about incompatible triggers:
>>>>>
>>>>> Inputs to Flatten had incompatible triggers:
>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>
>>>>> I believe that what's happening is that FileIO with an explicit
>>>>> numShards (required in the case of unbounded input) forces a
>>>>> GroupByKey, which activates continuation triggers that are
>>>>> incompatible with my stated triggers. It's the internals of WriteFiles
>>>>> that are trying to flatten the incompatible PCollections together.
>>>>>