[
https://issues.apache.org/jira/browse/BEAM-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519084#comment-16519084
]
Oleksandr Bushkovskyi commented on BEAM-4604:
---------------------------------------------
Hi [~kenn],
Yes, I've checked the implementation and understand that GroupIntoBatches is
implemented with state and timers and does not respect triggers. But without
looking into the implementation details, this may not be clear to SDK users.
I was trying to batch elements in groups of 10 and emit a batch every second of
processing time, even if it contains fewer than 10 elements. In other words, a
best-effort batch of 10 with a mandatory output every second. I wrote the
following code, but it obviously doesn't work:
{code:java}
@Override
public PCollection<Iterable<ClickEvent>> expand(PCollection<ClickEvent> input) {
  return input
      // Assign keys, as GroupIntoBatches works only with key-value pairs.
      .apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
      .apply(
          Window.<KV<Integer, ClickEvent>>into(
                  FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS)))
              // Intended: fire 1 second (processing time) after the first element in a pane.
              // GroupIntoBatches does not honor this trigger; it batches with state and timers.
              .triggering(
                  Repeatedly.forever(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(Duration.standardSeconds(1))))
              .withAllowedLateness(Duration.standardMinutes(10))
              .discardingFiredPanes())
      .apply(GroupIntoBatches.<Integer, ClickEvent>ofSize(batchSize))
      // Extract the batches, ignoring the keys.
      .apply(ParDo.of(new ExtractValues()));
}
{code}
Plain event-time 1-second windows don't work well for me either, because in my
test environment I have a small number of elements and the watermark lags
processing time by 5-10 minutes.
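A minimal sketch of the behavior this comment asks for (a best-effort batch of N with a mandatory flush after a fixed processing-time delay), modeled in plain Java for a single key. The class `SizeOrTimeBatcher`, its method names, and the explicit `nowMillis` clock parameter are hypothetical: the buffer and deadline stand in for the per-key state and processing-time timer a stateful `DoFn` could use. This is an illustration of the semantics only, not Beam's `GroupIntoBatches` code.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical single-key model of "best-effort batch of N, but flush at least every
 * maxBufferingMillis of processing time". The buffer and deadline play the roles of
 * per-key state and a processing-time timer. Not Beam's GroupIntoBatches implementation.
 */
final class SizeOrTimeBatcher<T> {
  private final int batchSize;
  private final long maxBufferingMillis;
  private final List<T> buffer = new ArrayList<>();
  // Deadline of the current partial batch; only meaningful while the buffer is non-empty.
  private long deadlineMillis;

  SizeOrTimeBatcher(int batchSize, long maxBufferingMillis) {
    if (batchSize <= 0 || maxBufferingMillis <= 0) {
      throw new IllegalArgumentException("batchSize and maxBufferingMillis must be positive");
    }
    this.batchSize = batchSize;
    this.maxBufferingMillis = maxBufferingMillis;
  }

  /** Buffers one element; returns a full batch if this element completed one. */
  Optional<List<T>> add(T element, long nowMillis) {
    if (buffer.isEmpty()) {
      // First element of a new batch: "arm the timer" relative to its arrival time.
      deadlineMillis = nowMillis + maxBufferingMillis;
    }
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      return Optional.of(flush());
    }
    return Optional.empty();
  }

  /** Called as processing time advances; emits the partial batch once its deadline passes. */
  Optional<List<T>> onProcessingTime(long nowMillis) {
    if (!buffer.isEmpty() && nowMillis >= deadlineMillis) {
      return Optional.of(flush());
    }
    return Optional.empty();
  }

  private List<T> flush() {
    List<T> batch = Collections.unmodifiableList(new ArrayList<>(buffer));
    buffer.clear(); // the next add() starts a new batch and a new deadline
    return batch;
  }
}
{code}

With `new SizeOrTimeBatcher<>(10, 1000)`, a key that receives 10 elements in quick succession emits them immediately, while a key that receives only 3 emits them once 1 second of processing time has passed since the first of them arrived. Passing the clock in explicitly keeps the logic deterministic and easy to test.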
> Support Triggers for "GroupIntoBatches" Transform
> -------------------------------------------------
>
> Key: BEAM-4604
> URL: https://issues.apache.org/jira/browse/BEAM-4604
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Oleksandr Bushkovskyi
> Priority: Major
>
> I think it makes sense to implement triggering support for the "GroupIntoBatches"
> transform.
> I spent quite a long time trying to understand why my triggering behavior
> doesn't work with "GroupIntoBatches".
> This transform has exactly the same signature as the "GroupByKey" transform and
> a similar name. It's confusing that these two transforms, which look similar from
> the outside, behave differently with triggers.
> At the very least, the "GroupIntoBatches" documentation should clearly state
> that it doesn't support triggers.