Github user dlaboss commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/167#discussion_r71086042
  
    --- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
    @@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under 
one
          * @return A stream that contains the latest aggregations of 
partitions in this window.
          */
         <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
    +
    +    /**
    +     * Declares a stream that is a continuous, sliding, 
    +     * timer triggered aggregation of
    +     * partitions in this window.
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest.  The list is stable
    +     * during the aggregator invocation.  
    +     * The list will be empty if the partition is empty.
    +     * </P>
    +     * <P> 
    +     * A non-null {@code aggregator} result is added to the returned 
stream.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     *
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the aggregator
    +     * @param unit TimeUnit for {@code period}
    +     * @param aggregator
    +     *            Logic to aggregation a partition.
    +     * @return A stream that contains the latest aggregations of 
partitions in this window.
    +     * 
    +     * @see #aggregate(BiFunction)
    +     */
    +    <U> TStream<U> timedAggregate(long period, TimeUnit unit, 
BiFunction<List<T>, K, U> aggregator);
         
         /**
    +     * Declares a stream that represents a 
    +     * timer triggered batched aggregation of
    +     * partitions in this window. 
    +     * <P>
    +     * Periodically trigger an invocation of
    +     * {@code batcher.apply(tuples, key)}, where {@code tuples} is
    +     * a {@code List<T>} containing all the tuples in the partition in
    +     * insertion order from oldest to newest  The list is stable
    +     * during the batcher invocation.
    +     * The list will be empty if the partition is empty.
    +     * <P>
    +     * A non-null {@code batcher} result is added to the returned stream.
    +     * The partition's contents are cleared after a batch is processed.
    +     * </P>
    +     * <P>
    +     * Thus the returned stream will contain a sequence of tuples where the
    +     * most recent tuple represents the most up to date aggregation of a
    +     * partition.
    +     * 
    +     * @param <U> Tuple type
    +     * @param period how often to invoke the batcher
    +     * @param unit TimeUnit for {@code period}
    +     * @param batcher
    +     *            Logic to aggregation a partition.
    +     * @return A stream that contains the latest aggregations of 
partitions in this window.
    +     * 
    +     * @see #batch(BiFunction)
    +     */
    +    <U> TStream<U> timedBatch(long period, TimeUnit unit, 
BiFunction<List<T>, K, U> batcher);
    --- End diff --
    
    So you're asking "why have timedBatch", right?
    
    A timedBatch and timedAggregate yield the same aggregations when the 
trigger interval is => the effectiveWindowWidth... because the 
effectiveWindowWidth will have flushed all tuples since the last batch/agg in 
both cases.  "effectiveWindowWidth" => with predictable/regular tuple arrival 
rates, one can specify a last(N) and last(S sec) that yield the same window 
content at any time.  e.g., with a 1 tup/sec arrival rate, last(10) and 
last(10sec) are equivalent.
    
    The behavior of the two is different if the trigger rate is shorter than 
the effectiveWindowWidth.
    e.g., last(10) with a 1 tup/sec arrival rate
    timedBatch(3sec) - agg1[1-3], agg2[4-6], agg3[7-9],agg4[10-12],agg5[13-15]
    timedAgg(3sec) - agg1[1-3], agg2[1-6], agg3[1-9],agg4[3-12],agg5[6-15]
    Right?
    
    That said, observe that the last(10)-timedBatch(3sec) with 1tps arrival 
yields the same result as either last(3)-batch() or last(3sec)-batch().  Right?
    
    So if timedBatch yields the same result as timedAggregate when the trigger 
period is >= effectiveWindowWidth, and it yields the same result as those 
untimed batch() when timedBatch trigger period is < effectiveWindowWidth, then 
why have timedBatch()?
    
    I believe there are only equivalences in the cases where the tuple arrival 
is regular/reliable -- not bursty or lossy.  In other cases I don't think you 
can come up with a last(N) and last(Ssec) that are equivalent.  Hence none of 
these equivalences are possible.  Hence timedBatch() isn't redundant.  e g., 
even the trigger period >= effectiveWindowWidth case, when there is burstyness, 
a last(10sec) window can contain different collections of tuples than a last(N) 
window, for any N (more or less depending on the burstyness and values of N).  
Right?
    
    Am I missing/misthinking something and there are always equiv configs to 
any timedBatch() config?  Where's a signal-processing guy when you need them? 
:-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to