Repository: storm
Updated Branches:
  refs/heads/1.x-branch 1783d7ae9 -> b0db246ad


http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
new file mode 100644
index 0000000..13643db
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ *
+ */
+public class TridentBatchTuple {
+    final String effectiveBatchId;
+    final long timeStamp;
+    final int tupleIndex;
+    final TridentTuple tridentTuple;
+
+    public TridentBatchTuple(String effectiveBatchId, long timeStamp, int 
tupleIndex) {
+        this(effectiveBatchId, timeStamp, tupleIndex, null);
+    }
+
+    public TridentBatchTuple(String effectiveBatchId, long timeStamp, int 
tupleIndex, TridentTuple tridentTuple) {
+        this.effectiveBatchId = effectiveBatchId;
+        this.timeStamp = timeStamp;
+        this.tupleIndex = tupleIndex;
+        this.tridentTuple = tridentTuple;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
new file mode 100644
index 0000000..c2d9362
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.planner.processor.FreshCollector;
+import org.apache.storm.trident.planner.processor.TridentContext;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.ConsList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * {@code TridentProcessor} implementation for windowing operations on trident 
stream.
+ *
+ */
+public class WindowTridentProcessor implements TridentProcessor {
+    private static final Logger log = 
LoggerFactory.getLogger(WindowTridentProcessor.class);
+
+    public static final String TRIGGER_INPROCESS_PREFIX = "tip" + 
WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_PREFIX = "tr" + 
WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_COUNT_PREFIX = "tc" + 
WindowsStore.KEY_SEPARATOR;
+
+    public static final String TRIGGER_FIELD_NAME = "_task_info";
+    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
+
+    private final String windowId;
+    private final Fields inputFields;
+    private final Aggregator aggregator;
+    private final boolean storeTuplesInStore;
+
+    private String windowTriggerInprocessId;
+    private WindowConfig windowConfig;
+    private WindowsStoreFactory windowStoreFactory;
+    private WindowsStore windowStore;
+
+    private Map conf;
+    private TopologyContext topologyContext;
+    private FreshCollector collector;
+    private TridentTupleView.ProjectionFactory projection;
+    private TridentContext tridentContext;
+    private ITridentWindowManager tridentWindowManager;
+    private String windowTaskId;
+
+    public WindowTridentProcessor(WindowConfig windowConfig, String 
uniqueWindowId, WindowsStoreFactory windowStoreFactory,
+                                  Fields inputFields, Aggregator aggregator, 
boolean storeTuplesInStore) {
+
+        this.windowConfig = windowConfig;
+        this.windowId = uniqueWindowId;
+        this.windowStoreFactory = windowStoreFactory;
+        this.inputFields = inputFields;
+        this.aggregator = aggregator;
+        this.storeTuplesInStore = storeTuplesInStore;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, TridentContext 
tridentContext) {
+        this.conf = conf;
+        this.topologyContext = context;
+        List<TridentTuple.Factory> parents = 
tridentContext.getParentTupleFactories();
+        if (parents.size() != 1) {
+            throw new RuntimeException("Aggregation related operation can only 
have one parent");
+        }
+
+        Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf);
+
+        this.tridentContext = tridentContext;
+        collector = new FreshCollector(tridentContext);
+        projection = new TridentTupleView.ProjectionFactory(parents.get(0), 
inputFields);
+
+        windowStore = windowStoreFactory.create();
+        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + 
topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
+        windowTriggerInprocessId = 
getWindowTriggerInprocessIdPrefix(windowTaskId);
+
+        tridentWindowManager = storeTuplesInStore ?
+                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, 
windowStore, aggregator, tridentContext.getDelegateCollector(), 
maxTuplesCacheSize, inputFields)
+                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, 
windowStore, aggregator, tridentContext.getDelegateCollector());
+
+        tridentWindowManager.prepare();
+    }
+
+    public static String getWindowTriggerInprocessIdPrefix(String 
windowTaskId) {
+        return TRIGGER_INPROCESS_PREFIX + windowTaskId;
+    }
+
+    public static String getWindowTriggerTaskPrefix(String windowTaskId) {
+        return TRIGGER_PREFIX + windowTaskId;
+    }
+
+    private Long getWindowTuplesCacheSize(Map conf) {
+        if 
(conf.containsKey(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)) {
+            return ((Number) 
conf.get(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)).longValue();
+        }
+        return DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT;
+    }
+
+    @Override
+    public void cleanup() {
+        log.info("shutting down window manager");
+        tridentWindowManager.shutdown();
+    }
+
+    @Override
+    public void startBatch(ProcessorContext processorContext) {
+        // initialize state for batch
+        processorContext.state[tridentContext.getStateIndex()] = new 
ArrayList<TridentTuple>();
+    }
+
+    @Override
+    public void execute(ProcessorContext processorContext, String streamId, 
TridentTuple tuple) {
+        // add tuple to the batch state
+        Object state = processorContext.state[tridentContext.getStateIndex()];
+        ((List<TridentTuple>) state).add(projection.create(tuple));
+    }
+
+    @Override
+    public void finishBatch(ProcessorContext processorContext) {
+
+        Object batchId = processorContext.batchId;
+        Object batchTxnId = getBatchTxnId(batchId);
+
+        log.debug("Received finishBatch of : {} ", batchId);
+        // get all the tuples in a batch and add it to trident-window-manager
+        List<TridentTuple> tuples = (List<TridentTuple>) 
processorContext.state[tridentContext.getStateIndex()];
+        tridentWindowManager.addTuplesBatch(batchId, tuples);
+
+        List<Integer> pendingTriggerIds = null;
+        List<String> triggerKeys = new ArrayList<>();
+        Iterable<Object> triggerValues = null;
+
+        if (retriedAttempt(batchId)) {
+            pendingTriggerIds = (List<Integer>) 
windowStore.get(inprocessTriggerKey(batchTxnId));
+            for (Integer pendingTriggerId : pendingTriggerIds) {
+                triggerKeys.add(triggerKey(pendingTriggerId));
+            }
+            triggerValues = windowStore.get(triggerKeys);
+        }
+
+        // if there are no trigger values in earlier attempts or this is a new 
batch, emit pending triggers.
+        if(triggerValues == null) {
+            pendingTriggerIds = new ArrayList<>();
+            Queue<StoreBasedTridentWindowManager.TriggerResult> 
pendingTriggers = tridentWindowManager.getPendingTriggers();
+            log.debug("pending triggers at batch: {} and triggers.size: {} ", 
batchId, pendingTriggers.size());
+            try {
+                Iterator<StoreBasedTridentWindowManager.TriggerResult> 
pendingTriggersIter = pendingTriggers.iterator();
+                List<Object> values = new ArrayList<>();
+                StoreBasedTridentWindowManager.TriggerResult triggerResult = 
null;
+                while (pendingTriggersIter.hasNext()) {
+                    triggerResult = pendingTriggersIter.next();
+                    for (List<Object> aggregatedResult : triggerResult.result) 
{
+                        String triggerKey = triggerKey(triggerResult.id);
+                        triggerKeys.add(triggerKey);
+                        values.add(aggregatedResult);
+                        pendingTriggerIds.add(triggerResult.id);
+                    }
+                    pendingTriggersIter.remove();
+                }
+                triggerValues = values;
+            } finally {
+                // store inprocess triggers of a batch in store for batch 
retries for any failures
+                if (!pendingTriggerIds.isEmpty()) {
+                    windowStore.put(inprocessTriggerKey(batchTxnId), 
pendingTriggerIds);
+                }
+            }
+        }
+
+        collector.setContext(processorContext);
+        int i = 0;
+        for (Object resultValue : triggerValues) {
+            collector.emit(new ConsList(new TriggerInfo(windowTaskId, 
pendingTriggerIds.get(i++)), (List<Object>) resultValue));
+        }
+        collector.setContext(null);
+    }
+
+    private String inprocessTriggerKey(Object batchTxnId) {
+        return windowTriggerInprocessId + batchTxnId;
+    }
+
+    public static Object getBatchTxnId(Object batchId) {
+        if (batchId instanceof IBatchID) {
+            return ((IBatchID) batchId).getId();
+        }
+        return null;
+    }
+
+    static boolean retriedAttempt(Object batchId) {
+        if (batchId instanceof IBatchID) {
+            return ((IBatchID) batchId).getAttemptId() > 0;
+        }
+
+        return false;
+    }
+
+    @Override
+    public TridentTuple.Factory getOutputFactory() {
+        return collector.getOutputFactory();
+    }
+
+    public static class TriggerInfo implements Serializable {
+        public final String windowTaskId;
+        public final int triggerId;
+
+        public TriggerInfo(String windowTaskId, int triggerId) {
+            this.windowTaskId = windowTaskId;
+            this.triggerId = triggerId;
+        }
+
+        public String generateTriggerKey() {
+            return generateWindowTriggerKey(windowTaskId, triggerId);
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerInfo{" +
+                    "windowTaskId='" + windowTaskId + '\'' +
+                    ", triggerId=" + triggerId +
+                    '}';
+        }
+    }
+
+    public String triggerKey(int triggerId) {
+        return generateWindowTriggerKey(windowTaskId, triggerId);
+    }
+
+    public static String generateWindowTriggerKey(String windowTaskId, int 
triggerId) {
+        return TRIGGER_PREFIX + windowTaskId + triggerId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
new file mode 100644
index 0000000..378d24f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code State} implementation for windowing operation. This is mainly used 
to get callback of commit txId of batches
+ * in which triggers are emitted.
+ *
+ */
+public class WindowsState implements State {
+    private static final Logger log = 
LoggerFactory.getLogger(WindowsState.class);
+
+    private Long currentTxId;
+
+    public WindowsState() {
+    }
+
+    @Override
+    public void beginCommit(Long txId) {
+        currentTxId = txId;
+        log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+    }
+
+    @Override
+    public void commit(Long txId) {
+        log.debug("WindowsState.commit :: [{}]", txId);
+    }
+
+    public Long getCurrentTxId() {
+        return currentTxId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
new file mode 100644
index 0000000..28893c7
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ * {@code StateFactory} instance for creating {@code WindowsState} instances.
+ *
+ */
+public class WindowsStateFactory implements StateFactory {
+
+    public WindowsStateFactory() {
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
+        return new WindowsState();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
new file mode 100644
index 0000000..45ac885
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.IllegalClassException;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@code StateUpdater<WindowState>} instance which removes successfully 
emitted triggers from store
+ */
+public class WindowsStateUpdater implements StateUpdater<WindowsState> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WindowsStateUpdater.class);
+
+    private final WindowsStoreFactory windowStoreFactory;
+    private WindowsStore windowsStore;
+
+    public WindowsStateUpdater(WindowsStoreFactory windowStoreFactory) {
+        this.windowStoreFactory = windowStoreFactory;
+    }
+
+    @Override
+    public void updateState(WindowsState state, List<TridentTuple> tuples, 
TridentCollector collector) {
+        Long currentTxId = state.getCurrentTxId();
+        log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", 
currentTxId);
+        for (TridentTuple tuple : tuples) {
+            try {
+                Object fieldValue = 
tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+                if(! (fieldValue instanceof 
WindowTridentProcessor.TriggerInfo)) {
+                    throw new 
IllegalClassException(WindowTridentProcessor.TriggerInfo.class, 
fieldValue.getClass());
+                }
+                WindowTridentProcessor.TriggerInfo triggerInfo = 
(WindowTridentProcessor.TriggerInfo) fieldValue;
+                String triggerCompletedKey = 
WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId;
+
+                log.debug("Removing trigger key [{}] and trigger completed key 
[{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+
+                
windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), 
triggerCompletedKey));
+            } catch (Exception ex) {
+                log.warn(ex.getMessage());
+                collector.reportError(ex);
+                throw new FailedException(ex);
+            }
+        }
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        windowsStore = windowStoreFactory.create();
+    }
+
+    @Override
+    public void cleanup() {
+        windowsStore.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
new file mode 100644
index 0000000..8904b7b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Store for storing window related entities like windowed tuples, triggers 
etc.
+ *
+ */
+public interface WindowsStore extends Serializable {
+
+    /**
+     * This can be used as a separator while generating a key from sequence of 
strings.
+     */
+    public static final String KEY_SEPARATOR = "|";
+
+    public Object get(String key);
+
+    public Iterable<Object> get(List<String> keys);
+
+    public Iterable<String> getAllKeys();
+
+    public void put(String key, Object value);
+
+    public void putAll(Collection<Entry> entries);
+
+    public void remove(String key);
+
+    public void removeAll(Collection<String> keys);
+
+    public void shutdown();
+
+    /**
+     * This class wraps key and value objects which can be passed to {@code 
putAll} method.
+     */
+    public static class Entry implements Serializable {
+        public final String key;
+        public final Object value;
+
+        public Entry(String key, Object value) {
+            nonNullCheckForKey(key);
+            nonNullCheckForValue(value);
+            this.key = key;
+            this.value = value;
+        }
+
+        public static void nonNullCheckForKey(Object key) {
+            Preconditions.checkArgument(key != null, "key argument can not be 
null");
+        }
+
+        public static void nonNullCheckForValue(Object value) {
+            Preconditions.checkArgument(value != null, "value argument can not 
be null");
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
new file mode 100644
index 0000000..409d672
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+
+/**
+ * Factory to create instances of {@code WindowsStore}.
+ *
+ */
+public interface WindowsStoreFactory extends Serializable {
+
+    /**
+     * Creates a window store
+     *
+     * @return
+     */
+    public WindowsStore create();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
new file mode 100644
index 0000000..8f9ef1a
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ *
+ */
+public abstract class BaseWindowConfig implements WindowConfig {
+    protected final int windowLength;
+    protected final int slideLength;
+
+    protected BaseWindowConfig(int windowLength, int slideLength) {
+        this.windowLength = windowLength;
+        this.slideLength = slideLength;
+    }
+
+    @Override
+    public int getWindowLength() {
+        return windowLength;
+    }
+
+    @Override
+    public int getSlidingLength() {
+        return slideLength;
+    }
+
+    public void validate() {
+        if (slideLength > windowLength) {
+            throw new IllegalArgumentException("slideLength '" + slideLength + 
"' should always be less than windowLegth '" + windowLength + "'");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
new file mode 100644
index 0000000..a0dd13c
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Represents configuration of sliding window based on count of events. Window 
of length {@code windowLength} slides
+ * at every count of given {@code slideLength}
+ *
+ */
+public final class SlidingCountWindow extends BaseWindowConfig {
+
+    private SlidingCountWindow(int windowLength, int slideLength) {
+        super(windowLength, slideLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.SLIDING_COUNT;
+    }
+
+    public static SlidingCountWindow of(int windowCount, int slidingCount) {
+        return new SlidingCountWindow(windowCount, slidingCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
new file mode 100644
index 0000000..f2fe291
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+/**
+ * Represents configuration of sliding window based on duration. Window 
duration of {@code windowLength} slides
+ * at every {@code slideLength} interval.
+ *
+ */
+public final class SlidingDurationWindow extends BaseWindowConfig {
+
+    private SlidingDurationWindow(int windowLength, int slideLength) {
+        super(windowLength, slideLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.SLIDING_DURATION;
+    }
+
+    public static SlidingDurationWindow of(BaseWindowedBolt.Duration 
windowDuration, BaseWindowedBolt.Duration slidingDuration) {
+        return new SlidingDurationWindow(windowDuration.value, 
slidingDuration.value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
new file mode 100644
index 0000000..a5f3528
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Represents tumbling count window configuration. Window tumbles at each 
given {@code windowLength} count of events.
+ */
+public final class TumblingCountWindow extends BaseWindowConfig {
+
+    private TumblingCountWindow(int windowLength) {
+        super(windowLength, windowLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.TUMBLING_COUNT;
+    }
+
+    public static TumblingCountWindow of(int windowLength) {
+        return new TumblingCountWindow(windowLength);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
new file mode 100644
index 0000000..8beb68d
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+/**
+ * Represents tumbling duration window configuration. Window tumbles every 
given {@code windowLength} duration.
+ */
+public final class TumblingDurationWindow extends BaseWindowConfig {
+
+    private TumblingDurationWindow(int windowLength) {
+        super(windowLength, windowLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.TUMBLING_DURATION;
+    }
+
+    public static TumblingDurationWindow of(BaseWindowedBolt.Duration 
windowLength) {
+        return new TumblingDurationWindow(windowLength.value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
new file mode 100644
index 0000000..49347e7
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import java.io.Serializable;
+
+/**
+ * Windowing configuration with window and sliding length.
+ */
+public interface WindowConfig extends Serializable {
+
+    /**
+     * Returns the length of the window.
+     * @return
+     */
+    public int getWindowLength();
+
+    /**
+     * Returns the sliding length of the moving window.
+     * @return
+     */
+    public int getSlidingLength();
+
+    /**
+     * Gives the type of windowing. It can be any of {@code Type} values.
+     *
+     * @return
+     */
+    public Type getWindowType();
+
+    public void validate();
+
+    public enum Type {
+        SLIDING_COUNT,
+        TUMBLING_COUNT,
+        SLIDING_DURATION,
+        TUMBLING_DURATION
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
new file mode 100644
index 0000000..ed9befa
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
+ *
+ */
+public abstract class BaseWindowStrategy<T> implements WindowStrategy<T> {
+    protected final WindowConfig windowConfig;
+
+    public BaseWindowStrategy(WindowConfig windowConfig) {
+        this.windowConfig = windowConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
new file mode 100644
index 0000000..c26b795
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.CountEvictionPolicy;
+import org.apache.storm.windowing.CountTriggerPolicy;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents sliding window strategy based on the sliding window 
count and sliding interval count from the
+ * given {@code slidingCountWindow} configuration.
+ */
+public class SlidingCountWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+    public SlidingCountWindowStrategy(WindowConfig slidingCountWindow) {
+        super(slidingCountWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every count of given 
sliding window.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, 
EvictionPolicy<T> evictionPolicy) {
+        return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), 
triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after 
a count of given window length.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new CountEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
new file mode 100644
index 0000000..9e71220
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TimeEvictionPolicy;
+import org.apache.storm.windowing.TimeTriggerPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents sliding window strategy based on the sliding window 
duration and sliding interval from the
+ * given {@code slidingCountWindow} configuration.
+ *
+ **/
+public final class SlidingDurationWindowStrategy<T> extends 
BaseWindowStrategy<T> {
+
+    public SlidingDurationWindowStrategy(WindowConfig slidingDurationWindow) {
+        super(slidingDurationWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every configured 
sliding window duration.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, 
EvictionPolicy<T> evictionPolicy) {
+        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), 
triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after 
window duration is reached.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new TimeEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
new file mode 100644
index 0000000..5e4d6fe
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.CountEvictionPolicy;
+import org.apache.storm.windowing.CountTriggerPolicy;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents tumbling window strategy based on the window count 
from the
+ * given {@code slidingCountWindow} configuration. In this strategy , window 
and sliding lengths are equal.
+ *
+ */
+public final class TumblingCountWindowStrategy<T> extends 
BaseWindowStrategy<T> {
+
+    public TumblingCountWindowStrategy(WindowConfig tumblingCountWindow) {
+        super(tumblingCountWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every count of given 
sliding window.
+
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, 
EvictionPolicy<T> evictionPolicy) {
+        return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), 
triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after 
a count of given window length.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new CountEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
new file mode 100644
index 0000000..4478667
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TimeEvictionPolicy;
+import org.apache.storm.windowing.TimeTriggerPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents tumbling window strategy based on the window duration 
from the
+ * given {@code slidingCountWindow} configuration. In this strategy , window 
and sliding durations are equal.
+ *
+ */
+public final class TumblingDurationWindowStrategy<T> extends 
BaseWindowStrategy<T> {
+
+    public TumblingDurationWindowStrategy(WindowConfig tumblingDurationWindow) 
{
+        super(tumblingDurationWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every given sliding 
duration.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, 
EvictionPolicy<T> evictionPolicy) {
+        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), 
triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after 
given window duration.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new TimeEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
new file mode 100644
index 0000000..1dfb264
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+/**
+ * Strategy for windowing which will have respective trigger and eviction 
policies.
+ */
+public interface WindowStrategy<T> {
+
+    /**
+     * Returns a {@code TriggerPolicy}  by creating with {@code 
triggerHandler} and {@code evictionPolicy} with
+     * the given configuration.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, 
EvictionPolicy<T> evictionPolicy);
+
+    /**
+     * Returns an {@code EvictionPolicy} instance for this strategy with the 
given configuration.
+     *
+     * @return
+     */
+    public EvictionPolicy<T> getEvictionPolicy();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
new file mode 100644
index 0000000..d8a3918
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
+ *
+ */
+public final class WindowStrategyFactory {
+
+    private WindowStrategyFactory() {
+    }
+
+    /**
+     * Creates a {@code WindowStrategy} instance based on the given {@code 
windowConfig}.
+     *
+     * @param windowConfig
+     * @return
+     */
+    public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
+        WindowStrategy<T> windowStrategy = null;
+        WindowConfig.Type windowType = windowConfig.getWindowType();
+        switch(windowType) {
+            case SLIDING_COUNT:
+                windowStrategy = new 
SlidingCountWindowStrategy<>(windowConfig);
+                break;
+            case TUMBLING_COUNT:
+                windowStrategy = new 
TumblingCountWindowStrategy<>(windowConfig);
+                break;
+            case SLIDING_DURATION:
+                windowStrategy = new 
SlidingDurationWindowStrategy<>(windowConfig);
+                break;
+            case TUMBLING_DURATION:
+                windowStrategy = new 
TumblingDurationWindowStrategy<>(windowConfig);
+                break;
+            default:
+                throw new IllegalArgumentException("Given WindowConfig of type 
"+windowType+" is not supported");
+        }
+
+        return windowStrategy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java 
b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
index eef4968..ac1fa1c 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
@@ -21,7 +21,7 @@ package org.apache.storm.windowing;
  * The callback fired by {@link TriggerPolicy} when the trigger
  * condition is satisfied.
  */
-interface TriggerHandler {
+public interface TriggerHandler {
     /**
      * The code to execute when the {@link TriggerPolicy} condition is 
satisfied.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java 
b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
new file mode 100644
index 0000000..03f298d
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.InMemoryWindowsStore;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import 
org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import 
org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingTest {
+
+    @Test
+    public void testWindowStrategyInstances() throws Exception {
+
+        WindowStrategy<Object> tumblingCountStrategy = 
WindowStrategyFactory.create(TumblingCountWindow.of(10));
+        Assert.assertTrue(tumblingCountStrategy instanceof 
TumblingCountWindowStrategy);
+
+        WindowStrategy<Object> slidingCountStrategy = 
WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+        Assert.assertTrue(slidingCountStrategy instanceof 
SlidingCountWindowStrategy);
+
+        WindowStrategy<Object> tumblingDurationStrategy = 
WindowStrategyFactory.create(
+                TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, 
TimeUnit.SECONDS)));
+        Assert.assertTrue(tumblingDurationStrategy instanceof 
TumblingDurationWindowStrategy);
+
+        WindowStrategy<Object> slidingDurationStrategy = 
WindowStrategyFactory.create(
+                SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, 
TimeUnit.SECONDS),
+                        new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+        Assert.assertTrue(slidingDurationStrategy instanceof 
SlidingDurationWindowStrategy);
+    }
+
+    @Test
+    public void testWindowConfig() {
+        int windowLength = 9;
+        TumblingCountWindow tumblingCountWindow = 
TumblingCountWindow.of(windowLength);
+        Assert.assertTrue(tumblingCountWindow.getWindowLength() == 
windowLength);
+        Assert.assertTrue(tumblingCountWindow.getSlidingLength() == 
windowLength);
+
+        windowLength = 10;
+        int slidingLength = 2;
+        SlidingCountWindow slidingCountWindow = SlidingCountWindow.of(10, 2);
+        Assert.assertTrue(slidingCountWindow.getWindowLength() == 
windowLength);
+        Assert.assertTrue(slidingCountWindow.getSlidingLength() == 
slidingLength);
+
+        windowLength = 20;
+        TumblingDurationWindow tumblingDurationWindow = 
TumblingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, 
TimeUnit.SECONDS));
+        Assert.assertTrue(tumblingDurationWindow.getWindowLength() == 
windowLength*1000);
+        Assert.assertTrue(tumblingDurationWindow.getSlidingLength() == 
windowLength*1000);
+
+        windowLength = 50;
+        slidingLength = 10;
+        SlidingDurationWindow slidingDurationWindow = 
SlidingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, 
TimeUnit.SECONDS),
+                new BaseWindowedBolt.Duration(slidingLength, 
TimeUnit.SECONDS));
+        Assert.assertTrue(slidingDurationWindow.getWindowLength() == 
windowLength*1000);
+        Assert.assertTrue(slidingDurationWindow.getSlidingLength() == 
slidingLength*1000);
+    }
+
+    @Test
+    public void testInMemoryWindowStore() {
+        InMemoryWindowsStore store = new InMemoryWindowsStore();
+        String keyPrefix = "key";
+        String valuePrefix = "valuePrefix";
+
+        int ct = 10;
+        for (int i=0; i<ct; i++) {
+            store.put(keyPrefix +i, valuePrefix +i);
+        }
+
+        for (int i=0; i<ct; i++) {
+            Assert.assertTrue((valuePrefix + i).equals(store.get(keyPrefix + 
i)));
+        }
+
+        store.remove(keyPrefix+1);
+        Assert.assertNull(store.get(keyPrefix+1));
+
+    }
+
+}
\ No newline at end of file

Reply via email to