[ 
https://issues.apache.org/jira/browse/STORM-1253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145172#comment-15145172
 ] 

ASF GitHub Bot commented on STORM-1253:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1098#discussion_r52789091
  
    --- Diff: storm-core/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -0,0 +1,274 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Comparator;
    +import java.util.Random;
    +import java.util.concurrent.PriorityBlockingQueue;
    +import java.util.concurrent.Semaphore;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * The timer defined in this file is very similar to java.util.Timer, 
except
    + * it integrates with Storm's time simulation capabilities. This lets us 
test
    + * code that does asynchronous work on the timer thread
    + */
    +
    +public class StormTimer {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(StormTimer.class);
    +
    +    public interface TimerFunc {
    +        public void run(Object o);
    +    }
    +
    +    public static class QueueEntry {
    +        public final Long endTimeMs;
    +        public final TimerFunc afn;
    +        public final String id;
    +
    +        public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
    +            this.endTimeMs = endTimeMs;
    +            this.afn = afn;
    +            this.id = id;
    +        }
    +    }
    +
    +    public static class StormTimerTask extends Thread {
    +
    +        private PriorityBlockingQueue<QueueEntry> queue = new 
PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
    +            @Override
    +            public int compare(Object o1, Object o2) {
    +                return ((QueueEntry)o1).endTimeMs.intValue() - 
((QueueEntry)o2).endTimeMs.intValue();
    +            }
    +        });
    +
    +        // boolean to indicate whether timer is active
    +        private AtomicBoolean active = new AtomicBoolean(false);
    +
    +        // function to call when timer is killed
    +        private TimerFunc onKill;
    +
    +        //random number generator
    +        private Random random = new Random();
    +
    +        // synchronization for cancelling the timer
    +        private Semaphore cancelNotifier = new Semaphore(0);
    +
    +        //used for synchronization
    +        private Object lock = new Object();
    +
    +        @Override
    +        public void run() {
    +            while (this.active.get()) {
    +                QueueEntry queueEntry = null;
    +                try {
    +                    synchronized (this.lock) {
    +                        queueEntry = this.queue.peek();
    +                    }
    +                    if ((queueEntry != null) && (Time.currentTimeMillis() 
>= queueEntry.endTimeMs)) {
    +                        // It is imperative to not run the function
    +                        // inside the timer lock. Otherwise, it is
    +                        // possible to deadlock if the fn deals with
    +                        // other locks, like the submit lock.
    +                        synchronized (this.lock) {
    +                            this.queue.poll();
    +                        }
    +                        queueEntry.afn.run(null);
    +                    } else if (queueEntry != null) {
    +                        //  If any events are scheduled, sleep until
    +                        // event generation. If any recurring events
    +                        // are scheduled then we will always go
    +                        // through this branch, sleeping only the
    +                        // exact necessary amount of time. We give
    +                        // an upper bound, e.g. 1000 millis, to the
    +                        // sleeping time, to limit the response time
    +                        // for detecting any new event within 1 secs.
    +                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - 
Time.currentTimeMillis())));
    --- End diff --
    
    Could you file a follow on JIRA so we can figure out a way to do a notify 
with simulate time, so we don't need this 1 second polling. 


> port  backtype.storm.timer to java
> ----------------------------------
>
>                 Key: STORM-1253
>                 URL: https://issues.apache.org/jira/browse/STORM-1253
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>              Labels: java-migration, jstorm-merger
>
> Timer like class that uses simulated time



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to