Github user dlaboss commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/167#discussion_r71048226
--- Diff: api/topology/src/main/java/quarks/topology/TWindow.java ---
@@ -105,8 +129,85 @@ Licensed to the Apache Software Foundation (ASF) under one
* @return A stream that contains the latest aggregations of partitions in this window.
*/
<U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
+
+ /**
+ * Declares a stream that is a continuous, sliding,
+ * timer triggered aggregation of
+ * partitions in this window.
+ * <P>
+ * Periodically trigger an invocation of
+ * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the aggregator invocation.
+ * The list will be empty if the partition is empty.
+ * </P>
+ * <P>
+ * A non-null {@code aggregator} result is added to the returned stream.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ *
+ * @param <U> Tuple type
+ * @param period how often to invoke the aggregator
+ * @param unit TimeUnit for {@code period}
+ * @param aggregator
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #aggregate(BiFunction)
+ */
+ <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
--- End diff --
A reasonable question.
In the particular use case that came up, it was OK, even desirable, that an
unchanged partition still yielded an aggregation. For example, the
less-than-perfect interface that was desired between the device and the iothub
was to publish events on the "current location" of the device even if it
hadn't moved a meaningful distance.
A more efficient, less chatty device/iothub interface would have been to
publish only when the device had moved a meaningful distance.
So maybe a timed-aggregator interface that supported only
timed-trigger-if-changed semantics would not be acceptable?
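For what it's worth, if-changed behavior can probably be layered on top of the
always-trigger form, because the javadoc says only a non-null aggregator
result is added to the returned stream. Below is a minimal sketch of that idea,
not a proposal for the API. ChangedOnly and onlyIfChanged are hypothetical
names, java.util.function.BiFunction stands in for the Quarks function type so
the example is self-contained, and it assumes tuples have a meaningful
equals() and that a given aggregator is not invoked concurrently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical helper for illustration only; not part of the Quarks API. */
public class ChangedOnly {

    /**
     * Wraps an aggregator so that it returns null (meaning "emit nothing")
     * when a partition's contents equal what was seen at the previous trigger.
     */
    public static <T, K, U> BiFunction<List<T>, K, U> onlyIfChanged(
            BiFunction<List<T>, K, U> aggregator) {
        // Last observed contents per partition key (assumption: not thread-safe).
        Map<K, List<T>> lastSeen = new HashMap<>();
        return (tuples, key) -> {
            if (tuples.equals(lastSeen.get(key))) {
                return null; // unchanged since the last trigger: suppress
            }
            // Copy the list; it is only guaranteed stable during the invocation.
            lastSeen.put(key, new ArrayList<>(tuples));
            return aggregator.apply(tuples, key);
        };
    }

    public static void main(String[] args) {
        BiFunction<List<Integer>, String, Integer> count =
                onlyIfChanged((tuples, key) -> tuples.size());

        List<Integer> partition = new ArrayList<>();
        partition.add(1);
        partition.add(2);

        System.out.println(count.apply(partition, "device-1")); // first trigger
        System.out.println(count.apply(partition, "device-1")); // unchanged
        partition.add(3);
        System.out.println(count.apply(partition, "device-1")); // changed
    }
}
```

With a wrapper like this, the "publish current location regardless" case passes
the aggregator directly and the less chatty case passes
onlyIfChanged(aggregator). One caveat: a window whose tuples were replaced by
equal-valued ones would also count as unchanged.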