Jeff Klukas created BEAM-6399:
---------------------------------
Summary: FileIO errors on unbounded input with nondefault trigger
Key: BEAM-6399
URL: https://issues.apache.org/jira/browse/BEAM-6399
Project: Beam
Issue Type: Improvement
Components: io-java-files
Reporter: Jeff Klukas
Assignee: Eugene Kirpichov
In a pipeline with unbounded input, if a user defines a custom trigger and
calls FileIO.withNumShards(ValueProvider<Integer>), they may see an
IllegalArgumentException at runtime due to incompatible windows.
For example, consider this compound trigger:
{{Window.into(new GlobalWindows())}}
{{    .triggering(Repeatedly.forever(AfterFirst.of(}}
{{        AfterPane.elementCountAtLeast(10000),}}
{{        AfterProcessingTime.pastFirstElementInPane()}}
{{            .plusDelayOf(Duration.standardMinutes(10)))))}}
{{    .discardingFiredPanes()}}
Using that windowing with a numShards ValueProvider yields:
{{Inputs to Flatten had incompatible triggers:}}
{{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),}}
{{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1), AfterSynchronizedProcessingTime.pastFirstElementInPane()))}}
When numShards is given as a ValueProvider, WriteFiles creates both a sharded
and an unsharded collection; the first goes through one GroupByKey while the
other goes through two. These two collections are then flattened together, and
their triggers are incompatible because the double-grouped collection uses a
continuation trigger.
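
To make the mismatch concrete, here is a minimal sketch of how a continuation trigger can differ from the trigger the user configured. It is a toy model, not Beam's implementation: the TriggerSketch and Trigger classes, the string-based leaf names, and the strict name-equality compatibility check are all assumptions made for illustration. The continuation rules are simply read off the two triggers printed in the error message above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of trigger continuation. Hypothetical classes, NOT Beam's API. */
final class TriggerSketch {

    /** A trigger is modeled as a display name plus optional sub-triggers. */
    static final class Trigger {
        final String name;
        final List<Trigger> subTriggers;

        Trigger(String name, Trigger... subTriggers) {
            this.name = name;
            this.subTriggers = Arrays.asList(subTriggers);
        }

        /** Simplified continuation rules, inferred from the error message. */
        Trigger continuation() {
            if (name.startsWith("AfterPane.elementCountAtLeast")) {
                // Assumption: any element-count leaf continues as "at least 1".
                return new Trigger("AfterPane.elementCountAtLeast(1)");
            }
            if (name.startsWith("AfterProcessingTime")) {
                // Assumption: processing-time leaves continue as synchronized time.
                return new Trigger("AfterSynchronizedProcessingTime.pastFirstElementInPane()");
            }
            // Composite (or already-continued leaf): keep the name, continue each child.
            List<Trigger> continued = new ArrayList<>();
            for (Trigger sub : subTriggers) {
                continued.add(sub.continuation());
            }
            return new Trigger(name, continued.toArray(new Trigger[0]));
        }

        /** Assumption: triggers are compatible only if names and children all match. */
        boolean isCompatible(Trigger other) {
            if (!name.equals(other.name) || subTriggers.size() != other.subTriggers.size()) {
                return false;
            }
            for (int i = 0; i < subTriggers.size(); i++) {
                if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
                    return false;
                }
            }
            return true;
        }

        @Override
        public String toString() {
            if (subTriggers.isEmpty()) {
                return name;
            }
            List<String> parts = new ArrayList<>();
            for (Trigger sub : subTriggers) {
                parts.add(sub.toString());
            }
            return name + "(" + String.join(", ", parts) + ")";
        }
    }

    /** The compound trigger from this report, with leaf names used as labels only. */
    static Trigger userTrigger() {
        return new Trigger("Repeatedly.forever",
            new Trigger("AfterFirst.of",
                new Trigger("AfterPane.elementCountAtLeast(10000)"),
                new Trigger("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 minutes)")));
    }

    public static void main(String[] args) {
        Trigger user = userTrigger();
        Trigger continued = user.continuation();
        System.out.println("user trigger:      " + user);
        System.out.println("continuation:      " + continued);
        System.out.println("compatible:        " + user.isCompatible(continued));
    }
}
```

In this model the continuation of a continuation is unchanged, so the mismatch only appears when one flattened input still carries the user's trigger and the other carries a continuation of it, which is the pairing shown in the error message.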
If the user instead specifies a positive non-ValueProvider numShards, then a
different code path is followed that avoids this incompatibility.
It looks like WriteFiles may need to be implemented differently to avoid
combining collections with potentially incompatible triggers.