[ 
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989733#comment-14989733
 ] 

ASF GitHub Bot commented on STORM-1167:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894843
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if 
(stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) 
stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) 
stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + 
sliding interval) value " + duration +
    +                                                       " is more than " + 
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + 
timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration 
windowLengthDuration,
    +                          Count slidingIntervalCount, Duration 
slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != 
null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + 
slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, 
topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> 
initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map 
stormConf) {
    +        WindowManager<Tuple> manager = new 
WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if 
(stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) 
stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if 
(stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) 
stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if 
(stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    +            slidingIntervalCount = new Count(((Number) 
stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
    +        } else if 
(stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
    +            slidingIntervalDuration = new Duration(((Number) 
stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), 
TimeUnit.MILLISECONDS);
    +        } else {
    +            // default is a sliding window of count 1
    +            slidingIntervalCount = new Count(1);
    +        }
    +        // validate
    +        validate(stormConf, windowLengthCount, windowLengthDuration,
    +                 slidingIntervalCount, slidingIntervalDuration);
    +        if (windowLengthCount != null) {
    +            manager.setWindowLength(windowLengthCount);
    +        } else {
    +            manager.setWindowLength(windowLengthDuration);
    +        }
    +        if (slidingIntervalCount != null) {
    +            manager.setSlidingInterval(slidingIntervalCount);
    +        } else {
    +            manager.setSlidingInterval(slidingIntervalDuration);
    +        }
    +        return manager;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
    +        this.windowedOutputCollector = new 
WindowedOutputCollector(collector);
    +        bolt.prepare(stormConf, context, windowedOutputCollector);
    +        this.listener = newWindowLifecycleListener();
    +        this.windowManager = initWindowManager(listener, stormConf);
    +        LOG.info("Initialized window manager {} ", this.windowManager);
    --- End diff --
    
    This looks more like a debug statement to me.


> Add sliding & tumbling window support for core storm
> ----------------------------------------------------
>
>                 Key: STORM-1167
>                 URL: https://issues.apache.org/jira/browse/STORM-1167
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Arun Mahadevan
>            Assignee: Arun Mahadevan
>
> Currently, topologies that needs windowing support requires writing custom 
> logic inside bolts making it tedious to handle the windowing and acking logic 
> with custom logic.
> We can add framework level support to core storm bolts to process tuples in a 
> time or a count based window. Sliding and tumbling windows can be supported.
> Later this can be extended to trident apis as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to