Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1098#discussion_r52789091
  
    --- Diff: storm-core/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -0,0 +1,274 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Comparator;
    +import java.util.Random;
    +import java.util.concurrent.PriorityBlockingQueue;
    +import java.util.concurrent.Semaphore;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * The timer defined in this file is very similar to java.util.Timer, 
except
    + * it integrates with Storm's time simulation capabilities. This lets us 
test
    + * code that does asynchronous work on the timer thread
    + */
    +
    +public class StormTimer {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(StormTimer.class);
    +
    +    public interface TimerFunc {
    +        public void run(Object o);
    +    }
    +
    +    public static class QueueEntry {
    +        public final Long endTimeMs;
    +        public final TimerFunc afn;
    +        public final String id;
    +
    +        public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
    +            this.endTimeMs = endTimeMs;
    +            this.afn = afn;
    +            this.id = id;
    +        }
    +    }
    +
    +    public static class StormTimerTask extends Thread {
    +
    +        private PriorityBlockingQueue<QueueEntry> queue = new 
PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
    +            @Override
    +            public int compare(Object o1, Object o2) {
    +                return ((QueueEntry)o1).endTimeMs.intValue() - 
((QueueEntry)o2).endTimeMs.intValue();
    +            }
    +        });
    +
    +        // boolean to indicate whether timer is active
    +        private AtomicBoolean active = new AtomicBoolean(false);
    +
    +        // function to call when timer is killed
    +        private TimerFunc onKill;
    +
    +        //random number generator
    +        private Random random = new Random();
    +
    +        // synchronization for cancelling the timer
    +        private Semaphore cancelNotifier = new Semaphore(0);
    +
    +        //used for synchronization
    +        private Object lock = new Object();
    +
    +        @Override
    +        public void run() {
    +            while (this.active.get()) {
    +                QueueEntry queueEntry = null;
    +                try {
    +                    synchronized (this.lock) {
    +                        queueEntry = this.queue.peek();
    +                    }
    +                    if ((queueEntry != null) && (Time.currentTimeMillis() 
>= queueEntry.endTimeMs)) {
    +                        // It is imperative to not run the function
    +                        // inside the timer lock. Otherwise, it is
    +                        // possible to deadlock if the fn deals with
    +                        // other locks, like the submit lock.
    +                        synchronized (this.lock) {
    +                            this.queue.poll();
    +                        }
    +                        queueEntry.afn.run(null);
    +                    } else if (queueEntry != null) {
    +                        //  If any events are scheduled, sleep until
    +                        // event generation. If any recurring events
    +                        // are scheduled then we will always go
    +                        // through this branch, sleeping only the
    +                        // exact necessary amount of time. We give
    +                        // an upper bound, e.g. 1000 millis, to the
    +                        // sleeping time, to limit the response time
    +                        // for detecting any new event within 1 secs.
    +                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - 
Time.currentTimeMillis())));
    --- End diff --
    
    Could you file a follow on JIRA so we can figure out a way to do a notify 
with simulate time, so we don't need this 1 second polling. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to