[
https://issues.apache.org/jira/browse/QUARKS-230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15380205#comment-15380205
]
ASF GitHub Bot commented on QUARKS-230:
---------------------------------------
Github user dlaboss commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/167#discussion_r71047423
--- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
@@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
* @return A stream that contains the latest aggregations of partitions in this window.
*/
<U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
+
+ /**
+ * Declares a stream that is a continuous, sliding,
+ * timer triggered aggregation of
+ * partitions in this window.
+ * <P>
+ * Periodically trigger an invocation of
+ * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the aggregator invocation.
+ * The list will be empty if the partition is empty.
+ * </P>
+ * <P>
+ * A non-null {@code aggregator} result is added to the returned stream.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ *
+ * @param <U> Tuple type
+ * @param period how often to invoke the aggregator
+ * @param unit TimeUnit for {@code period}
+ * @param aggregator
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #aggregate(BiFunction)
+ */
+ <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
/**
+ * Declares a stream that represents a
+ * timer triggered batched aggregation of
+ * partitions in this window.
+ * <P>
+ * Periodically trigger an invocation of
+ * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the batcher invocation.
+ * The list will be empty if the partition is empty.
+ * <P>
+ * A non-null {@code batcher} result is added to the returned stream.
+ * The partition's contents are cleared after a batch is processed.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ *
+ * @param <U> Tuple type
+ * @param period how often to invoke the batcher
+ * @param unit TimeUnit for {@code period}
+ * @param batcher
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #batch(BiFunction)
+ */
+ <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
+
+ /**
+ * Declares a stream that represents an aggregation of
+ * partitions in this window using the specified {@link Window}.
+ * <P>
+ * This method makes it easier to create aggregation streams
+ * using the lower level {@code Window} API to construct and
+ * supply windows with configurations not directly exposed by {@code TWindow}.
+ *
+ * @param window the window implementation
+ * @param aggregator the aggregation function
+ *
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ */
+ <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
--- End diff --
Uh... :-) Yeah, something's not quite right in that the TWindow.last(...)
established the time-based/count-based nature of the TWindow and as presented
above, the supplied Window can violate that. Maybe it's more like
TStream.last(window, processor)? Mostly I was just trying to expose a way to
make it easier to utilize the Window API without having to know about other
impl details like synchronizing the processor, creating an Aggregator oplet, ...
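
For readers comparing the two timer triggered methods in the diff, here is a minimal plain-Java sketch of the semantics their javadoc describes. It does not use any Quarks classes: PartitionSketch, aggregateTick and batchTick are hypothetical names made up for illustration, and the real implementation in the PR may be structured quite differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for one window partition; NOT a Quarks class.
class PartitionSketch<T, K> {
    private final K key;
    private final List<T> tuples = new ArrayList<>(); // insertion order, oldest first

    PartitionSketch(K key) {
        this.key = key;
    }

    synchronized void insert(T tuple) {
        tuples.add(tuple);
    }

    synchronized int size() {
        return tuples.size();
    }

    // One timer tick in the timedAggregate style: the partition contents are kept.
    synchronized <U> void aggregateTick(BiFunction<List<T>, K, U> aggregator, List<U> out) {
        // Pass a copy so the list handed to the function cannot change underneath it.
        U result = aggregator.apply(new ArrayList<>(tuples), key);
        if (result != null) {
            out.add(result); // only non-null results reach the output
        }
    }

    // One timer tick in the timedBatch style: the contents are cleared afterwards.
    synchronized <U> void batchTick(BiFunction<List<T>, K, U> batcher, List<U> out) {
        U result = batcher.apply(new ArrayList<>(tuples), key);
        if (result != null) {
            out.add(result);
        }
        tuples.clear();
    }
}
```

In this sketch the function is still invoked when the partition is empty (it receives an empty list), and returning null is how it suppresses an output tuple.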
> Add timer triggered window aggregations
> ---------------------------------------
>
> Key: QUARKS-230
> URL: https://issues.apache.org/jira/browse/QUARKS-230
> Project: Quarks
> Issue Type: New Feature
> Reporter: Dale LaBossiere
> Assignee: Dale LaBossiere
>
> A recent use case involved the desire for "timer triggered" instead of
> "partition content change triggered" window aggregations. E.g., "I want to
> trigger an aggregation every second over the last 10 tuples in a window
> partition" -- a count-based window with timer triggered aggregations.
> I propose adding 3 methods to TWindow. Two in direct support of "timer
> triggered" aggregations (this processing model seems like it could be common
> enough to warrant making it conveniently available) and one to make it easier
> to use the lower level Window API to define and use other processing models.
> I'm submitting a PR with the details for review but the net is these
> additions to TWindow:
> ```
> <U> TStream<U> timedAggregate(long period, TimeUnit unit,
> BiFunction<List<T>, K, U> aggregator);
> <U> TStream<U> timedBatch(long period, TimeUnit unit,
> BiFunction<List<T>, K, U> batcher);
> <U, L extends List<T>> TStream<U> process(Window<T,K,L> window,
> BiFunction<List<T>, K, U> aggregator);
> ```
> See https://github.com/apache/incubator-quarks/pull/167
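
To make the use case in the issue description concrete ("trigger an aggregation every second over the last 10 tuples"), the following is a rough plain-Java sketch of a count-based window whose aggregation is driven by a timer rather than by inserts. It is an assumption-laden illustration only: LastNTimedAggregator, insert, fire and start are hypothetical names and not part of Quarks, and a single unpartitioned window is used to keep it short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only; NOT part of the Quarks API.
class LastNTimedAggregator<T, U> {
    private final int capacity;
    private final Deque<T> window = new ArrayDeque<>(); // oldest at the head
    private final Function<List<T>, U> aggregator;
    private final Consumer<U> sink;

    LastNTimedAggregator(int capacity, Function<List<T>, U> aggregator, Consumer<U> sink) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.aggregator = aggregator;
        this.sink = sink;
    }

    // Count-based eviction: keep only the last `capacity` tuples.
    // Inserting does NOT trigger an aggregation.
    synchronized void insert(T tuple) {
        if (window.size() == capacity) {
            window.removeFirst();
        }
        window.addLast(tuple);
    }

    // Called by the timer: aggregate a snapshot of the current contents.
    synchronized void fire() {
        U result = aggregator.apply(new ArrayList<>(window));
        if (result != null) {
            sink.accept(result);
        }
    }

    // Schedule fire() at a fixed period on a caller-supplied executor.
    ScheduledFuture<?> start(ScheduledExecutorService timer, long period, TimeUnit unit) {
        return timer.scheduleAtFixedRate(this::fire, period, period, unit);
    }
}
```

With a capacity of 10 and start(timer, 1, TimeUnit.SECONDS), each tick would aggregate whatever the last 10 inserted tuples are at that moment, regardless of how many inserts happened in between.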