[ https://issues.apache.org/jira/browse/QUARKS-230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381426#comment-15381426 ]

ASF GitHub Bot commented on QUARKS-230:
---------------------------------------

Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/167#discussion_r71085952
  
    --- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
    @@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @return A stream that contains the latest aggregations of partitions in this window.
          */
         <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
    +
    +    /**
    +     * Declares a stream that is a continuous, sliding, 
    +     * timer triggered aggregation of
    +     * partitions in this window.
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the aggregator invocation.  
    +     * The list will be empty if the partition is empty.
    +     * </P>
    +     * <P> 
    +     * A non-null {@code aggregator} result is added to the returned stream.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     *
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the aggregator
    +     * @param unit TimeUnit for {@code period}
    +     * @param aggregator
    +     *            Logic to aggregation a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #aggregate(BiFunction)
    +     */
    +    <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
    --- End diff --
    
    ... per-partition.  Sounds like that separate operation could be a count(1)-timedAggregate-evenIfDidntChange.  Does it make sense to force such a user to use a timed-trigger-if-changed window followed by this separate operation, rather than just a count(N)-timedAggregate-evenIfDidntChange?  (I can imagine it would be OK; I just want to be sure.)
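
To make the two trigger policies in the question above concrete, here is a minimal sketch in plain Java. It is not Quarks code: the class `TimerTriggerPolicy`, its `evenIfUnchanged` flag, and the `onTimer` method are hypothetical names invented for illustration. The timer is assumed to call `onTimer` once per period, and the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration of the two trigger policies; NOT part of the Quarks API. */
class TimerTriggerPolicy<T, U> {
    private final int count;                 // partition holds the last `count` tuples (assumed >= 1)
    private final boolean evenIfUnchanged;   // true: aggregate on every tick; false: only after a change
    private final List<T> tuples = new ArrayList<>();
    private boolean changedSinceLastTick = false;

    TimerTriggerPolicy(int count, boolean evenIfUnchanged) {
        this.count = count;
        this.evenIfUnchanged = evenIfUnchanged;
    }

    /** Inserts a tuple, evicting the oldest one when the partition is full. */
    void insert(T tuple) {
        if (tuples.size() == count) {
            tuples.remove(0);
        }
        tuples.add(tuple);
        changedSinceLastTick = true;
    }

    /** One timer tick: returns the aggregation, or null when the tick is skipped. */
    U onTimer(Function<List<T>, U> aggregator) {
        if (!evenIfUnchanged && !changedSinceLastTick) {
            return null; // "if changed" policy: nothing new since the last tick
        }
        changedSinceLastTick = false;
        // Hand the aggregator a copy so the list is stable during the call.
        return aggregator.apply(new ArrayList<>(tuples));
    }
}
```

With `evenIfUnchanged` set to `true`, every tick re-aggregates the current contents, so a downstream consumer sees a result each period. With `false`, ticks that follow no insertion produce nothing, which is why the "if changed" variant would need the separate follow-on operation discussed above to get a result every period.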


> Add timer triggered window aggregations
> ---------------------------------------
>
>                 Key: QUARKS-230
>                 URL: https://issues.apache.org/jira/browse/QUARKS-230
>             Project: Quarks
>          Issue Type: New Feature
>            Reporter: Dale LaBossiere
>            Assignee: Dale LaBossiere
>
> A recent use case involved the desire for "timer triggered" instead of 
> "partition content change triggered" window aggregations.  E.g., "I want to 
> trigger an aggregation every second over the last 10 tuples in a window 
> partition" -- a count-based window with timer triggered aggregations.
> I propose adding 3 methods to TWindow.  Two in direct support of "timer 
> triggered" aggregations (this processing model seems like it could be common 
> enough to warrant making it conveniently available) and one to make it easier 
> to use the lower level Window API to define and use other processing models.
> I'm submitting a PR with the details for review but the net is these 
> additions to TWindow:
> ```
>     <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
>     <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
>     <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
> ```
> See https://github.com/apache/incubator-quarks/pull/167
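
The use case in the description (aggregate every period over the last N tuples of a partition) can be sketched in plain Java as follows. This is only an illustration of the semantics described in the proposed Javadoc, not the Quarks implementation: the class `TimedCountPartition` and its `insert`, `fire`, and `start` methods are hypothetical, and a single partition with a fixed key is assumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical stand-in for a single window partition; NOT the Quarks implementation. */
class TimedCountPartition<T, K, U> {
    private final K key;
    private final int count;                          // assumed >= 1
    private final Deque<T> tuples = new ArrayDeque<>();
    private final BiFunction<List<T>, K, U> aggregator;
    private final Consumer<U> downstream;

    TimedCountPartition(K key, int count,
            BiFunction<List<T>, K, U> aggregator, Consumer<U> downstream) {
        this.key = key;
        this.count = count;
        this.aggregator = aggregator;
        this.downstream = downstream;
    }

    /** Inserts a tuple, evicting the oldest when full. Insertion never triggers aggregation. */
    synchronized void insert(T tuple) {
        if (tuples.size() == count) {
            tuples.removeFirst();
        }
        tuples.addLast(tuple);
    }

    /** One timer tick: aggregate a stable snapshot and forward a non-null result. */
    void fire() {
        List<T> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(tuples); // insertion order, oldest to newest
        }
        U result = aggregator.apply(snapshot, key);
        if (result != null) {
            downstream.accept(result);
        }
    }

    /** Starts a timer that calls fire() every period; the caller shuts the executor down. */
    ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::fire, period, period, unit);
        return timer;
    }
}
```

For the example in the description, a caller would build the partition with `count` set to 10 and call `start(1, TimeUnit.SECONDS)`. Copying the tuples before invoking the aggregator is one simple way to keep the list stable during the call; a real implementation may well do this differently.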



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
