[ https://issues.apache.org/jira/browse/STORM-1873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312600#comment-15312600 ]
ASF GitHub Bot commented on STORM-1873: --------------------------------------- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1453#discussion_r65573427 --- Diff: storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java --- @@ -255,6 +255,22 @@ public BaseWindowedBolt withTimestampField(String fieldName) { } /** + * Specify a stream id on which late tuples are going to be emitted. When using this method, + * {@link Config#TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME} should be defined on a per component basis before + * calling this method, otherwise {@link IllegalArgumentException} will be thrown. + * + * @param streamId the name of the stream used to emit late tuples on + */ + public BaseWindowedBolt withLateTupleStream(String streamId) { + if (!windowConfiguration.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) { --- End diff -- On re-thinking, can this validation be moved to `initWindowManager` or `validate` so that we don't impose an ordering in the builder? > Reemit late tuples in windowed mode > ----------------------------------- > > Key: STORM-1873 > URL: https://issues.apache.org/jira/browse/STORM-1873 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Balazs Kossovics > > Currently late tuples are just logged (and acknowledged in the coming 1.0.2), > but in our use-case it would be desirable to emit them on a different stream > than the default. > I implemented a first version, where every windowed bolt are going to have a > '_late' stream by default, and component-specific parameter > (Config.TOPOLOGY_BOLTS_EMIT_LATE_TUPLE) the definer of the bolt could turn on > or off the emission of the late tuples on this stream. > One could turn on the emission of late tuples with a builder method like this: > {code:title=MyWindowedBolt.java|borderStyle=solid} > new MyWindowedBolt() > .withTimestampField("timestamp") > .withLateTupleEmission(true) > .withWindow( > new BaseWindowedBolt.Duration(1, TimeUnit.MINUTES), > new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS) > ); > {code} > What do you think about it? -- This message was sent by Atlassian JIRA (v6.3.4#6332)