[ https://issues.apache.org/jira/browse/BEAM-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519084#comment-16519084 ]

Oleksandr Bushkovskyi commented on BEAM-4604:
---------------------------------------------

Hi [~kenn],
Yes, I've checked the implementation and understand that GroupIntoBatches is
implemented with state and timers and does not respect triggers. But I think that,
without looking into the implementation details, this may not be clear to SDK users.
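To make this concrete, here is a hypothetical, simplified model in plain Java (no Beam dependency) of the flush rule. It assumes that the GroupIntoBatches DoFn buffers elements per key and window in state, emits a batch as soon as the buffer reaches the batch size, and otherwise flushes only when an event-time timer set at window.maxTimestamp() plus the allowed lateness fires. The class and method names are made up for illustration, and times are plain milliseconds.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one key/window of GroupIntoBatches, as assumed above.
// Not Beam code: state is a plain list and the event-time timer is a number.
final class GroupIntoBatchesModel {
    private final int batchSize;
    private final long timerTimestampMillis; // window.maxTimestamp() + allowed lateness
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();

    GroupIntoBatchesModel(int batchSize, long windowMaxTimestampMillis, long allowedLatenessMillis) {
        this.batchSize = batchSize;
        this.timerTimestampMillis = windowMaxTimestampMillis + allowedLatenessMillis;
    }

    // Element arrival: the only size-based emission point.
    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Watermark progress: the event-time timer fires once the watermark passes its timestamp.
    void advanceWatermarkTo(long watermarkMillis) {
        if (watermarkMillis > timerTimestampMillis && !buffer.isEmpty()) {
            flush();
        }
    }

    // Deliberately no processing-time hook: a trigger such as
    // AfterProcessingTime.pastFirstElementInPane() has nothing to act on here.

    List<List<String>> emitted() {
        return emitted;
    }

    private void flush() {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
    }
}
{code}

Under this model, with the 10 minutes of allowed lateness used in the code further down, an under-full batch is not emitted until the watermark passes the window end plus 10 minutes, whatever trigger is configured.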

I was trying to batch elements in groups of 10 and emit a batch every second of
processing time, even if it has fewer than 10 elements: a best-effort batch of 10
with a mandatory output every second. I wrote the following code, but it obviously
doesn't work:
{code:java}
    @Override
    public PCollection<Iterable<ClickEvent>> expand(PCollection<ClickEvent> input) {

        return input
                // assign keys, as "GroupIntoBatches" works only with key-value pairs
                .apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
                .apply(
                        Window.<KV<Integer, ClickEvent>>into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS)))
                        .triggering(Repeatedly.forever(AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(1))))
                        .withAllowedLateness(Duration.standardMinutes(10))
                        .discardingFiredPanes()
                )
                // note: GroupIntoBatches ignores the trigger configured above
                .apply(GroupIntoBatches.ofSize(batchSize))
                // extract the batches (Iterables), dropping the keys
                .apply(ParDo.of(new ExtractValues()));
    }
{code}

Simply using 1-second event-time windows doesn't work well for me, because in my
test environment I have a small number of elements and the watermark lags 5-10
minutes behind processing time.
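The behaviour described above amounts to a size-or-timeout batching rule. Below is a hypothetical plain-Java model of it (names made up, no Beam dependency, processing time passed in explicitly so the logic is deterministic). In a real pipeline the same idea could presumably be written as a stateful DoFn that keeps the buffer in a BagState and arms a TimeDomain.PROCESSING_TIME timer when the first element of a batch arrives; that mapping is an assumption sketched here, not code from the Beam SDK.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: emit a batch when it reaches batchSize elements, or when
// maxDelayMillis of processing time has passed since its first element arrived.
final class SizeOrTimeoutBatcher<T> {
    private static final long NO_DEADLINE = Long.MAX_VALUE;

    private final int batchSize;
    private final long maxDelayMillis;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();
    private long deadlineMillis = NO_DEADLINE; // stands in for a processing-time timer

    SizeOrTimeoutBatcher(int batchSize, long maxDelayMillis) {
        if (batchSize <= 0 || maxDelayMillis <= 0) {
            throw new IllegalArgumentException("batchSize and maxDelayMillis must be positive");
        }
        this.batchSize = batchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Called for each element with the current processing time.
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            // First element of a new batch: arm the "timer".
            deadlineMillis = nowMillis + maxDelayMillis;
        }
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush(); // best-effort size-based batch
        }
    }

    // Called as processing time advances; models the timer callback.
    void onProcessingTime(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis >= deadlineMillis) {
            flush(); // mandatory time-based output, even if under-full
        }
    }

    List<List<T>> emitted() {
        return emitted;
    }

    private void flush() {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
        deadlineMillis = NO_DEADLINE; // disarm until the next batch starts
    }
}
{code}

For example, with new SizeOrTimeoutBatcher<>(10, 1_000L), 25 elements arriving at time 0 produce two full batches immediately and a batch of 5 once onProcessingTime(1_000L) is called. Measuring the timeout from the first element of each batch (rather than on a fixed clock) keeps the delay bound per batch and does not depend on the watermark at all.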

> Support Triggers for "GroupIntoBatches" Transform
> -------------------------------------------------
>
>                 Key: BEAM-4604
>                 URL: https://issues.apache.org/jira/browse/BEAM-4604
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Oleksandr Bushkovskyi
>            Priority: Major
>
> I think it makes sense to implement triggering support for the "GroupIntoBatches"
> transform.
> I've spent quite a long time trying to understand why my triggering behavior
> doesn't work with "GroupIntoBatches".
> This transform has exactly the same signature as the "GroupByKey" transform, and
> a similar name. It's confusing that two transforms that look so similar from the
> outside behave differently with triggers.
> At the very least, the "GroupIntoBatches" documentation should clearly state that
> it doesn't support triggers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
