[ 
https://issues.apache.org/jira/browse/QUARKS-230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381429#comment-15381429
 ] 

ASF GitHub Bot commented on QUARKS-230:
---------------------------------------

Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/167#discussion_r71086042
  
    --- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
    @@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
          * @return A stream that contains the latest aggregations of partitions in this window.
          */
         <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
    +
    +    /**
    +     * Declares a stream that is a continuous, sliding, 
    +     * timer triggered aggregation of
    +     * partitions in this window.
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the aggregator invocation.  
    +     * The list will be empty if the partition is empty.
    +     * </P>
    +     * <P> 
    +     * A non-null {@code aggregator} result is added to the returned stream.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     *
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the aggregator
    +     * @param unit TimeUnit for {@code period}
    +     * @param aggregator
    +     *            Logic to aggregation a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #aggregate(BiFunction)
    +     */
    +    <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
         
         /**
    +     * Declares a stream that represents a 
    +     * timer triggered batched aggregation of
    +     * partitions in this window. 
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code batcher.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest  The list is stable
    +     * during the batcher invocation.
    +     * The list will be empty if the partition is empty.
    +     * <P>
    +     * A non-null {@code batcher} result is added to the returned stream.
    +     * The partition's contents are cleared after a batch is processed.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     * 
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the batcher
    +     * @param unit TimeUnit for {@code period}
    +     * @param batcher
    +     *            Logic to aggregation a partition.
    +     * @return A stream that contains the latest aggregations of partitions in this window.
    +     * 
    +     * @see #batch(BiFunction)
    +     */
    +    <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
    --- End diff --
    
    So you're asking "why have timedBatch", right?
    
    A timedBatch and a timedAggregate yield the same aggregations when the
    trigger interval is >= the effectiveWindowWidth, because the window will
    have evicted all tuples that were present at the previous batch/agg in
    both cases.  By "effectiveWindowWidth" I mean: with predictable/regular
    tuple arrival rates, one can specify a last(N) and a last(S sec) that
    yield the same window content at any time.  e.g., with a 1 tup/sec
    arrival rate, last(10) and last(10sec) are equivalent.
    
    The behavior of the two differs if the trigger period is shorter than
    the effectiveWindowWidth.
    e.g., last(10) with a 1 tup/sec arrival rate:
    timedBatch(3sec) - agg1[1-3], agg2[4-6], agg3[7-9], agg4[10-12], agg5[13-15]
    timedAgg(3sec)   - agg1[1-3], agg2[1-6], agg3[1-9], agg4[3-12], agg5[6-15]
    Right?
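
    A minimal sketch that replays the example above, assuming tuple i arrives
    at second i and a last(N) count-based partition.  The class and method
    names are hypothetical and this is a plain-Java simulation, not the Quarks
    implementation.  It only prints the range of tuples each trigger would see.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation: one tuple per second into a last(windowSize) partition.
final class TimerTriggerSketch {

    /**
     * Returns the tuple range seen by each timer trigger.
     * clearAfterTrigger = true  models timedBatch (partition cleared after each batch);
     * clearAfterTrigger = false models timedAggregate (partition keeps sliding).
     */
    static List<String> simulate(int windowSize, int periodSec, int totalSec,
                                 boolean clearAfterTrigger) {
        Deque<Integer> partition = new ArrayDeque<>();
        List<String> seen = new ArrayList<>();
        for (int t = 1; t <= totalSec; t++) {
            partition.addLast(t);                 // tuple t arrives at second t
            if (partition.size() > windowSize) {
                partition.removeFirst();          // last(N) eviction
            }
            // Simplification: this sketch skips empty partitions, whereas the
            // Javadoc in the diff says the function is invoked with an empty list.
            if (t % periodSec == 0 && !partition.isEmpty()) {
                seen.add("[" + partition.peekFirst() + "-" + partition.peekLast() + "]");
                if (clearAfterTrigger) {
                    partition.clear();
                }
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("timedBatch(3sec): " + simulate(10, 3, 15, true));
        System.out.println("timedAgg(3sec):   " + simulate(10, 3, 15, false));
    }
}
```

    Under these assumptions the batch variant sees disjoint ranges while the
    aggregate variant sees overlapping ranges until the count limit starts
    evicting tuples.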
    
    That said, observe that the last(10)-timedBatch(3sec) with 1tps arrival 
yields the same result as either last(3)-batch() or last(3sec)-batch().  Right?
    
    So if timedBatch yields the same result as timedAggregate when the
    trigger period is >= effectiveWindowWidth, and it yields the same result
    as an untimed batch() when the trigger period is < effectiveWindowWidth,
    then why have timedBatch()?
    
    I believe these equivalences hold only when tuple arrival is
    regular/reliable, not bursty or lossy.  In other cases I don't think you
    can come up with a last(N) and a last(S sec) that are equivalent, so none
    of the equivalences above are possible, and hence timedBatch() isn't
    redundant.  e.g., even in the trigger period >= effectiveWindowWidth
    case, when there is burstiness, a last(10sec) window can contain a
    different collection of tuples than a last(N) window, for any N (more or
    fewer tuples depending on the burstiness and the value of N).
    Right?
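
    The bursty case can be checked with a small sketch.  Everything here is
    an assumption made for illustration: the arrival times (a burst of five
    tuples at second 1, then one tuple each at seconds 12 and 13), the two
    observation instants, and the hypothetical helper names.  It compares the
    content of a last(N) window with a last(10sec) window for every N.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical comparison of count-based and time-based window content.
final class BurstyWindowSketch {

    /** Ids of the (at most) n most recent tuples that arrived by 'now'; arrivalSec must be ascending. */
    static List<Integer> lastCount(int[] arrivalSec, int now, int n) {
        List<Integer> arrived = new ArrayList<>();
        for (int id = 0; id < arrivalSec.length; id++) {
            if (arrivalSec[id] <= now) {
                arrived.add(id);
            }
        }
        int from = Math.max(0, arrived.size() - n);
        return new ArrayList<>(arrived.subList(from, arrived.size()));
    }

    /** Ids of the tuples that arrived in the interval (now - sec, now]. */
    static List<Integer> lastSeconds(int[] arrivalSec, int now, int sec) {
        List<Integer> ids = new ArrayList<>();
        for (int id = 0; id < arrivalSec.length; id++) {
            if (arrivalSec[id] > now - sec && arrivalSec[id] <= now) {
                ids.add(id);
            }
        }
        return ids;
    }

    /** True if last(n) and last(sec) hold the same tuples at every given instant. */
    static boolean sameContentAt(int[] arrivalSec, int n, int sec, int... instants) {
        for (int now : instants) {
            if (!lastCount(arrivalSec, now, n).equals(lastSeconds(arrivalSec, now, sec))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] bursty = {1, 1, 1, 1, 1, 12, 13};   // assumed arrival seconds
        for (int n = 1; n <= bursty.length; n++) {
            System.out.println("last(" + n + ") matches last(10sec) at t=10 and t=13? "
                    + sameContentAt(bursty, n, 10, 10, 13));
        }
    }
}
```

    With this input the time-based window holds five tuples at t=10 and two
    at t=13, so no single N matches at both instants, whereas a regular
    1 tup/sec input with N=10 matches at every instant tried.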
    
    Am I missing/misthinking something and there are always equiv configs to 
any timedBatch() config?  Where's a signal-processing guy when you need them? 
:-)


> Add timer triggered window aggregations
> ---------------------------------------
>
>                 Key: QUARKS-230
>                 URL: https://issues.apache.org/jira/browse/QUARKS-230
>             Project: Quarks
>          Issue Type: New Feature
>            Reporter: Dale LaBossiere
>            Assignee: Dale LaBossiere
>
> A recent use case involved the desire for "timer triggered" instead of 
> "partition content change triggered" window aggregations.  E.g., "I want to 
> trigger an aggregation every second over the last 10 tuples in a window 
> partition" -- a count-based window with timer triggered aggregations.
> I propose adding 3 methods to TWindow.  Two in direct support of "timer 
> triggered" aggregations (this processing model seems like it could be common 
> enough to warrant making it conveniently available) and one to make it easier 
> to use the lower level Window API to define and use other processing models.
> I'm submitting a PR with the details for review but the net is these 
> additions to TWindow:
> ```
>     <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
>     <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
>     <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
> ```
> See https://github.com/apache/incubator-quarks/pull/167



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
