Hi,
I have come across some behavior that was unexpected (at least for me):
an apparent inconsistency between how a PCollection is processed in
DirectRunner and what PCollection.isBounded signals. Let me explain:
- I have a stateful ParDo which needs to make sure that elements
arrive in order. It accomplishes this by defining a BagState for
buffering input elements. It also keeps track of the element with the
highest timestamp in order to estimate a local watermark (highest
timestamp minus some allowed lateness), so that it knows when to remove
elements from the buffer, sort them by time and pass them on to some
(time-ordered) processing
- this seems to work well for streaming (unbounded) data
- for batch (bounded) data the semantics of stateful ParDo should be
(please correct me if I'm wrong) that elements always arrive in order,
because the runner can sort them by timestamp
- this implies that for batch-processed (bounded) input the
allowedLateness can be set to zero, which makes the processing a little
more efficient, because it doesn't have to use the BagState at all
- now, the trouble seems to be that DirectRunner always uses
streaming processing, even if the input is bounded (which is by
definition possible), so there is currently no way to know when it is
safe to set allowed lateness to zero (because the input will arrive
ordered)
- so it seems to me that either DirectRunner should apply sorting
to stateful ParDo when it processes bounded data (the same way that
other runners do), or it can apply streaming processing, but then it
should change PCollection.isBounded to UNBOUNDED, even if the input is
originally bounded
- that way, the semantics of PCollection.isBounded would not be
whether the data are known in advance to be finite, but *how* the data
are going to be processed, which is much more valuable (IMO)
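To make the buffering logic above concrete, here is a minimal sketch in plain Java. It deliberately uses no Beam APIs: the class OrderingBuffer and its methods are made up for illustration, a PriorityQueue stands in for the BagState, and elements are reduced to bare timestamps. It is not my actual DoFn, only the idea behind it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for the BagState-based buffer; elements are just timestamps. */
final class OrderingBuffer {
  private final long allowedLateness;
  // Stand-in for BagState: keeps buffered timestamps sorted, smallest first.
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private long maxTimestamp = Long.MIN_VALUE;

  OrderingBuffer(long allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  /** Buffers one timestamp and returns everything at or below the local watermark, in order. */
  List<Long> add(long timestamp) {
    buffer.add(timestamp);
    maxTimestamp = Math.max(maxTimestamp, timestamp);
    // Local watermark estimate: highest timestamp seen minus allowed lateness.
    long localWatermark = maxTimestamp - allowedLateness;
    List<Long> released = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() <= localWatermark) {
      released.add(buffer.poll());
    }
    // Note: an element later than allowedLateness is released immediately and
    // therefore out of order; a real implementation would have to handle that.
    return released;
  }

  /** Releases whatever is still buffered, in timestamp order (end of input). */
  List<Long> flush() {
    List<Long> released = new ArrayList<>();
    while (!buffer.isEmpty()) {
      released.add(buffer.poll());
    }
    return released;
  }

  int buffered() {
    return buffer.size();
  }

  public static void main(String[] args) {
    // Out-of-order ("streaming") input needs a non-zero allowed lateness.
    OrderingBuffer streaming = new OrderingBuffer(2L);
    List<Long> out = new ArrayList<>();
    for (long ts : new long[] {3L, 1L, 2L, 5L, 4L, 8L}) {
      out.addAll(streaming.add(ts));
    }
    out.addAll(streaming.flush());
    System.out.println(out); // [1, 2, 3, 4, 5, 8]

    // Pre-sorted ("batch") input with zero lateness never leaves anything buffered.
    OrderingBuffer batch = new OrderingBuffer(0L);
    for (long ts : new long[] {1L, 2L, 3L}) {
      batch.add(ts);
      System.out.println(batch.buffered()); // 0 after every element
    }
  }
}
```

With allowed lateness zero and input that really does arrive sorted, every element passes straight through and the buffer stays empty, which is the optimization I would like to enable. The catch is that the sketch, like my ParDo, has no way to tell by itself whether the input will arrive sorted.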
Any thoughts?
Jan