[ 
https://issues.apache.org/jira/browse/STORM-1253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145168#comment-15145168
 ] 

ASF GitHub Bot commented on STORM-1253:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1098#discussion_r52788861
  
    --- Diff: storm-core/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -0,0 +1,274 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Comparator;
    +import java.util.Random;
    +import java.util.concurrent.PriorityBlockingQueue;
    +import java.util.concurrent.Semaphore;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +/**
    + * The timer defined in this file is very similar to java.util.Timer, 
except
    + * it integrates with Storm's time simulation capabilities. This lets us 
test
    + * code that does asynchronous work on the timer thread
    + */
    +
    +public class StormTimer {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(StormTimer.class);
    +
    +    public interface TimerFunc {
    +        public void run(Object o);
    +    }
    +
    +    public static class QueueEntry {
    +        public final Long endTimeMs;
    +        public final TimerFunc afn;
    +        public final String id;
    +
    +        public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
    +            this.endTimeMs = endTimeMs;
    +            this.afn = afn;
    +            this.id = id;
    +        }
    +    }
    +
    +    public static class StormTimerTask extends Thread {
    +
    +        private PriorityBlockingQueue<QueueEntry> queue = new 
PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
    +            @Override
    +            public int compare(Object o1, Object o2) {
    +                return ((QueueEntry)o1).endTimeMs.intValue() - 
((QueueEntry)o2).endTimeMs.intValue();
    +            }
    +        });
    +
    +        // boolean to indicate whether timer is active
    +        private AtomicBoolean active = new AtomicBoolean(false);
    +
    +        // function to call when timer is killed
    +        private TimerFunc onKill;
    +
    +        //random number generator
    +        private Random random = new Random();
    +
    +        // synchronization for cancelling the timer
    +        private Semaphore cancelNotifier = new Semaphore(0);
    +
    +        //used for synchronization
    +        private Object lock = new Object();
    +
    +        @Override
    +        public void run() {
    +            while (this.active.get()) {
    +                QueueEntry queueEntry = null;
    +                try {
    +                    synchronized (this.lock) {
    +                        queueEntry = this.queue.peek();
    +                    }
    +                    if ((queueEntry != null) && (Time.currentTimeMillis() 
>= queueEntry.endTimeMs)) {
    +                        // It is imperative to not run the function
    +                        // inside the timer lock. Otherwise, it is
    +                        // possible to deadlock if the fn deals with
    +                        // other locks, like the submit lock.
    +                        synchronized (this.lock) {
    +                            this.queue.poll();
    --- End diff --
    
    Crud.  There is actually a race here.  It is in the original code too  We 
cannot blindly do a poll and throw away the next thing on the queue, because 
something else may have been prioritized ahead of what we just peeked at.
    
    We don't run at a high enough load that I think this is critical, but we 
should fix it.  Just replacing the queueEntry with the polled one is probably 
enough.
    
    ```
    queueEntry = queue.peek();
    if ((queueEntry != null) && (Time.currentTimeMillis() >= 
queueEntry.endTimeMs)) {
      queueEntry = queue.poll();
      queueEntry.afn.run();
    }
    ```


> port  backtype.storm.timer to java
> ----------------------------------
>
>                 Key: STORM-1253
>                 URL: https://issues.apache.org/jira/browse/STORM-1253
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>              Labels: java-migration, jstorm-merger
>
> Timer like class that uses simulated time



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to