[
https://issues.apache.org/jira/browse/QUARKS-230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381429#comment-15381429
]
ASF GitHub Bot commented on QUARKS-230:
---------------------------------------
Github user dlaboss commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/167#discussion_r71086042
--- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
@@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
* @return A stream that contains the latest aggregations of partitions in this window.
*/
<U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
+
+ /**
+ * Declares a stream that is a continuous, sliding,
+ * timer triggered aggregation of
+ * partitions in this window.
+ * <P>
+ * Periodically trigger an invocation of
+ * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the aggregator invocation.
+ * The list will be empty if the partition is empty.
+ * </P>
+ * <P>
+ * A non-null {@code aggregator} result is added to the returned stream.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up-to-date aggregation of a
+ * partition.
+ * </P>
+ *
+ * @param <U> Tuple type
+ * @param period how often to invoke the aggregator
+ * @param unit TimeUnit for {@code period}
+ * @param aggregator
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #aggregate(BiFunction)
+ */
+ <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
/**
+ * Declares a stream that represents a
+ * timer triggered batched aggregation of
+ * partitions in this window.
+ * <P>
+ * Periodically trigger an invocation of
+ * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the batcher invocation.
+ * The list will be empty if the partition is empty.
+ * </P>
+ * <P>
+ * A non-null {@code batcher} result is added to the returned stream.
+ * The partition's contents are cleared after a batch is processed.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up-to-date aggregation of a
+ * partition.
+ * </P>
+ *
+ * @param <U> Tuple type
+ * @param period how often to invoke the batcher
+ * @param unit TimeUnit for {@code period}
+ * @param batcher
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #batch(BiFunction)
+ */
+ <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
--- End diff --
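To make the two Javadoc contracts above concrete, here is a minimal plain-Java sketch of a single partition of a last(N) count window. It is not the Quarks implementation: `PartitionSketch`, `onTimerAggregate` and `onTimerBatch` are hypothetical names, the timer is replaced by explicit calls, and the returned stream is modeled as a list. Both paths pass an unmodifiable oldest-to-newest snapshot (empty if the partition is empty) and drop `null` results; only the batch path clears the partition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of one partition of a last(N) count window.
// Not Quarks code; it only mirrors the timedAggregate/timedBatch Javadoc.
class PartitionSketch<T, K> {
    private final int capacity;                      // N in last(N)
    private final K key;                             // partition key
    private final Deque<T> tuples = new ArrayDeque<>();
    final List<Object> emitted = new ArrayList<>();  // stands in for the returned stream

    PartitionSketch(int capacity, K key) {
        this.capacity = capacity;
        this.key = key;
    }

    // Insert a tuple, evicting the oldest once the partition already holds N.
    void insert(T tuple) {
        if (tuples.size() == capacity) {
            tuples.removeFirst();
        }
        tuples.addLast(tuple);
    }

    // Stable, oldest-to-newest copy; empty if the partition is empty.
    private List<T> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(tuples));
    }

    // timedAggregate semantics: called on each timer tick, contents are kept.
    <U> void onTimerAggregate(BiFunction<List<T>, K, U> aggregator) {
        U result = aggregator.apply(snapshot(), key);
        if (result != null) {
            emitted.add(result);
        }
    }

    // timedBatch semantics: called on each timer tick, then contents are cleared.
    <U> void onTimerBatch(BiFunction<List<T>, K, U> batcher) {
        U result = batcher.apply(snapshot(), key);
        if (result != null) {
            emitted.add(result);
        }
        tuples.clear();
    }
}
```

Calling `onTimerAggregate` twice with no new arrivals emits the same aggregation twice, whereas a second `onTimerBatch` call sees an empty list, which a batcher can map to `null` to emit nothing.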
So you're asking "why have timedBatch", right?
A timedBatch and a timedAggregate yield the same aggregations when the
trigger period is >= the effectiveWindowWidth, because in both cases none of
the tuples present at the previous batch/agg are still in the window when
the next trigger fires. "effectiveWindowWidth": with predictable/regular
tuple arrival rates, one can specify a last(N) and a last(S sec) that yield
the same window content at any time. E.g., with a 1 tup/sec arrival rate,
last(10) and last(10sec) are equivalent.
The behavior of the two differs if the trigger period is shorter than
the effectiveWindowWidth.
E.g., last(10) with a 1 tup/sec arrival rate:
timedBatch(3sec) - agg1[1-3], agg2[4-6], agg3[7-9], agg4[10-12], agg5[13-15]
timedAgg(3sec)   - agg1[1-3], agg2[1-6], agg3[1-9], agg4[3-12], agg5[6-15]
Right?
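The two timelines above can be reproduced with a small simulation. This is a sketch under stated assumptions, not Quarks code: `TriggerTimeline` is a hypothetical class, tuple i arrives at second i, and the timer fires right after that second's arrival. It also checks the first claim: with a 12 sec trigger period, which is >= the 10-tuple effectiveWindowWidth, both variants produce identical aggregations.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simulates last(windowSize) with one tuple per second (tuple i at t = i sec)
// and a timer firing every `period` seconds. Each aggregation is reported as
// "[first-last]" tuple numbers of the partition contents at trigger time.
class TriggerTimeline {

    static List<String> run(int windowSize, int period, int seconds, boolean clearOnTrigger) {
        Deque<Integer> partition = new ArrayDeque<>();
        List<String> aggs = new ArrayList<>();
        for (int t = 1; t <= seconds; t++) {
            partition.addLast(t);                 // tuple t arrives
            if (partition.size() > windowSize) {
                partition.removeFirst();          // last(N) eviction
            }
            if (t % period == 0) {
                aggs.add(partition.isEmpty()
                        ? "[]"
                        : "[" + partition.peekFirst() + "-" + partition.peekLast() + "]");
                if (clearOnTrigger) {
                    partition.clear();            // timedBatch clears the partition
                }
            }
        }
        return aggs;
    }

    public static void main(String[] args) {
        // [[1-3], [4-6], [7-9], [10-12], [13-15]]
        System.out.println("timedBatch(3sec):  " + run(10, 3, 15, true));
        // [[1-3], [1-6], [1-9], [3-12], [6-15]]
        System.out.println("timedAgg(3sec):    " + run(10, 3, 15, false));
        // both: [[3-12], [15-24], [27-36]]
        System.out.println("timedBatch(12sec): " + run(10, 12, 36, true));
        System.out.println("timedAgg(12sec):   " + run(10, 12, 36, false));
    }
}
```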
That said, observe that last(10)-timedBatch(3sec) with a 1 tup/sec arrival
rate yields the same result as either last(3)-batch() or last(3sec)-batch(). Right?
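The last(10)-timedBatch(3sec) vs. last(3)-batch() equivalence can be checked the same way. This sketch assumes, for illustration only, that an untimed batch() on last(n) fires as soon as the partition holds n tuples and then clears it; `BatchEquivalence` and its methods are hypothetical, and arrivals are given as arrival seconds so that several tuples may share a second.

```java
import java.util.ArrayList;
import java.util.List;

// Compares last(windowSize)+timedBatch(period) with last(n)+batch().
// Assumption (sketch only): batch() on last(n) fires when the partition
// holds n tuples, then clears it.
class BatchEquivalence {

    // arrivals[i] is the arrival second of tuple i+1, in ascending order.
    static List<List<Integer>> timedBatch(int[] arrivals, int windowSize, int period, int seconds) {
        List<Integer> partition = new ArrayList<>();
        List<List<Integer>> batches = new ArrayList<>();
        int next = 0;
        for (int t = 1; t <= seconds; t++) {
            while (next < arrivals.length && arrivals[next] == t) {
                partition.add(next + 1);
                if (partition.size() > windowSize) {
                    partition.remove(0);                 // evict the oldest tuple
                }
                next++;
            }
            if (t % period == 0) {
                batches.add(new ArrayList<>(partition)); // batcher sees a copy
                partition.clear();                       // then the partition is cleared
            }
        }
        return batches;
    }

    // Count-triggered batching: arrival times do not matter, only order.
    static List<List<Integer>> countBatch(int[] arrivals, int n) {
        List<Integer> partition = new ArrayList<>();
        List<List<Integer>> batches = new ArrayList<>();
        for (int i = 0; i < arrivals.length; i++) {
            partition.add(i + 1);
            if (partition.size() == n) {
                batches.add(new ArrayList<>(partition));
                partition.clear();
            }
        }
        return batches;
    }
}
```

With one tuple per second the two produce the same five batches; with a bursty schedule, such as five tuples in second 1 and one in second 2, timedBatch(3sec) emits one batch of six tuples while last(3)-batch() emits two batches of three.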
So if timedBatch yields the same result as timedAggregate when the trigger
period is >= effectiveWindowWidth, and the same result as an untimed
batch() when the trigger period is < effectiveWindowWidth, then why have
timedBatch()?
I believe these equivalences hold only when tuple arrival is
regular/reliable, not bursty or lossy. In other cases I don't think you
can come up with a last(N) and a last(S sec) that are equivalent, so none
of these equivalences is possible and timedBatch() isn't redundant. E.g.,
even in the trigger period >= effectiveWindowWidth case, when arrival is
bursty a last(10sec) window can contain a different collection of tuples
than a last(N) window, for any N (more or fewer tuples, depending on the
burstiness and the value of N).
Right?
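The bursty case can be checked mechanically: for a given arrival schedule, search for a single N such that last(N) holds exactly the tuples of last(10sec) at every trigger instant. This is a sketch under an assumed time-window semantics (at time t, last(W sec) holds the tuples that arrived in (t - W, t]); `WindowEquivalence` is a hypothetical helper. Because both windows retain the most recent tuples, they agree at time t exactly when the count window retains as many tuples as the time window does.

```java
import java.util.Arrays;

// Searches for one N such that last(N) equals last(widthSec) at every trigger.
// Assumed semantics: last(W sec) at time t holds arrivals a with t - W < a <= t.
class WindowEquivalence {

    // Number of tuples last(widthSec) holds at time t.
    static int timeWindowCount(double[] arrivals, double t, double widthSec) {
        return (int) Arrays.stream(arrivals).filter(a -> a > t - widthSec && a <= t).count();
    }

    // Number of tuples that have arrived by time t.
    static int arrivedBy(double[] arrivals, double t) {
        return (int) Arrays.stream(arrivals).filter(a -> a <= t).count();
    }

    // Returns a matching N, or -1 if no single N works at every trigger.
    static int equivalentN(double[] arrivals, double widthSec, double[] triggers) {
        Integer exact = null;   // N forced by a trigger where the time window dropped tuples
        int lowerBound = 1;     // N must be at least this to keep every tuple seen so far
        for (double t : triggers) {
            int inTime = timeWindowCount(arrivals, t, widthSec);
            int arrived = arrivedBy(arrivals, t);
            if (inTime < arrived) {
                // last(N) keeps min(N, arrived) tuples, so N must equal inTime.
                if (exact != null && exact != inTime) {
                    return -1;
                }
                exact = inTime;
            } else {
                // Nothing dropped yet: last(N) matches only if N >= arrived.
                lowerBound = Math.max(lowerBound, arrived);
            }
        }
        if (exact == null) {
            return lowerBound;
        }
        return exact >= lowerBound ? exact : -1;
    }
}
```

With one tuple per second and triggers at 10, 20 and 30 sec it finds N = 10. With ten regular tuples followed by a burst of five at 15 sec, the time window holds all 10 tuples at 10 sec but only the 5 burst tuples at 20 sec, so no single N matches.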
Am I missing or misthinking something, so that there is always an equivalent
config for any timedBatch() config? Where's a signal-processing guy when you
need one? :-)
> Add timer triggered window aggregations
> ---------------------------------------
>
> Key: QUARKS-230
> URL: https://issues.apache.org/jira/browse/QUARKS-230
> Project: Quarks
> Issue Type: New Feature
> Reporter: Dale LaBossiere
> Assignee: Dale LaBossiere
>
> A recent use case involved the desire for "timer triggered" instead of
> "partition content change triggered" window aggregations. E.g., "I want to
> trigger an aggregation every second over the last 10 tuples in a window
> partition" -- a count-based window with timer triggered aggregations.
> I propose adding 3 methods to TWindow: two in direct support of "timer
> triggered" aggregations (this processing model seems like it could be common
> enough to warrant making it conveniently available) and one to make it easier
> to use the lower-level Window API to define and use other processing models.
> I'm submitting a PR with the details for review but the net is these
> additions to TWindow:
> ```
> <U> TStream<U> timedAggregate(long period, TimeUnit unit,
>         BiFunction<List<T>, K, U> aggregator);
> <U> TStream<U> timedBatch(long period, TimeUnit unit,
>         BiFunction<List<T>, K, U> batcher);
> <U, L extends List<T>> TStream<U> process(Window<T,K,L> window,
>         BiFunction<List<T>, K, U> aggregator);
> ```
> See https://github.com/apache/incubator-quarks/pull/167