[ 
https://issues.apache.org/jira/browse/STORM-1253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15149025#comment-15149025
 ] 

ASF GitHub Bot commented on STORM-1253:
---------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1098#discussion_r53053060
  
    --- Diff: storm-core/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -0,0 +1,246 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Comparator;
    +import java.util.Random;
    +import java.util.concurrent.PriorityBlockingQueue;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * The timer defined in this file is very similar to java.util.Timer, 
except
    + * it integrates with Storm's time simulation capabilities. This lets us 
test
    + * code that does asynchronous work on the timer thread
    + */
    +
    +public class StormTimer implements AutoCloseable{
    +    private static final Logger LOG = 
LoggerFactory.getLogger(StormTimer.class);
    +
    +    public static class QueueEntry {
    +        public final Long endTimeMs;
    +        public final Runnable func;
    +        public final String id;
    +
    +        public QueueEntry(Long endTimeMs, Runnable func, String id) {
    +            this.endTimeMs = endTimeMs;
    +            this.func = func;
    +            this.id = id;
    +        }
    +    }
    +
    +    public static class StormTimerTask extends Thread {
    +
    +        private PriorityBlockingQueue<QueueEntry> queue = new 
PriorityBlockingQueue<QueueEntry>(10, new Comparator<QueueEntry>() {
    +            @Override
    +            public int compare(QueueEntry o1, QueueEntry o2) {
    +                return o1.endTimeMs.intValue() - o2.endTimeMs.intValue();
    +            }
    +        });
    +
    +        // boolean to indicate whether timer is active
    +        private AtomicBoolean active = new AtomicBoolean(false);
    +
    +        // function to call when timer is killed
    +        private Thread.UncaughtExceptionHandler onKill;
    +
    +        //random number generator
    +        private Random random = new Random();
    +
    +        @Override
    +        public void run() {
    +            while (this.active.get()) {
    +                QueueEntry queueEntry = null;
    +                try {
    +                    queueEntry = this.queue.peek();
    +                    if ((queueEntry != null) && (Time.currentTimeMillis() 
>= queueEntry.endTimeMs)) {
    +                        // It is imperative to not run the function
    +                        // inside the timer lock. Otherwise, it is
    +                        // possible to deadlock if the fn deals with
    +                        // other locks, like the submit lock.
    +                        this.queue.remove(queueEntry);
    +                        queueEntry.func.run();
    +                    } else if (queueEntry != null) {
    +                        //  If any events are scheduled, sleep until
    +                        // event generation. If any recurring events
    +                        // are scheduled then we will always go
    +                        // through this branch, sleeping only the
    +                        // exact necessary amount of time. We give
    +                        // an upper bound, e.g. 1000 millis, to the
    +                        // sleeping time, to limit the response time
    +                        // for detecting any new event within 1 secs.
    +                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - 
Time.currentTimeMillis())));
    +                    } else {
    +                        // Otherwise poll to see if any new event
    +                        // was scheduled. This is, in essence, the
    +                        // response time for detecting any new event
    +                        // schedulings when there are no scheduled
    +                        // events.
    +                        Time.sleep(1000);
    +                    }
    +                } catch (Throwable e) {
    +                    if 
(!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))) {
    +                        this.onKill.uncaughtException(this, e);
    +                        this.setActive(false);
    +                    }
    +                }
    +            }
    +        }
    +
    +        public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
    +            this.onKill = onKill;
    +        }
    +
    +        public void setActive(boolean flag) {
    +            this.active.set(flag);
    +        }
    +
    +        public boolean isActive() {
    +            return this.active.get();
    +        }
    +
    +        public void add(QueueEntry queueEntry) {
    +            this.queue.add(queueEntry);
    +        }
    +    }
    +
    +    //task to run
    +    StormTimerTask task = new StormTimerTask();
    +
    +    /**
    +     * Makes a Timer in the form of a StormTimerTask Object
    +     * @param name name of the timer
    +     * @param onKill function to call when timer is killed unexpectedly
    +     * @return StormTimerTask object that was initialized
    +     */
    +    public StormTimer (String name, Thread.UncaughtExceptionHandler 
onKill) {
    +        if (onKill == null) {
    +            throw new RuntimeException("onKill func is null!");
    +        }
    +        if (name == null) {
    +            this.task.setName("timer");
    +        } else {
    +            this.task.setName(name);
    +        }
    +        this.task.setOnKillFunc(onKill);
    +        this.task.setActive(true);
    +
    +        this.task.setDaemon(true);
    +        this.task.setPriority(Thread.MAX_PRIORITY);
    +        this.task.start();
    +    }
    +
    +    /**
    +     * Schedule a function to be executed in the timer
    +     * @param delaySecs the number of seconds to delay before running the 
function
    +     * @param func the function to run
    +     * @param checkActive whether to check is the timer is active
    +     * @param jitterMs add jitter to the run
    +     */
    +    public void schedule(int delaySecs, Runnable func, boolean 
checkActive, int jitterMs) {
    +        if (this.task == null) {
    +            throw new RuntimeException("task is null!");
    --- End diff --
    
    will fix


> port  backtype.storm.timer to java
> ----------------------------------
>
>                 Key: STORM-1253
>                 URL: https://issues.apache.org/jira/browse/STORM-1253
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>              Labels: java-migration, jstorm-merger
>
> Timer like class that uses simulated time



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to