[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208265#comment-15208265 ]
ASF GitHub Bot commented on STORM-676:
--------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/1072#discussion_r57143268
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}
-
+
+ /**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+ public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+ public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+ public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * and completes a window at {@code windowDuration}
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slidingInterval the time duration after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+ public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
+ WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields);
+ }
+
+ /**
+ * Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
+ *
+ * @param windowConfig window configuration like window length and slide length.
+ * @param inputFields input fields
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ * @return
+ */
+ public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+ // this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
+ // as false int he below call.
+ InMemoryWindowsStoreFactory inMemoryWindowsStoreFactory = new InMemoryWindowsStoreFactory();
+ return window(windowConfig, inMemoryWindowsStoreFactory, inputFields, aggregator, functionFields, false);
+ }
+
+ /**
+ * Returns stream of aggregated results based on the given window configuration.
+ *
+ * @param windowConfig window configuration like window length and slide length.
+ * @param windowStoreFactory intermediary tuple store for storing tuples for windowing
+ * @param inputFields input fields
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ * @return
--- End diff --
The `@return` tag is empty. Suggest `@return the new Stream`.
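For readers unfamiliar with the two count-based modes documented in this diff, here is a minimal plain-Java sketch of the difference between a tumbling and a sliding count window. It is not Storm code and does not reflect how the PR implements windowing: `CountWindowSketch` and `windowSums` are hypothetical names, summing stands in for an arbitrary aggregator, and the trigger rule (fire after every `slideCount` inputs over the most recent `windowCount` inputs at most) is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class CountWindowSketch {

    // Hypothetical helper, not part of Storm. Assumed semantics: a window
    // fires after every slideCount inputs and covers the most recent inputs,
    // at most windowCount of them. With slideCount == windowCount the
    // windows do not overlap, which is the tumbling case.
    static List<Integer> windowSums(List<Integer> input, int windowCount, int slideCount) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (int value : input) {
            window.addLast(value);
            if (window.size() > windowCount) {
                window.removeFirst(); // evict the oldest input
            }
            sinceLastFire++;
            if (sinceLastFire == slideCount) {
                int sum = 0;
                for (int v : window) {
                    sum += v; // stand-in for the aggregator
                }
                sums.add(sum);
                sinceLastFire = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(windowSums(input, 3, 3)); // tumbling: [6, 15]
        System.out.println(windowSums(input, 3, 1)); // sliding:  [1, 3, 6, 9, 12, 15]
    }
}
```

Under these assumptions the tumbling case aggregates each input exactly once, while the sliding case re-aggregates overlapping inputs on every slide.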
> Storm Trident support for sliding/tumbling windows
> --------------------------------------------------
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>