[ https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15056738#comment-15056738 ]
ASF GitHub Bot commented on STORM-1187: --------------------------------------- Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/900#discussion_r47559536 --- Diff: storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java --- @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.windowing; + +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.topology.FailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Tracks tuples across input streams and periodically emits watermark events. + * Watermark event timestamp is the minimum of the latest tuple timestamps + * across all the input streams (minus the lag). Once a watermark event is emitted + * any tuple coming with an earlier timestamp can be considered as late events. 
+ */ +public class WaterMarkEventGenerator<T> implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class); + private final WindowManager<T> windowManager; + private final int eventTsLag; + private final Set<GlobalStreamId> inputStreams; + private final Map<GlobalStreamId, Long> streamToTs; + private final ScheduledExecutorService executorService; + private final ScheduledFuture<?> executorFuture; + private long lastWaterMarkTs = 0; + + public WaterMarkEventGenerator(WindowManager<T> windowManager, int interval, + int eventTsLag, Set<GlobalStreamId> inputStreams) { + this.windowManager = windowManager; + streamToTs = new ConcurrentHashMap<>(); + executorService = Executors.newSingleThreadScheduledExecutor(); + this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS); + this.eventTsLag = eventTsLag; + this.inputStreams = inputStreams; + } + + public void track(GlobalStreamId stream, long ts) { + Long currentVal = streamToTs.get(stream); + if (currentVal == null || ts > currentVal) { + streamToTs.put(stream, ts); + } + checkFailures(); + } + + @Override + public void run() { + try { + long waterMarkTs = computeWaterMarkTs(); + if (waterMarkTs > lastWaterMarkTs) { + this.windowManager.add(new WaterMarkEvent<T>(waterMarkTs - eventTsLag)); + lastWaterMarkTs = waterMarkTs; + } + } catch (Throwable th) { + LOG.error("Failed while processing watermark event ", th); + throw th; + } + } + + /** + * Computes the min ts across all streams. + */ + private long computeWaterMarkTs() { + long ts = Long.MIN_VALUE; + // only if some data has arrived on each input stream --- End diff -- why do we have to wait for data to arrive on each input stream? 
> Support for late and out of order events in time based windows > -------------------------------------------------------------- > > Key: STORM-1187 > URL: https://issues.apache.org/jira/browse/STORM-1187 > Project: Apache Storm > Issue Type: Sub-task > Reporter: Arun Mahadevan > Assignee: Arun Mahadevan > > Right now, time-based windows use the timestamp at which the tuple is > received by the bolt. > However, there are use cases where tuples need to be processed based on the > time when they were actually generated rather than the time when they were received. So > we need to add support for processing events with a time lag, as well as > some way to specify and read tuple timestamps. -- This message was sent by Atlassian JIRA (v6.3.4#6332)