[ 
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989756#comment-14989756
 ] 

ASF GitHub Bot commented on STORM-1167:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43896077
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 
count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 
duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 
count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 
duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window 
slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count 
slidingInterval) {
    +        return 
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window 
slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration 
slidingInterval) {
    +        return 
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window 
slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count 
slidingInterval) {
    +        return 
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window 
slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration 
slidingInterval) {
    +        return 
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new 
Count(1));
    +    }
    +
    +
    +    /**
    +     * A time duration based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the time duration of the window
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new 
Count(1));
    +    }
    +
    +    /**
    +     * A count based tumbling window.
    +     *
    +     * @param count the number of tuples after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Count count) {
    +        return withWindowLength(count).withSlidingInterval(count);
    +    }
    +
    +    /**
    +     * A time duration based tumbling window.
    +     *
    +     * @param duration the time duration after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Duration duration) {
    +        return withWindowLength(duration).withSlidingInterval(duration);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
    +    }
    --- End diff --
    
    Can you put a comment in all the empty methods, something like 
`// NOOP`, just so it is obvious that they are intentionally left blank.


> Add sliding & tumbling window support for core storm
> ----------------------------------------------------
>
>                 Key: STORM-1167
>                 URL: https://issues.apache.org/jira/browse/STORM-1167
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Arun Mahadevan
>            Assignee: Arun Mahadevan
>
> Currently, topologies that need windowing support require writing custom 
> logic inside bolts, which makes it tedious to handle the windowing and 
> acking logic.
> We can add framework-level support to core Storm bolts to process tuples in a 
> time-based or count-based window. Sliding and tumbling windows can be supported.
> Later this can be extended to the Trident APIs as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to