[ https://issues.apache.org/jira/browse/STORM-1873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312444#comment-15312444 ]
ASF GitHub Bot commented on STORM-1873: --------------------------------------- Github user kosii commented on a diff in the pull request: https://github.com/apache/storm/pull/1453#discussion_r65557653 --- Diff: storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java --- @@ -268,8 +277,12 @@ public void execute(Tuple input) { if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) { windowManager.add(input, ts); } else { + if (emitLateTuples()) { + windowedOutputCollector.emit(LATE_TUPLE_STREAM, input, new Values(input)); + } else { + LOG.info("Received a late tuple {} with ts {}. This will not processed.", input, ts); --- End diff -- fixed > Reemit late tuples in windowed mode > ----------------------------------- > > Key: STORM-1873 > URL: https://issues.apache.org/jira/browse/STORM-1873 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Balazs Kossovics > > Currently late tuples are just logged (and acknowledged in the coming 1.0.2), > but in our use-case it would be desirable to emit them on a different stream > than the default. > I implemented a first version, where every windowed bolt are going to have a > '_late' stream by default, and component-specific parameter > (Config.TOPOLOGY_BOLTS_EMIT_LATE_TUPLE) the definer of the bolt could turn on > or off the emission of the late tuples on this stream. > One could turn on the emission of late tuples with a builder method like this: > {code:title=MyWindowedBolt.java|borderStyle=solid} > new MyWindowedBolt() > .withTimestampField("timestamp") > .withLateTupleEmission(true) > .withWindow( > new BaseWindowedBolt.Duration(1, TimeUnit.MINUTES), > new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS) > ); > {code} > What do you think about it? -- This message was sent by Atlassian JIRA (v6.3.4#6332)