Ah,

numShards = 0 is explicitly not supported in unbounded mode today, for the
reason mentioned above. If FileIO doesn't reject the pipeline in that case,
we should fix that.
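
For reference, here's roughly what the supported shape looks like on an
unbounded input (a minimal sketch; the subscription, output path, trigger,
and shard count are all placeholders):

  PCollection<String> lines =
      pipeline.apply(PubsubIO.readStrings()
          .fromSubscription("projects/my-project/subscriptions/my-sub"));

  lines
      .apply(Window.<String>into(new GlobalWindows())
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
          .withAllowedLateness(Duration.ZERO)  // must be set when overriding the trigger
          .discardingFiredPanes())
      .apply(FileIO.<String>write()
          .via(TextIO.sink())
          .to("gs://my-bucket/output")
          .withNumShards(10));  // explicit, non-zero; 0 would mean runner-chosen sharding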

Reuven

On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <[email protected]> wrote:

> Indeed, I was wrong about the ValueProvider distinction. I updated that in
> the JIRA.
>
> The difference is when numShards is 0 (i.e., runner-provided sharding) vs.
> an explicit number. Things work fine with explicit sharding; it's the
> runner-provided sharding mode that hits the Flatten of PCollections with
> conflicting triggers.
>
> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <[email protected]> wrote:
>
>> FileIO requires an explicit numShards in unbounded mode for a number of
>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>> need something to group on.
>>
>> It is extremely surprising that behavior would change between using a
>> ValueProvider and not. The exact same code path should be triggered
>> regardless of whether a ValueProvider is used.
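>>
>> To be concrete, the two ways of supplying the shard count look like this
>> (where "write" is some FileIO.Write and 10 is just an example value):
>>
>>   write.withNumShards(10);                          // plain int
>>   write.withNumShards(StaticValueProvider.of(10));  // ValueProvider<Integer>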
>>
>> Reuven
>>
>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> Definitely sounds like a bug, but I also want to caution you (or anyone
>>> reading this from the archive) that there are known problems with
>>> continuation triggers. One part of the spec we missed is that a
>>> continuation trigger really must be "compatible" with its original trigger
>>> (an arbitrary concept that only matters when Flattening two PCollections
>>> together). Without this, we also know that you can have three PCollections
>>> with identical triggering and CoGroupByKey them together, but you cannot
>>> express that three-way join as a sequence of binary joins.
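>>>
>>> To make the Flatten constraint concrete, here is a small sketch (the
>>> inputs, windows, and triggers are arbitrary examples, not anyone's real
>>> pipeline) of two PCollections whose triggers differ and therefore cannot
>>> be flattened:
>>>
>>>   PCollection<String> a = input1.apply(Window.<String>into(new GlobalWindows())
>>>       .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
>>>       .withAllowedLateness(Duration.ZERO)
>>>       .discardingFiredPanes());
>>>
>>>   PCollection<String> b = input2.apply(Window.<String>into(new GlobalWindows())
>>>       .triggering(Repeatedly.forever(
>>>           AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
>>>       .withAllowedLateness(Duration.ZERO)
>>>       .discardingFiredPanes());
>>>
>>>   // Rejected at pipeline construction: "Inputs to Flatten had incompatible triggers"
>>>   PCollectionList.of(a).and(b).apply(Flatten.pCollections());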
>>>
>>> Kenn
>>>
>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <[email protected]> wrote:
>>>
>>>> Thanks for the response, Chamikara. I filed
>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>
>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <[email protected]>
>>>> wrote:
>>>>
>>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>>> unbounded input to files using GlobalWindows with unsharded output is a
>>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>
>>>>> - Cham
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>> input. In that case, it creates both a sharded and an unsharded
>>>>>> collection; the first goes through one GroupByKey while the other goes
>>>>>> through two. These two collections are then flattened together, and they
>>>>>> have incompatible triggers because the double-grouped collection uses a
>>>>>> continuation trigger.
>>>>>>
>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>>>> WriteFiles uses a different code path that doesn't flatten collections,
>>>>>> and no exception is thrown.
>>>>>>
>>>>>> So, this might really be considered a bug in WriteFiles (and thus in
>>>>>> FileIO). But I'd love to hear other interpretations.
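>>>>>>
>>>>>> Concretely, the change that avoids the exception looks roughly like this
>>>>>> (the path is a placeholder, and getNumShards() stands in for whatever
>>>>>> ValueProvider-backed pipeline option was being passed before):
>>>>>>
>>>>>>   FileIO.Write<Void, String> write = FileIO.<String>write()
>>>>>>       .via(TextIO.sink())
>>>>>>       .to("gs://my-bucket/output");
>>>>>>
>>>>>>   // Before: write.withNumShards(options.getNumShards());
>>>>>>   //   ValueProvider<Integer> -> hits the Flatten of incompatible triggers
>>>>>>   // After: a plain int takes the non-Flatten code path
>>>>>>   write = write.withNumShards(10);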
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>> different directories according to date, and I really don't care about
>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>
>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>> trigger based on number of elements and/or processing time so that Beam
>>>>>>> actually writes out files periodically?
>>>>>>>
>>>>>>> I tried the following:
>>>>>>>
>>>>>>> Window.into(new GlobalWindows())
>>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>>>       .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>   .discardingFiredPanes()
>>>>>>>
>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>
>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>
>>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>>> with my stated triggers. It's the internals of WriteFiles that are
>>>>>>> trying to flatten the incompatible PCollections together.
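>>>>>>>
>>>>>>> For completeness, the write is shaped roughly like this (the
>>>>>>> subscription, paths, and the extractDate() helper are placeholders for
>>>>>>> illustration, not my exact code):
>>>>>>>
>>>>>>>   PCollection<String> messages =
>>>>>>>       pipeline.apply(PubsubIO.readStrings()
>>>>>>>           .fromSubscription("projects/my-project/subscriptions/my-sub"));
>>>>>>>
>>>>>>>   messages
>>>>>>>       .apply(Window.<String>into(new GlobalWindows())
>>>>>>>           .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>               AfterPane.elementCountAtLeast(10000),
>>>>>>>               AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                   .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>           .withAllowedLateness(Duration.ZERO)
>>>>>>>           .discardingFiredPanes())
>>>>>>>       .apply(FileIO.<String, String>writeDynamic()
>>>>>>>           .by(message -> extractDate(message))   // date bucket, e.g. "2019-01-09"
>>>>>>>           .withDestinationCoder(StringUtf8Coder.of())
>>>>>>>           .via(TextIO.sink())
>>>>>>>           .to("gs://my-bucket/output")
>>>>>>>           .withNaming(date -> FileIO.Write.defaultNaming(date + "/part", ".txt"))
>>>>>>>           .withNumShards(numShards));            // ValueProvider<Integer> in my case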
>>>>>>>
>>>>>>>
>>>>>>>
