Github user Parth-Brahmbhatt commented on a diff in the pull request:
https://github.com/apache/storm/pull/855#discussion_r43940643
--- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static backtype.storm.topology.base.BaseWindowedBolt.Count;
+import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * Tracks a window of events and fires {@link WindowLifecycleListener}
callbacks
+ * on expiry of events or activation of the window due to {@link
TriggerPolicy}.
+ *
+ * @param <T> the type of event in the window.
+ */
+public class WindowManager<T> implements TriggerHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(WindowManager.class);
+
+ /**
+ * Expire old events every EXPIRE_EVENTS_THRESHOLD to
+ * keep the window size in check.
+ */
+ public static final int EXPIRE_EVENTS_THRESHOLD = 100;
+
+ private WindowLifecycleListener<T> windowLifecycleListener;
+ private ConcurrentLinkedQueue<Event<T>> window;
+ private EvictionPolicy<T> evictionPolicy;
+ private TriggerPolicy<T> triggerPolicy;
+ private List<T> expiredEvents;
+ private Set<Event<T>> prevWindowEvents;
+ private AtomicInteger eventsSinceLastExpiry;
+ private ReentrantLock lock;
+
+ public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
+ windowLifecycleListener = lifecycleListener;
+ window = new ConcurrentLinkedQueue<>();
+ expiredEvents = new ArrayList<>();
+ prevWindowEvents = new HashSet<>();
+ eventsSinceLastExpiry = new AtomicInteger();
+ lock = new ReentrantLock(true);
+ }
+
+ public void setWindowLength(Count count) {
+ this.evictionPolicy = new CountEvictionPolicy<>(count.value);
+ }
+
+ public void setWindowLength(Duration duration) {
+ this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
+ }
+
+ public void setSlidingInterval(Count count) {
+ this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
+ }
+
+ public void setSlidingInterval(Duration duration) {
+ this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
+ }
+
+ /**
+ * Add an event into the window, with {@link
System#currentTimeMillis()} as
+ * the tracking ts.
+ *
+ * @param event the event to add
+ */
+ public void add(T event) {
+ add(event, System.currentTimeMillis());
+ }
+
+ /**
+ * Add an event into the window, with the given ts as the tracking ts.
+ *
+ * @param event the event to track
+ * @param ts the timestamp
+ */
+ public void add(T event, long ts) {
+ Event<T> windowEvent = new EventImpl<T>(event, ts);
+ window.add(windowEvent);
+ track(windowEvent);
+ compactWindow();
--- End diff --
not sure why we need this? Why not just rely on onTrigger to expire events?
Also don't we need the locking here as well to ensure that when onTrigger
is fired no new events can be added given that will change the events in
window? plus i think it will give us undefined behavior as in line 187 we are
using an iterator to remove an element from window list while this add method
can add an element to it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---