Repository: storm Updated Branches: refs/heads/1.x-branch 1783d7ae9 -> b0db246ad
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java new file mode 100644 index 0000000..13643db --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import org.apache.storm.trident.tuple.TridentTuple; + +/** + * Holder that groups a {@code TridentTuple} with its effective batch id, timestamp and tuple index; all fields are final. + * The tuple reference is {@code null} when the instance is created with the three-argument constructor. + */ +public class TridentBatchTuple { + final String effectiveBatchId; + final long timeStamp; + final int tupleIndex; + final TridentTuple tridentTuple; + + public TridentBatchTuple(String effectiveBatchId, long timeStamp, int tupleIndex) { + this(effectiveBatchId, timeStamp, tupleIndex, null); + } + + public TridentBatchTuple(String effectiveBatchId, long timeStamp, int tupleIndex, TridentTuple tridentTuple) { + this.effectiveBatchId = effectiveBatchId; + this.timeStamp = timeStamp; + this.tupleIndex = tupleIndex; + this.tridentTuple = tridentTuple; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java new file mode 100644 index 0000000..c2d9362 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.Aggregator; +import org.apache.storm.trident.planner.ProcessorContext; +import org.apache.storm.trident.planner.TridentProcessor; +import org.apache.storm.trident.planner.processor.FreshCollector; +import org.apache.storm.trident.planner.processor.TridentContext; +import org.apache.storm.trident.spout.IBatchID; +import org.apache.storm.trident.tuple.ConsList; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.tuple.TridentTupleView; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +/** + * {@code TridentProcessor} implementation for windowing operations on trident stream. 
+ * + */ +public class WindowTridentProcessor implements TridentProcessor { + private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class); + + public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR; + public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR; + public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR; + + public static final String TRIGGER_FIELD_NAME = "_task_info"; + public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L; + + private final String windowId; + private final Fields inputFields; + private final Aggregator aggregator; + private final boolean storeTuplesInStore; + + private String windowTriggerInprocessId; + private WindowConfig windowConfig; + private WindowsStoreFactory windowStoreFactory; + private WindowsStore windowStore; + + private Map conf; + private TopologyContext topologyContext; + private FreshCollector collector; + private TridentTupleView.ProjectionFactory projection; + private TridentContext tridentContext; + private ITridentWindowManager tridentWindowManager; + private String windowTaskId; + + public WindowTridentProcessor(WindowConfig windowConfig, String uniqueWindowId, WindowsStoreFactory windowStoreFactory, + Fields inputFields, Aggregator aggregator, boolean storeTuplesInStore) { + + this.windowConfig = windowConfig; + this.windowId = uniqueWindowId; + this.windowStoreFactory = windowStoreFactory; + this.inputFields = inputFields; + this.aggregator = aggregator; + this.storeTuplesInStore = storeTuplesInStore; + } + + @Override + public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) { + this.conf = conf; + this.topologyContext = context; + List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories(); + if (parents.size() != 1) { + throw new RuntimeException("Aggregation related operation can only have one parent"); + } + + Long maxTuplesCacheSize = 
getWindowTuplesCacheSize(conf); + + this.tridentContext = tridentContext; + collector = new FreshCollector(tridentContext); + projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields); + + windowStore = windowStoreFactory.create(); + windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR; + windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId); + + tridentWindowManager = storeTuplesInStore ? + new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields) + : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector()); + + tridentWindowManager.prepare(); + } + + public static String getWindowTriggerInprocessIdPrefix(String windowTaskId) { + return TRIGGER_INPROCESS_PREFIX + windowTaskId; + } + + public static String getWindowTriggerTaskPrefix(String windowTaskId) { + return TRIGGER_PREFIX + windowTaskId; + } + + private Long getWindowTuplesCacheSize(Map conf) { + if (conf.containsKey(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)) { + return ((Number) conf.get(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)).longValue(); + } + return DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT; + } + + @Override + public void cleanup() { + log.info("shutting down window manager"); + tridentWindowManager.shutdown(); + } + + @Override + public void startBatch(ProcessorContext processorContext) { + // initialize state for batch + processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>(); + } + + @Override + public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) { + // add tuple to the batch state + Object state = processorContext.state[tridentContext.getStateIndex()]; + ((List<TridentTuple>) state).add(projection.create(tuple)); + } + + /** + * Adds the tuples collected for the batch to the window manager and emits trigger results. + * <p/> + * For a retried batch attempt, the trigger ids stored for the batch's transaction id are looked up and their stored + * results are emitted again. Otherwise, or when nothing was stored for that transaction id, the window manager's + * pending triggers are drained, their ids are stored under the transaction id for possible retries, and their + * results are emitted. Every emitted tuple starts with a {@code TriggerInfo} followed by the result values. + */ + @Override + 
public void finishBatch(ProcessorContext processorContext) { + + Object batchId = processorContext.batchId; + Object batchTxnId = getBatchTxnId(batchId); + + log.debug("Received finishBatch of : {} ", batchId); + // get all the tuples in a batch and add it to trident-window-manager + List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()]; + tridentWindowManager.addTuplesBatch(batchId, tuples); + + List<Integer> pendingTriggerIds = null; + List<String> triggerKeys = new ArrayList<>(); + Iterable<Object> triggerValues = null; + + if (retriedAttempt(batchId)) { + pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId)); + // in-process trigger ids are stored only when they are non-empty, so the store may hold no entry for this + // transaction id; in that case fall through to the pending triggers instead of failing on a null list. + if (pendingTriggerIds != null) { + for (Integer pendingTriggerId : pendingTriggerIds) { + triggerKeys.add(triggerKey(pendingTriggerId)); + } + triggerValues = windowStore.get(triggerKeys); + } + } + + // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers. + if(triggerValues == null) { + pendingTriggerIds = new ArrayList<>(); + Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers(); + log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size()); + try { + Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator(); + List<Object> values = new ArrayList<>(); + StoreBasedTridentWindowManager.TriggerResult triggerResult = null; + while (pendingTriggersIter.hasNext()) { + triggerResult = pendingTriggersIter.next(); + for (List<Object> aggregatedResult : triggerResult.result) { + String triggerKey = triggerKey(triggerResult.id); + triggerKeys.add(triggerKey); + values.add(aggregatedResult); + pendingTriggerIds.add(triggerResult.id); + } + pendingTriggersIter.remove(); + } + triggerValues = values; + } finally { + // store inprocess triggers of a batch in store for batch retries for any failures + if (!pendingTriggerIds.isEmpty()) { + 
windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds); + } + } + } + + collector.setContext(processorContext); + int i = 0; + for (Object resultValue : triggerValues) { + collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue)); + } + collector.setContext(null); + } + + private String inprocessTriggerKey(Object batchTxnId) { + return windowTriggerInprocessId + batchTxnId; + } + + /** + * Returns the id of the given batch id when it is an {@code IBatchID}, otherwise {@code null}. + */ + public static Object getBatchTxnId(Object batchId) { + if (batchId instanceof IBatchID) { + return ((IBatchID) batchId).getId(); + } + return null; + } + + /** + * Returns {@code true} only when the given batch id is an {@code IBatchID} whose attempt id is greater than zero. + */ + static boolean retriedAttempt(Object batchId) { + if (batchId instanceof IBatchID) { + return ((IBatchID) batchId).getAttemptId() > 0; + } + + return false; + } + + @Override + public TridentTuple.Factory getOutputFactory() { + return collector.getOutputFactory(); + } + + public static class TriggerInfo implements Serializable { + public final String windowTaskId; + public final int triggerId; + + public TriggerInfo(String windowTaskId, int triggerId) { + this.windowTaskId = windowTaskId; + this.triggerId = triggerId; + } + + public String generateTriggerKey() { + return generateWindowTriggerKey(windowTaskId, triggerId); + } + + @Override + public String toString() { + return "TriggerInfo{" + + "windowTaskId='" + windowTaskId + '\'' + + ", triggerId=" + triggerId + + '}'; + } + } + + public String triggerKey(int triggerId) { + return generateWindowTriggerKey(windowTaskId, triggerId); + } + + public static String generateWindowTriggerKey(String windowTaskId, int triggerId) { + return TRIGGER_PREFIX + windowTaskId + triggerId; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java 
b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java new file mode 100644 index 0000000..378d24f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +import org.apache.storm.trident.state.State; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code State} implementation for windowing operation. This is mainly used to get callback of commit txId of batches + * in which triggers are emitted. 
+ * + */ +public class WindowsState implements State { + private static final Logger log = LoggerFactory.getLogger(WindowsState.class); + + private Long currentTxId; + + public WindowsState() { + } + + /** + * Remembers the given txId as the current transaction id and logs it at debug level. + */ + @Override + public void beginCommit(Long txId) { + currentTxId = txId; + log.debug(" WindowsState.beginCommit:: [{}] ", txId); + } + + /** + * Only logs the given txId at debug level. + */ + @Override + public void commit(Long txId) { + log.debug("WindowsState.commit :: [{}]", txId); + } + + /** + * @return the txId passed to the most recent {@code beginCommit} call, or {@code null} if there was none + */ + public Long getCurrentTxId() { + return currentTxId; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java new file mode 100644 index 0000000..28893c7 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; + +import java.util.Map; + +/** + * {@code StateFactory} instance for creating {@code WindowsState} instances. + * + */ +public class WindowsStateFactory implements StateFactory { + + public WindowsStateFactory() { + } + + /** + * Returns a new {@code WindowsState}; none of the arguments are used. + */ + @Override + public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + return new WindowsState(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java new file mode 100644 index 0000000..45ac885 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.IllegalClassException; +import org.apache.storm.topology.FailedException; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.state.StateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * {@code StateUpdater<WindowsState>} instance which removes successfully emitted triggers from store + */ +public class WindowsStateUpdater implements StateUpdater<WindowsState> { + + private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class); + + private final WindowsStoreFactory windowStoreFactory; + private WindowsStore windowsStore; + + public WindowsStateUpdater(WindowsStoreFactory windowStoreFactory) { + this.windowStoreFactory = windowStoreFactory; + } + + /** + * For each tuple, reads the {@code TriggerInfo} held in the {@code TRIGGER_FIELD_NAME} field and removes from the + * store both its trigger key and the in-process trigger key built from the state's current txId. + * Any failure is logged, reported to the collector and rethrown wrapped in a {@code FailedException}. + */ + @Override + public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) { + Long currentTxId = state.getCurrentTxId(); + log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId); + for (TridentTuple tuple : tuples) { + try { + Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME); + if(! 
(fieldValue instanceof WindowTridentProcessor.TriggerInfo)) { + // pass the value itself: the (Class, Object) constructor copes with a null value, whereas + // fieldValue.getClass() would fail with a NullPointerException and hide the real problem. + throw new IllegalClassException(WindowTridentProcessor.TriggerInfo.class, fieldValue); + } + WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue; + String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId; + + log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore); + + windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey)); + } catch (Exception ex) { + // log the throwable itself so the stack trace is kept and exceptions without a message are still visible + log.warn("Failed to remove triggers using WindowStateUpdater, txnId: " + currentTxId, ex); + collector.reportError(ex); + throw new FailedException(ex); + } + } + } + + @Override + public void prepare(Map conf, TridentOperationContext context) { + windowsStore = windowStoreFactory.create(); + } + + @Override + public void cleanup() { + windowsStore.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java new file mode 100644 index 0000000..8904b7b --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +import com.google.common.base.Preconditions; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; + +/** + * Store for storing window related entities like windowed tuples, triggers etc. + * + */ +public interface WindowsStore extends Serializable { + + /** + * This can be used as a separator while generating a key from a sequence of strings. + */ + public static final String KEY_SEPARATOR = "|"; + + public Object get(String key); + + public Iterable<Object> get(List<String> keys); + + public Iterable<String> getAllKeys(); + + public void put(String key, Object value); + + public void putAll(Collection<Entry> entries); + + public void remove(String key); + + public void removeAll(Collection<String> keys); + + public void shutdown(); + + /** + * This class wraps key and value objects which can be passed to {@code putAll} method. 
+ */ + public static class Entry implements Serializable { + public final String key; + public final Object value; + + /** + * Creates an entry after checking that neither argument is {@code null}. + * + * @throws IllegalArgumentException if {@code key} or {@code value} is {@code null} + */ + public Entry(String key, Object value) { + nonNullCheckForKey(key); + nonNullCheckForValue(value); + this.key = key; + this.value = value; + } + + /** + * @throws IllegalArgumentException if {@code key} is {@code null} + */ + public static void nonNullCheckForKey(Object key) { + Preconditions.checkArgument(key != null, "key argument can not be null"); + } + + /** + * @throws IllegalArgumentException if {@code value} is {@code null} + */ + public static void nonNullCheckForValue(Object value) { + Preconditions.checkArgument(value != null, "value argument can not be null"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java new file mode 100644 index 0000000..409d672 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import java.io.Serializable; + +/** + * Factory to create instances of {@code WindowsStore}. + * + */ +public interface WindowsStoreFactory extends Serializable { + + /** + * Creates a window store + * + * @return the created {@code WindowsStore} + */ + public WindowsStore create(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java new file mode 100644 index 0000000..8f9ef1a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.config; + +/** + * Base {@code WindowConfig} implementation holding the window length and the sliding length. + */ +public abstract class BaseWindowConfig implements WindowConfig { + protected final int windowLength; + protected final int slideLength; + + protected BaseWindowConfig(int windowLength, int slideLength) { + this.windowLength = windowLength; + this.slideLength = slideLength; + } + + @Override + public int getWindowLength() { + return windowLength; + } + + @Override + public int getSlidingLength() { + return slideLength; + } + + /** + * Checks that the sliding length does not exceed the window length; equal lengths are accepted. + * + * @throws IllegalArgumentException if {@code slideLength} is greater than {@code windowLength} + */ + public void validate() { + if (slideLength > windowLength) { + throw new IllegalArgumentException("slideLength '" + slideLength + "' should always be less than or equal to windowLength '" + windowLength + "'"); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java new file mode 100644 index 0000000..a0dd13c --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing.config; + +/** + * Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides + * at every count of given {@code slideLength} + * + */ +public final class SlidingCountWindow extends BaseWindowConfig { + + private SlidingCountWindow(int windowLength, int slideLength) { + super(windowLength, slideLength); + } + + @Override + public Type getWindowType() { + return Type.SLIDING_COUNT; + } + + /** + * Creates a sliding count window configuration with {@code windowCount} as the window length and + * {@code slidingCount} as the sliding length. + */ + public static SlidingCountWindow of(int windowCount, int slidingCount) { + return new SlidingCountWindow(windowCount, slidingCount); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java new file mode 100644 index 0000000..f2fe291 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing.config; + +import org.apache.storm.topology.base.BaseWindowedBolt; + +/** + * Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides + * at every {@code slideLength} interval. + * + */ +public final class SlidingDurationWindow extends BaseWindowConfig { + + private SlidingDurationWindow(int windowLength, int slideLength) { + super(windowLength, slideLength); + } + + @Override + public Type getWindowType() { + return Type.SLIDING_DURATION; + } + + /** + * Creates a sliding duration window configuration, using {@code windowDuration.value} as the window length and + * {@code slidingDuration.value} as the sliding length. + */ + public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) { + return new SlidingDurationWindow(windowDuration.value, slidingDuration.value); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java new file mode 100644 index 0000000..a5f3528 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing.config; + +/** + * Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events. + */ +public final class TumblingCountWindow extends BaseWindowConfig { + + private TumblingCountWindow(int windowLength) { + super(windowLength, windowLength); + } + + @Override + public Type getWindowType() { + return Type.TUMBLING_COUNT; + } + + /** + * Creates a tumbling count window configuration whose window length and sliding length are both + * {@code windowLength}. + */ + public static TumblingCountWindow of(int windowLength) { + return new TumblingCountWindow(windowLength); + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java new file mode 100644 index 0000000..8beb68d --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing.config; + +import org.apache.storm.topology.base.BaseWindowedBolt; + +/** + * Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration. + */ +public final class TumblingDurationWindow extends BaseWindowConfig { + + private TumblingDurationWindow(int windowLength) { + super(windowLength, windowLength); + } + + @Override + public Type getWindowType() { + return Type.TUMBLING_DURATION; + } + + public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) { + return new TumblingDurationWindow(windowLength.value); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java new file mode 100644 index 0000000..49347e7 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing.config; + +import java.io.Serializable; + +/** + * Windowing configuration with window and sliding length. + */ +public interface WindowConfig extends Serializable { + + /** + * Returns the length of the window. + * @return + */ + public int getWindowLength(); + + /** + * Returns the sliding length of the moving window. + * @return + */ + public int getSlidingLength(); + + /** + * Gives the type of windowing. It can be any of {@code Type} values. 
+ * + * @return + */ + public Type getWindowType(); + + public void validate(); + + public enum Type { + SLIDING_COUNT, + TUMBLING_COUNT, + SLIDING_DURATION, + TUMBLING_DURATION + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java new file mode 100644 index 0000000..ed9befa --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.trident.windowing.config.WindowConfig; + +/** + * + */ +public abstract class BaseWindowStrategy<T> implements WindowStrategy<T> { + protected final WindowConfig windowConfig; + + public BaseWindowStrategy(WindowConfig windowConfig) { + this.windowConfig = windowConfig; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java new file mode 100644 index 0000000..c26b795 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.windowing.CountEvictionPolicy; +import org.apache.storm.windowing.CountTriggerPolicy; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerHandler; +import org.apache.storm.windowing.TriggerPolicy; + +/** + * This class represents sliding window strategy based on the sliding window count and sliding interval count from the + * given {@code slidingCountWindow} configuration. + */ +public class SlidingCountWindowStrategy<T> extends BaseWindowStrategy<T> { + + public SlidingCountWindowStrategy(WindowConfig slidingCountWindow) { + super(slidingCountWindow); + } + + /** + * Returns a {@code TriggerPolicy} which triggers for every count of given sliding window. + * + * @param triggerHandler + * @param evictionPolicy + * @return + */ + @Override + public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) { + return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy); + } + + /** + * Returns an {@code EvictionPolicy} instance which evicts elements after a count of given window length. 
+ * + * @return + */ + @Override + public EvictionPolicy<T> getEvictionPolicy() { + return new CountEvictionPolicy<>(windowConfig.getWindowLength()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java new file mode 100644 index 0000000..9e71220 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TimeEvictionPolicy; +import org.apache.storm.windowing.TimeTriggerPolicy; +import org.apache.storm.windowing.TriggerHandler; +import org.apache.storm.windowing.TriggerPolicy; + +/** + * This class represents sliding window strategy based on the sliding window duration and sliding interval from the + * given {@code slidingCountWindow} configuration. + * + **/ +public final class SlidingDurationWindowStrategy<T> extends BaseWindowStrategy<T> { + + public SlidingDurationWindowStrategy(WindowConfig slidingDurationWindow) { + super(slidingDurationWindow); + } + + /** + * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration. + * + * @param triggerHandler + * @param evictionPolicy + * @return + */ + @Override + public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) { + return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy); + } + + /** + * Returns an {@code EvictionPolicy} instance which evicts elements after window duration is reached. 
+ * + * @return + */ + @Override + public EvictionPolicy<T> getEvictionPolicy() { + return new TimeEvictionPolicy<>(windowConfig.getWindowLength()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java new file mode 100644 index 0000000..5e4d6fe --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.windowing.CountEvictionPolicy; +import org.apache.storm.windowing.CountTriggerPolicy; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerHandler; +import org.apache.storm.windowing.TriggerPolicy; + +/** + * This class represents tumbling window strategy based on the window count from the + * given {@code slidingCountWindow} configuration. In this strategy , window and sliding lengths are equal. + * + */ +public final class TumblingCountWindowStrategy<T> extends BaseWindowStrategy<T> { + + public TumblingCountWindowStrategy(WindowConfig tumblingCountWindow) { + super(tumblingCountWindow); + } + + /** + * Returns a {@code TriggerPolicy} which triggers for every count of given sliding window. + + * @param triggerHandler + * @param evictionPolicy + * @return + */ + @Override + public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) { + return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy); + } + + /** + * Returns an {@code EvictionPolicy} instance which evicts elements after a count of given window length. 
+ * + * @return + */ + @Override + public EvictionPolicy<T> getEvictionPolicy() { + return new CountEvictionPolicy<>(windowConfig.getWindowLength()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java new file mode 100644 index 0000000..4478667 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TimeEvictionPolicy; +import org.apache.storm.windowing.TimeTriggerPolicy; +import org.apache.storm.windowing.TriggerHandler; +import org.apache.storm.windowing.TriggerPolicy; + +/** + * This class represents tumbling window strategy based on the window duration from the + * given {@code slidingCountWindow} configuration. In this strategy , window and sliding durations are equal. + * + */ +public final class TumblingDurationWindowStrategy<T> extends BaseWindowStrategy<T> { + + public TumblingDurationWindowStrategy(WindowConfig tumblingDurationWindow) { + super(tumblingDurationWindow); + } + + /** + * Returns a {@code TriggerPolicy} which triggers for every given sliding duration. + * + * @param triggerHandler + * @param evictionPolicy + * @return + */ + @Override + public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) { + return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy); + } + + /** + * Returns an {@code EvictionPolicy} instance which evicts elements after given window duration. 
+ * + * @return + */ + @Override + public EvictionPolicy<T> getEvictionPolicy() { + return new TimeEvictionPolicy<>(windowConfig.getWindowLength()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java new file mode 100644 index 0000000..1dfb264 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerHandler; +import org.apache.storm.windowing.TriggerPolicy; +/** + * Strategy for windowing which will have respective trigger and eviction policies. 
+ */ +public interface WindowStrategy<T> { + + /** + * Returns a {@code TriggerPolicy} by creating with {@code triggerHandler} and {@code evictionPolicy} with + * the given configuration. + * + * @param triggerHandler + * @param evictionPolicy + * @return + */ + public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy); + + /** + * Returns an {@code EvictionPolicy} instance for this strategy with the given configuration. + * + * @return + */ + public EvictionPolicy<T> getEvictionPolicy(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java new file mode 100644 index 0000000..d8a3918 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing.strategy; + +import org.apache.storm.trident.windowing.config.WindowConfig; + +/** + * + */ +public final class WindowStrategyFactory { + + private WindowStrategyFactory() { + } + + /** + * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}. + * + * @param windowConfig + * @return + */ + public static <T> WindowStrategy<T> create(WindowConfig windowConfig) { + WindowStrategy<T> windowStrategy = null; + WindowConfig.Type windowType = windowConfig.getWindowType(); + switch(windowType) { + case SLIDING_COUNT: + windowStrategy = new SlidingCountWindowStrategy<>(windowConfig); + break; + case TUMBLING_COUNT: + windowStrategy = new TumblingCountWindowStrategy<>(windowConfig); + break; + case SLIDING_DURATION: + windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig); + break; + case TUMBLING_DURATION: + windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig); + break; + default: + throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported"); + } + + return windowStrategy; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java index eef4968..ac1fa1c 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java @@ -21,7 +21,7 @@ package org.apache.storm.windowing; * The callback fired by {@link TriggerPolicy} when the trigger * condition is satisfied. */ -interface TriggerHandler { +public interface TriggerHandler { /** * The code to execute when the {@link TriggerPolicy} condition is satisfied. 
* http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java new file mode 100644 index 0000000..03f298d --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident; + +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.windowing.InMemoryWindowsStore; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy; +import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy; +import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy; +import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class TridentWindowingTest { + + @Test + public void testWindowStrategyInstances() throws Exception { + + WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10)); + Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy); + + WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10)); + Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy); + + WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create( + TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))); + Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy); + + WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create( + SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), + 
new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))); + Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy); + } + + @Test + public void testWindowConfig() { + int windowLength = 9; + TumblingCountWindow tumblingCountWindow = TumblingCountWindow.of(windowLength); + Assert.assertTrue(tumblingCountWindow.getWindowLength() == windowLength); + Assert.assertTrue(tumblingCountWindow.getSlidingLength() == windowLength); + + windowLength = 10; + int slidingLength = 2; + SlidingCountWindow slidingCountWindow = SlidingCountWindow.of(10, 2); + Assert.assertTrue(slidingCountWindow.getWindowLength() == windowLength); + Assert.assertTrue(slidingCountWindow.getSlidingLength() == slidingLength); + + windowLength = 20; + TumblingDurationWindow tumblingDurationWindow = TumblingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS)); + Assert.assertTrue(tumblingDurationWindow.getWindowLength() == windowLength*1000); + Assert.assertTrue(tumblingDurationWindow.getSlidingLength() == windowLength*1000); + + windowLength = 50; + slidingLength = 10; + SlidingDurationWindow slidingDurationWindow = SlidingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS), + new BaseWindowedBolt.Duration(slidingLength, TimeUnit.SECONDS)); + Assert.assertTrue(slidingDurationWindow.getWindowLength() == windowLength*1000); + Assert.assertTrue(slidingDurationWindow.getSlidingLength() == slidingLength*1000); + } + + @Test + public void testInMemoryWindowStore() { + InMemoryWindowsStore store = new InMemoryWindowsStore(); + String keyPrefix = "key"; + String valuePrefix = "valuePrefix"; + + int ct = 10; + for (int i=0; i<ct; i++) { + store.put(keyPrefix +i, valuePrefix +i); + } + + for (int i=0; i<ct; i++) { + Assert.assertTrue((valuePrefix + i).equals(store.get(keyPrefix + i))); + } + + store.remove(keyPrefix+1); + Assert.assertNull(store.get(keyPrefix+1)); + + } + +} \ No newline at end of file
