Repository: storm
Updated Branches:
  refs/heads/1.x-branch 191a806de -> 8b69d4382


STORM-2250: Kafka spout refactoring to increase modularity and testability. 
Also support nanoseconds in Storm time simulation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a03137ed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a03137ed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a03137ed

Branch: refs/heads/1.x-branch
Commit: a03137ed70a3edf155fc2c06355e12f2d4fb38f6
Parents: 191a806
Author: Stig Rohde Døssing <[email protected]>
Authored: Tue Feb 14 21:31:45 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Feb 14 22:32:57 2017 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   9 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    | 159 ++-------
 .../kafka/spout/internal/OffsetManager.java     | 157 ++++++++
 .../storm/kafka/spout/internal/Timer.java       |   7 +-
 .../spout/ByTopicRecordTranslatorTest.java      |   2 +-
 .../spout/DefaultRecordTranslatorTest.java      |   2 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   4 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  77 ++--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 355 ++++++++++---------
 .../test/KafkaSpoutTopologyMainNamedTopics.java |   6 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   2 +-
 pom.xml                                         |   1 -
 .../src/jvm/org/apache/storm/utils/Time.java    | 183 +++++++---
 .../storm/daemon/supervisor/SlotTest.java       |  31 +-
 14 files changed, 569 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml 
b/external/storm-kafka-client/pom.xml
index 0878fdf..92b666d 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -76,7 +76,13 @@
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
+            <artifactId>hamcrest-core</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
@@ -94,7 +100,6 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
-            <version>${log4j-over-slf4j.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9ad2be2..bbad9e8 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,16 +25,13 @@ import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -48,6 +45,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -58,19 +56,19 @@ import org.slf4j.LoggerFactory;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
     private static final long serialVersionUID = 4151921085047987154L;
+    //Initial delay for the commit and subscription refresh timers
+    public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
-    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = 
new OffsetComparator();
 
     // Storm
     protected SpoutOutputCollector collector;
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private final KafkaConsumerFactory kafkaConsumerFactory;
+    private KafkaConsumerFactory kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
     private transient boolean consumerAutoCommitMode;
 
-
     // Bookkeeping
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // 
Strategy to determine the fetch offset of the first realized by the spout upon 
activation
     private transient KafkaSpoutRetryService retryService;              // 
Class that has the logic to handle tuple failure
@@ -78,7 +76,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // 
Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that 
were successfully acked. These tuples will be committed periodically when the 
commit timer expires, after consumer rebalance, or on close/deactivate. Not 
used if it's AutoCommitMode
+    private transient Map<TopicPartition, OffsetManager> acked;           // 
Tuples that were successfully acked. These tuples will be committed 
periodically when the commit timer expires, after consumer rebalance, or on 
close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // 
Tuples that have been emitted but that are "on the wire", i.e. pending being 
acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // 
Records that have been polled and are queued to be emitted in the nextTuple() 
call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;                       // 
Number of offsets that have been polled and emitted but not yet been committed. 
Not used if it's AutoCommitMode
@@ -87,13 +85,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
     }
     
     //This constructor is here for testing
     KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, 
KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
-        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in 
configuration
         this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
 
     @Override
@@ -114,9 +112,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         retryService = kafkaSpoutConfig.getRetryService();
 
         if (!consumerAutoCommitMode) {     // If it is auto commit, no need to 
commit offsets manually
-            commitTimer = new Timer(500, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+            commitTimer = new Timer(TIMER_DELAY_MS, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
-        refreshSubscriptionTimer = new Timer(500, 
kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, 
kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         acked = new HashMap<>();
         emitted = new HashSet<>();
@@ -204,7 +202,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void setAcked(TopicPartition tp, long fetchOffset) {
         // If this partition was previously assigned to this spout, leave the 
acked offsets as they were to resume where it left off
         if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
-            acked.put(tp, new OffsetEntry(tp, fetchOffset));
+            acked.put(tp, new OffsetManager(tp, fetchOffset));
         }
     }
 
@@ -296,7 +294,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek 
to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);   
 // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 
1);    // Seek to last committed offset
             }
         }
     }
@@ -353,7 +351,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void commitOffsetsForAckedTuples() {
         // Find offsets that are ready to be committed for every topic 
partition
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new 
HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : 
acked.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : 
acked.entrySet()) {
             final OffsetAndMetadata nextCommitOffset = 
tpOffset.getValue().findNextCommitOffset();
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -366,9 +364,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Offsets successfully committed to Kafka [{}]", 
nextCommitOffsets);
             // Instead of iterating again, it would be possible to commit and 
update the state for each TopicPartition
             // in the prior loop, but the multiple network calls should be 
more expensive than iterating twice over a small loop
-            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : 
acked.entrySet()) {
-                final OffsetEntry offsetEntry = tpOffset.getValue();
-                offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : 
nextCommitOffsets.entrySet()) {
+                //Update the OffsetManager for each committed partition, and 
update numUncommittedOffsets
+                final TopicPartition tp = tpOffset.getKey();
+                final OffsetManager offsetManager = acked.get(tp);
+                long numCommittedOffsets = 
offsetManager.commit(tpOffset.getValue());
+                numUncommittedOffsets -= numCommittedOffsets;
+                LOG.debug("[{}] uncommitted offsets across all topic 
partitions",
+                    numUncommittedOffsets);
             }
         } else {
             LOG.trace("No offsets to commit. {}", this);
@@ -489,127 +492,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private String getTopicsString() {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
+}
 
-    // ======= Offsets Commit Management ==========
-
-    private static class OffsetComparator implements 
Comparator<KafkaSpoutMessageId> {
-        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
-            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() 
? 0 : 1;
-        }
-    }
-
-    /**
-     * This class is not thread safe
-     */
-    class OffsetEntry {
-        private final TopicPartition tp;
-        private final long initialFetchOffset;  /* First offset to be fetched. 
It is either set to the beginning, end, or to the first uncommitted offset.
-                                                 * Initial value depends on 
offset strategy. See KafkaSpoutConsumerRebalanceListener */
-        private long committedOffset;     // last offset committed to Kafka. 
Initially it is set to fetchOffset - 1
-        private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new 
TreeSet<>(OFFSET_COMPARATOR);     // acked messages sorted by ascending order 
of offset
-
-        public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
-            this.tp = tp;
-            this.initialFetchOffset = initialFetchOffset;
-            this.committedOffset = initialFetchOffset - 1;
-            LOG.debug("Instantiated {}", this);
-        }
-
-        public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
-            ackedMsgs.add(msgId);
-        }
-
-        /**
-         * An offset is only committed when all records with lower offset have
-         * been acked. This guarantees that all offsets smaller than the
-         * committedOffset have been delivered.
-         * @return the next OffsetAndMetadata to commit, or null if no offset 
is ready to commit.
-         */
-        public OffsetAndMetadata findNextCommitOffset() {
-            boolean found = false;
-            long currOffset;
-            long nextCommitOffset = committedOffset;
-            KafkaSpoutMessageId nextCommitMsg = null;     // this is a 
convenience variable to make it faster to create OffsetAndMetadata
-
-            for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // 
complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 
1) {            // found the next offset to commit
-                    found = true;
-                    nextCommitMsg = currAckedMsg;
-                    nextCommitOffset = currOffset;
-                } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    
// offset found is not continuous to the offsets listed to go in the next 
commit, so stop search
-                    LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
-                    break;
-                } else {
-                    //Received a redundant ack. Ignore and continue processing.
-                    LOG.warn("topic-partition [{}] has unexpected offset [{}]. 
Current committed Offset [{}]",
-                            tp, currOffset,  committedOffset);
-                }
-            }
-
-            OffsetAndMetadata nextCommitOffsetAndMetadata = null;
-            if (found) {
-                nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
-                LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to 
be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
-            } else {
-                LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
-            }
-            LOG.trace("{}", this);
-            return nextCommitOffsetAndMetadata;
-        }
-
-        /**
-         * Marks an offset has committed. This method has side effects - it 
sets the internal state in such a way that future
-         * calls to {@link #findNextCommitOffset()} will return offsets 
greater than the offset specified, if any.
-         *
-         * @param committedOffset offset to be marked as committed
-         */
-        public void commit(OffsetAndMetadata committedOffset) {
-            long numCommittedOffsets = 0;
-            if (committedOffset != null) {
-                final long oldCommittedOffset = this.committedOffset;
-                numCommittedOffsets = committedOffset.offset() - 
this.committedOffset;
-                this.committedOffset = committedOffset.offset();
-                for (Iterator<KafkaSpoutMessageId> iterator = 
ackedMsgs.iterator(); iterator.hasNext(); ) {
-                    if (iterator.next().offset() <= committedOffset.offset()) {
-                        iterator.remove();
-                    } else {
-                        break;
-                    }
-                }
-                numUncommittedOffsets-= numCommittedOffsets;
-                LOG.debug("Committed offsets [{}-{} = {}] for topic-partition 
[{}]. [{}] uncommitted offsets across all topic partitions",
-                        oldCommittedOffset + 1, this.committedOffset, 
numCommittedOffsets, tp, numUncommittedOffsets);
-            } else {
-                LOG.debug("Committed [{}] offsets for topic-partition [{}]. 
[{}] uncommitted offsets across all topic partitions",
-                        numCommittedOffsets, tp, numUncommittedOffsets);
-            }
-            LOG.trace("{}", this);
-        }
-
-        long getCommittedOffset() {
-            return committedOffset;
-        }
-
-        public boolean isEmpty() {
-            return ackedMsgs.isEmpty();
-        }
 
-        public boolean contains(ConsumerRecord<K, V> record) {
-            return contains(new KafkaSpoutMessageId(record));
-        }
-
-        public boolean contains(KafkaSpoutMessageId msgId) {
-            return ackedMsgs.contains(msgId);
-        }
 
-        @Override
-        public String toString() {
-            return "OffsetEntry{" +
-                    "topic-partition=" + tp +
-                    ", fetchOffset=" + initialFetchOffset +
-                    ", committedOffset=" + committedOffset +
-                    ", ackedMsgs=" + ackedMsgs +
-                    '}';
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
new file mode 100755
index 0000000..4ce0471
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages acked and committed offsets for a TopicPartition. This class is not 
thread safe
+ */
+public class OffsetManager {
+
+    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = 
new OffsetComparator();
+    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetManager.class);
+    private final TopicPartition tp;
+    /* First offset to be fetched. It is either set to the beginning, end, or 
to the first uncommitted offset.
+    * Initial value depends on offset strategy. See 
KafkaSpoutConsumerRebalanceListener */
+    private final long initialFetchOffset;
+    // Last offset committed to Kafka. Initially it is set to initialFetchOffset - 1
+    private long committedOffset;
+    // Acked messages sorted by ascending order of offset
+    private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new 
TreeSet<>(OFFSET_COMPARATOR);
+
+    public OffsetManager(TopicPartition tp, long initialFetchOffset) {
+        this.tp = tp;
+        this.initialFetchOffset = initialFetchOffset;
+        this.committedOffset = initialFetchOffset - 1;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
+        ackedMsgs.add(msgId);
+    }
+
+    /**
+     * An offset is only committed when all records with lower offset have been
+     * acked. This guarantees that all offsets smaller than the committedOffset
+     * have been delivered.
+     *
+     * @return the next OffsetAndMetadata to commit, or null if no offset is
+     * ready to commit.
+     */
+    public OffsetAndMetadata findNextCommitOffset() {
+        boolean found = false;
+        long currOffset;
+        long nextCommitOffset = committedOffset;
+        KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience 
variable to make it faster to create OffsetAndMetadata
+
+        for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is 
that of a linear scan on a TreeSet
+            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) 
{            // found the next offset to commit
+                found = true;
+                nextCommitMsg = currAckedMsg;
+                nextCommitOffset = currOffset;
+            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // 
offset found is not continuous to the offsets listed to go in the next commit, 
so stop search
+                LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
+                break;
+            } else {
+                //Received a redundant ack. Ignore and continue processing.
+                LOG.warn("topic-partition [{}] has unexpected offset [{}]. 
Current committed Offset [{}]",
+                    tp, currOffset, committedOffset);
+            }
+        }
+
+        OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+        if (found) {
+            nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
+            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+        } else {
+            LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
+        }
+        LOG.trace("{}", this);
+        return nextCommitOffsetAndMetadata;
+    }
+
+    /**
+     * Marks an offset as committed. This method has side effects - it sets 
the
+     * internal state in such a way that future calls to
+     * {@link #findNextCommitOffset()} will return offsets greater than the
+     * offset specified, if any.
+     *
+     * @param committedOffset offset to be marked as committed
+     * @return Number of offsets committed in this commit
+     */
+    public long commit(OffsetAndMetadata committedOffset) {
+        long preCommitCommittedOffsets = this.committedOffset;
+        long numCommittedOffsets = committedOffset.offset() - 
this.committedOffset;
+        this.committedOffset = committedOffset.offset();
+        for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); 
iterator.hasNext();) {
+            if (iterator.next().offset() <= committedOffset.offset()) {
+                iterator.remove();
+            } else {
+                break;
+            }
+        }
+        LOG.trace("{}", this);
+        
+        LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
+                    preCommitCommittedOffsets + 1, this.committedOffset, 
numCommittedOffsets, tp);
+        
+        return numCommittedOffsets;
+    }
+
+    public long getCommittedOffset() {
+        return committedOffset;
+    }
+
+    public boolean isEmpty() {
+        return ackedMsgs.isEmpty();
+    }
+
+    public boolean contains(ConsumerRecord record) {
+        return contains(new KafkaSpoutMessageId(record));
+    }
+
+    public boolean contains(KafkaSpoutMessageId msgId) {
+        return ackedMsgs.contains(msgId);
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetManager{"
+            + "topic-partition=" + tp
+            + ", fetchOffset=" + initialFetchOffset
+            + ", committedOffset=" + committedOffset
+            + ", ackedMsgs=" + ackedMsgs
+            + '}';
+    }
+
+    private static class OffsetComparator implements 
Comparator<KafkaSpoutMessageId> {
+
+        @Override
+        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() 
? 0 : 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
index d51104d..2a2e1cb 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -18,6 +18,7 @@
 package org.apache.storm.kafka.spout.internal;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
 
 public class Timer {
     private final long delay;
@@ -41,7 +42,7 @@ public class Timer {
         this.timeUnit = timeUnit;
 
         periodNanos = timeUnit.toNanos(period);
-        start = System.nanoTime() + timeUnit.toNanos(delay);
+        start = Time.nanoTime() + timeUnit.toNanos(delay);
     }
 
     public long period() {
@@ -65,9 +66,9 @@ public class Timer {
      * otherwise.
      */
     public boolean isExpiredResetOnTrue() {
-        final boolean expired = System.nanoTime() - start > periodNanos;
+        final boolean expired = Time.nanoTime() - start >= periodNanos;
         if (expired) {
-            start = System.nanoTime();
+            start = Time.nanoTime();
         }
         return expired;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
index ea0b6e7..abc58f0 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
index f4275e4..681953d 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 08220dd..57e0120 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 9969d84..6a0a63e 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -21,10 +21,10 @@ import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -44,6 +44,8 @@ import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -56,18 +58,16 @@ public class KafkaSpoutRebalanceTest {
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
 
-    private TopologyContext contextMock;
-    private SpoutOutputCollector collectorMock;
-    private Map<String, Object> conf;
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = 
mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactoryMock;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        contextMock = mock(TopologyContext.class);
-        collectorMock = mock(SpoutOutputCollector.class);
-        conf = new HashMap<>();
         consumerMock = mock(KafkaConsumer.class);
         consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
             @Override
@@ -99,9 +99,9 @@ public class KafkaSpoutRebalanceTest {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> 
secondPartitionRecords = new HashMap<>();
         secondPartitionRecords.put(assignedPartition, 
Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), 
assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPartitionRecords))
-                .thenReturn(new ConsumerRecords(secondPartitionRecords))
-                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords(firstPartitionRecords))
+            .thenReturn(new ConsumerRecords(secondPartitionRecords))
+            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
 
         //Emit the messages
         spout.nextTuple();
@@ -115,7 +115,7 @@ public class KafkaSpoutRebalanceTest {
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
-        
+
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());
         emittedMessageIds.add(messageIdForAssignedPartition.getValue());
@@ -125,31 +125,32 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() 
throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
-        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock);
-        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
-        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 
1);
-        TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
-        //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
-        
-        //Ack both emitted tuples
-        spout.ack(emittedMessageIds.get(0));
-        spout.ack(emittedMessageIds.get(1));
-
-        //Ensure the commit timer has expired
-        Thread.sleep(510);
-
-        //Make the spout commit any acked tuples
-        spout.nextTuple();
-        //Verify that it only committed the message on the assigned partition
-        verify(consumerMock).commitSync(commitCapture.capture());
-
-        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = 
commitCapture.getValue();
-        assertThat(commitCaptureMap, hasKey(assignedPartition));
-        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), 
consumerFactoryMock);
+            String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+            TopicPartition partitionThatWillBeRevoked = new 
TopicPartition(topic, 1);
+            TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+            //Emit a message on each partition and revoke the first partition
+            List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
+
+            //Ack both emitted tuples
+            spout.ack(emittedMessageIds.get(0));
+            spout.ack(emittedMessageIds.get(1));
+
+            //Ensure the commit timer has expired
+            Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Make the spout commit any acked tuples
+            spout.nextTuple();
+            //Verify that it only committed the message on the assigned partition
+            verify(consumerMock, times(1)).commitSync(commitCapture.capture());
+
+            Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = 
commitCapture.getValue();
+            assertThat(commitCaptureMap, hasKey(assignedPartition));
+            assertThat(commitCaptureMap, 
not(hasKey(partitionThatWillBeRevoked)));
+        }
     }
-    
+
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() 
throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless 
since the spout will not be allowed to commit them if they later pass
@@ -158,14 +159,14 @@ public class KafkaSpoutRebalanceTest {
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 
1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
+
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
-        
+
         //Fail both emitted tuples
         spout.fail(emittedMessageIds.get(0));
         spout.fail(emittedMessageIds.get(1));
-        
+
         //Check that only the tuple on the currently assigned partition is 
retried
         verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
         verify(retryServiceMock).schedule(emittedMessageIds.get(1));

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index f457b59..3ea2d69 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+
 import info.batey.kafka.unit.KafkaUnitRule;
 import kafka.producer.KeyedMessage;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -28,29 +30,62 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import static org.junit.Assert.*;
-
 import java.util.Map;
-import static org.mockito.Mockito.*;
-import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
-
-public class SingleTopicKafkaSpoutTest {
 
-    private class SpoutContext {
-        public KafkaSpout<String, String> spout;
-        public SpoutOutputCollector collector;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
 
-        public SpoutContext(KafkaSpout<String, String> spout,
-                            SpoutOutputCollector collector) {
-            this.spout = spout;
-            this.collector = collector;
-        }
-    }
+public class SingleTopicKafkaSpoutTest {
 
     @Rule
     public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
 
-    void populateTopicData(String topicName, int msgCount) {
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
+
+    private final TopologyContext topologyContext = 
mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = 
mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumerFactory<String, String> consumerFactory;
+    private KafkaSpout<String, String> spout;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs);
+        this.consumerSpy = spy(new 
KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+        this.consumerFactory = new KafkaConsumerFactory<String, String>() {
+            @Override
+            public KafkaConsumer<String, String> 
createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+                return consumerSpy;
+            }
+        
+        };
+        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+    }
+
+    private void populateTopicData(String topicName, int msgCount) {
         kafkaUnitRule.getKafkaUnit().createTopic(topicName);
 
         for (int i = 0; i < msgCount; i++){
@@ -62,184 +97,180 @@ public class SingleTopicKafkaSpoutTest {
         };
     }
 
-    SpoutContext initializeSpout(int msgCount) {
+    private void initializeSpout(int msgCount) {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        int kafkaPort = kafkaUnitRule.getKafkaPort();
-
-        TopologyContext topology = mock(TopologyContext.class);
-        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
-        Map conf = mock(Map.class);
-
-        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfig(kafkaPort));
-        spout.open(conf, topology, collector);
+        spout.open(conf, topologyContext, collector);
         spout.activate();
-        return new SpoutContext(spout, collector);
     }
+
     /*
-     * Asserts that the next possible offset to commit or the committed offset 
is the provided offset.
-     * An offset that is ready to be committed is not guarenteed to be already 
committed.
+     * Asserts that commitSync has been called once, 
+     * that there are only commits on one topic,
+     * and that the committed offset covers messageCount messages
      */
-    private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry 
entry) {
-
-        boolean currentOffsetMatch = entry.getCommittedOffset() == offset;
-        OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
-        boolean nextOffsetMatch =  nextOffset != null && nextOffset.offset() 
== offset;
-        assertTrue("Next offset: " +
-                        entry.findNextCommitOffset() +
-                        " OR current offset: " +
-                        entry.getCommittedOffset() +
-                        " must equal desired offset: " +
-                        offset,
-                currentOffsetMatch | nextOffsetMatch);
+    private void verifyAllMessagesCommitted(long messageCount) {
+        verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
+        Map<TopicPartition, OffsetAndMetadata> commits = 
commitCapture.getValue();
+        assertThat("Expected commits for only one topic partition", 
commits.entrySet().size(), is(1));
+        OffsetAndMetadata offset = 
commits.entrySet().iterator().next().getValue();
+        assertThat("Expected committed offset to cover all emitted messages", 
offset.offset(), is(messageCount - 1));
     }
 
     @Test
     public void shouldContinueWithSlowDoubleAcks() throws Exception {
-        int messageCount = 20;
-        SpoutContext context = initializeSpout(messageCount);
-
-        //play 1st tuple
-        ArgumentCaptor<Object> messageIdToDoubleAck = 
ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyString(), anyList(), 
messageIdToDoubleAck.capture());
-        context.spout.ack(messageIdToDoubleAck.getValue());
-
-        for (int i = 0; i < messageCount/2; i++) {
-            context.spout.nextTuple();
-        };
-
-        context.spout.ack(messageIdToDoubleAck.getValue());
-
-        for (int i = 0; i < messageCount; i++) {
-            context.spout.nextTuple();
-        };
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 20;
+            initializeSpout(messageCount);
+
+            //play 1st tuple
+            ArgumentCaptor<Object> messageIdToDoubleAck = 
ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), 
messageIdToDoubleAck.capture());
+            spout.ack(messageIdToDoubleAck.getValue());
+
+            //Emit some more messages
+            for(int i = 0; i < messageCount / 2; i++) {
+                spout.nextTuple();
+            }
+
+            spout.ack(messageIdToDoubleAck.getValue());
+
+            //Emit any remaining messages
+            for(int i = 0; i < messageCount; i++) {
+                spout.nextTuple();
+            }
+
+            //Verify that all messages are emitted, ack all the messages
+            ArgumentCaptor<Object> messageIds = 
ArgumentCaptor.forClass(Object.class);
+            verify(collector, 
times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                anyList(),
+                messageIds.capture());
+            for(Object id : messageIds.getAllValues()) {
+                spout.ack(id);
+            }
 
-        ArgumentCaptor<Object> remainingIds = 
ArgumentCaptor.forClass(Object.class);
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
 
-        verify(context.collector, times(messageCount)).emit(
-                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                remainingIds.capture());
-        for (Object id : remainingIds.getAllValues()) {
-            context.spout.ack(id);
+            verifyAllMessagesCommitted(messageCount);
         }
-
-        for(Object item : context.spout.acked.values()) {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) 
item);
-        };
     }
 
     @Test
     public void shouldEmitAllMessages() throws Exception {
-        int messageCount = 10;
-        SpoutContext context = initializeSpout(messageCount);
-
-
-        for (int i = 0; i < messageCount; i++) {
-            context.spout.nextTuple();
-            ArgumentCaptor<Object> messageId = 
ArgumentCaptor.forClass(Object.class);
-            verify(context.collector).emit(
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 10;
+            initializeSpout(messageCount);
+
+            //Emit all messages and check that they are emitted. Ack the messages too
+            for(int i = 0; i < messageCount; i++) {
+                spout.nextTuple();
+                ArgumentCaptor<Object> messageId = 
ArgumentCaptor.forClass(Object.class);
+                verify(collector).emit(
                     eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                     eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
-                            Integer.toString(i),
-                            Integer.toString(i))),
-            messageId.capture());
-            context.spout.ack(messageId.getValue());
-            reset(context.collector);
-        };
-
-        for (Object item : context.spout.acked.values()) {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) 
item);
-        };
+                        Integer.toString(i),
+                        Integer.toString(i))),
+                    messageId.capture());
+                spout.ack(messageId.getValue());
+                reset(collector);
+            }
+
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
 
     @Test
     public void shouldReplayInOrderFailedMessages() throws Exception {
-        int messageCount = 10;
-        SpoutContext context = initializeSpout(messageCount);
-
-        //play and ack 1 tuple
-        ArgumentCaptor<Object> messageIdAcked = 
ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyString(), anyList(), 
messageIdAcked.capture());
-        context.spout.ack(messageIdAcked.getValue());
-        reset(context.collector);
-
-        //play and fail 1 tuple
-        ArgumentCaptor<Object> messageIdFailed = 
ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyString(), anyList(), 
messageIdFailed.capture());
-        context.spout.fail(messageIdFailed.getValue());
-        reset(context.collector);
-
-        //pause so that failed tuples will be retried
-        Thread.sleep(200);
-
-
-        //allow for some calls to nextTuple() to fail to emit a tuple
-        for (int i = 0; i < messageCount + 5; i++) {
-            context.spout.nextTuple();
-        };
-
-        ArgumentCaptor<Object> remainingMessageIds = 
ArgumentCaptor.forClass(Object.class);
-
-        //1 message replayed, messageCount - 2 messages emitted for the first 
time
-        verify(context.collector, times(messageCount - 1)).emit(
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 10;
+            initializeSpout(messageCount);
+
+            //play and ack 1 tuple
+            ArgumentCaptor<Object> messageIdAcked = 
ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), 
messageIdAcked.capture());
+            spout.ack(messageIdAcked.getValue());
+            reset(collector);
+
+            //play and fail 1 tuple
+            ArgumentCaptor<Object> messageIdFailed = 
ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), 
messageIdFailed.capture());
+            spout.fail(messageIdFailed.getValue());
+            reset(collector);
+
+            //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+            for(int i = 0; i < messageCount; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<Object> remainingMessageIds = 
ArgumentCaptor.forClass(Object.class);
+            //All messages except the first acked message should have been 
emitted
+            verify(collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                 anyList(),
                 remainingMessageIds.capture());
-        for (Object id : remainingMessageIds.getAllValues()) {
-            context.spout.ack(id);
-        }
+            for(Object id : remainingMessageIds.getAllValues()) {
+                spout.ack(id);
+            }
 
-        for (Object item : context.spout.acked.values()) {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) 
item);
-        };
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
 
     @Test
     public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
-        int messageCount = 10;
-        SpoutContext context = initializeSpout(messageCount);
-
-
-        //play 1st tuple
-        ArgumentCaptor<Object> messageIdToFail = 
ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyString(), anyList(), 
messageIdToFail.capture());
-        reset(context.collector);
-
-        //play 2nd tuple
-        ArgumentCaptor<Object> messageIdToAck = 
ArgumentCaptor.forClass(Object.class);
-        context.spout.nextTuple();
-        verify(context.collector).emit(anyString(), anyList(), 
messageIdToAck.capture());
-        reset(context.collector);
-
-        //ack 2nd tuple
-        context.spout.ack(messageIdToAck.getValue());
-        //fail 1st tuple
-        context.spout.fail(messageIdToFail.getValue());
-
-        //pause so that failed tuples will be retried
-        Thread.sleep(200);
-
-        //allow for some calls to nextTuple() to fail to emit a tuple
-        for (int i = 0; i < messageCount + 5; i++) {
-            context.spout.nextTuple();
-        };
-
-        ArgumentCaptor<Object> remainingIds = 
ArgumentCaptor.forClass(Object.class);
-        //1 message replayed, messageCount - 2 messages emitted for the first 
time
-        verify(context.collector, times(messageCount - 1)).emit(
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            int messageCount = 10;
+            initializeSpout(messageCount);
+
+            //play 1st tuple
+            ArgumentCaptor<Object> messageIdToFail = 
ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), 
messageIdToFail.capture());
+            reset(collector);
+
+            //play 2nd tuple
+            ArgumentCaptor<Object> messageIdToAck = 
ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), 
messageIdToAck.capture());
+            reset(collector);
+
+            //ack 2nd tuple
+            spout.ack(messageIdToAck.getValue());
+            //fail 1st tuple
+            spout.fail(messageIdToFail.getValue());
+
+            //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+            for(int i = 0; i < messageCount; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<Object> remainingIds = 
ArgumentCaptor.forClass(Object.class);
+            //All messages except the first acked message should have been 
emitted
+            verify(collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                 anyList(),
                 remainingIds.capture());
-        for (Object id : remainingIds.getAllValues()) {
-            context.spout.ack(id);
-        };
+            for(Object id : remainingIds.getAllValues()) {
+                spout.ack(id);
+            }
 
-        for (Object item : context.spout.acked.values()) {
-            assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) 
item);
-        };
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Commit offsets
+            spout.nextTuple();
+
+            verifyAllMessagesCommitted(messageCount);
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 6e59d42..d49516e 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -53,9 +53,9 @@ public class KafkaSpoutTopologyMainNamedTopics {
 
     protected void runMain(String[] args) throws Exception {
         if (args.length == 0) {
-            submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
         } else {
-            submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), 
getConfig());
+            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), 
getConfig());
         }
 
     }
@@ -86,7 +86,7 @@ public class KafkaSpoutTopologyMainNamedTopics {
         return config;
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
         tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index 8b967fa..cfc3446 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -40,7 +40,7 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain
         new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
     }
 
-    protected StormTopology getTopolgyKafkaSpout() {
+    protected StormTopology getTopologyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
         tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
         tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f4f965..a45903a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -338,7 +338,6 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index e501b6c..a6a4fe1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -24,38 +24,67 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+/**
+ * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time.
+ * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls.
+ * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime().
+ */
 public class Time {
     public static final Logger LOG = LoggerFactory.getLogger(Time.class);
     
     private static AtomicBoolean simulating = new AtomicBoolean(false);
-    private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
+    private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0);
     //TODO: should probably use weak references here or something
-    private static volatile Map<Thread, AtomicLong> threadSleepTimes;
+    private static volatile Map<Thread, AtomicLong> threadSleepTimesNanos;
     private static final Object sleepTimesLock = new Object();
+    private static AtomicLong simulatedCurrTimeNanos;
     
-    private static AtomicLong simulatedCurrTimeMs; //should this be a thread 
local that's allowed to keep advancing?
-    
-    public static void startSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(true);
-            simulatedCurrTimeMs = new AtomicLong(0);
-            threadSleepTimes = new ConcurrentHashMap<>();
+    public static class SimulatedTime implements AutoCloseable {
+
+        public SimulatedTime() {
+            this(null);
+        }
+        
+        public SimulatedTime(Number advanceTimeMs) {
+            synchronized(Time.sleepTimesLock) {
+                Time.simulating.set(true);
+                Time.simulatedCurrTimeNanos = new AtomicLong(0);
+                Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+                if (advanceTimeMs != null) {
+                    
Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue()));
+                }
+                LOG.warn("AutoCloseable Simulated Time Starting...");
+            }
+        }
+        
+        @Override
+        public void close() {
+            synchronized(Time.sleepTimesLock) {
+                Time.simulating.set(false);    
+                Time.autoAdvanceNanosOnSleep.set(0);
+                Time.threadSleepTimesNanos = null;
+                LOG.warn("AutoCloseable Simulated Time Ending...");
+            }
         }
     }
     
-    public static void startSimulatingAutoAdvanceOnSleep(long ms) {
-        synchronized(sleepTimesLock) {
-            startSimulating();
-            autoAdvanceOnSleep.set(ms);
+    @Deprecated
+    public static void startSimulating() {
+        synchronized(Time.sleepTimesLock) {
+            Time.simulating.set(true);
+            Time.simulatedCurrTimeNanos = new AtomicLong(0);
+            Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+            LOG.warn("Simulated Time Starting...");
         }
     }
     
+    @Deprecated
     public static void stopSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(false);    
-            autoAdvanceOnSleep.set(0);
-            threadSleepTimes = null;
+        synchronized(Time.sleepTimesLock) {
+            Time.simulating.set(false);    
+            Time.autoAdvanceNanosOnSleep.set(0);
+            Time.threadSleepTimesNanos = null;
+            LOG.warn("Simulated Time Ending...");
         }
     }
     
@@ -65,44 +94,66 @@ public class Time {
     
     public static void sleepUntil(long targetTimeMs) throws InterruptedException {
         if(simulating.get()) {
-            try {
-                synchronized(sleepTimesLock) {
-                    if (threadSleepTimes == null) {
+            simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
+        } else {
+            long sleepTimeMs = targetTimeMs - currentTimeMillis();
+            if(sleepTimeMs>0) {
+                Thread.sleep(sleepTimeMs);
+            }
+        }
+    }
+    
+    public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+        if(simulating.get()) {
+            simulatedSleepUntilNanos(targetTimeNanos);
+        } else {
+            long sleepTimeNanos = targetTimeNanos-nanoTime();
+            long sleepTimeMs = nanosToMillis(sleepTimeNanos);
+            int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000);
+            if(sleepTimeNanos>0) {
+                Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
+            } 
+        }
+    }
+    
+    private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+        try {
+            synchronized (sleepTimesLock) {
+                if (threadSleepTimesNanos == null) {
+                    LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
+                    throw new InterruptedException();
+                }
+                threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
+            }
+            while (simulatedCurrTimeNanos.get() < targetTimeNanos) {
+                synchronized (sleepTimesLock) {
+                    if (threadSleepTimesNanos == null) {
                         LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
                         throw new InterruptedException();
                     }
-                    threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
                 }
-                while(simulatedCurrTimeMs.get() < targetTimeMs) {
-                    synchronized(sleepTimesLock) {
-                        if (threadSleepTimes == null) {
-                            LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
-                            throw new InterruptedException();
-                        }
-                    }
-                    long autoAdvance = autoAdvanceOnSleep.get();
-                    if (autoAdvance > 0) {
-                        advanceTime(autoAdvance);
-                    }
-                    Thread.sleep(10);
+                long autoAdvance = autoAdvanceNanosOnSleep.get();
+                if (autoAdvance > 0) {
+                    advanceTimeNanos(autoAdvance);
                 }
-            } finally {
-                synchronized(sleepTimesLock) {
-                    if (simulating.get() && threadSleepTimes != null) {
-                        threadSleepTimes.remove(Thread.currentThread());
-                    }
+                Thread.sleep(10);
+            }
+        } finally {
+            synchronized (sleepTimesLock) {
+                if (simulating.get() && threadSleepTimesNanos != null) {
+                    threadSleepTimesNanos.remove(Thread.currentThread());
                 }
             }
-        } else {
-            long sleepTime = targetTimeMs-currentTimeMillis();
-            if(sleepTime>0) 
-                Thread.sleep(sleepTime);
         }
     }
 
     public static void sleep(long ms) throws InterruptedException {
         sleepUntil(currentTimeMillis()+ms);
     }
+    
+    public static void sleepNanos(long nanos) throws InterruptedException {
+        sleepUntilNanos(nanoTime() + nanos);
+    }
 
     public static void sleepSecs (long secs) throws InterruptedException {
         if (secs > 0) {
@@ -110,14 +161,30 @@ public class Time {
         }
     }
     
+    public static long nanoTime() {
+        if (simulating.get()) {
+            return simulatedCurrTimeNanos.get();
+        } else {
+            return System.nanoTime();
+        }
+    }
+    
     public static long currentTimeMillis() {
         if(simulating.get()) {
-            return simulatedCurrTimeMs.get();
+            return nanosToMillis(simulatedCurrTimeNanos.get());
         } else {
             return System.currentTimeMillis();
         }
     }
 
+    public static long nanosToMillis(long nanos) {
+        return nanos/1_000_000;
+    }
+    
+    public static long millisToNanos(long millis) {
+        return millis*1_000_000;
+    }
+    
     public static long secsToMillis (int secs) {
         return 1000*(long) secs;
     }
@@ -139,18 +206,32 @@ public class Time {
     }
     
     public static void advanceTime(long ms) {
-        if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
-        if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
-        long newTime = simulatedCurrTimeMs.addAndGet(ms);
-        LOG.warn("Advanced simulated time to {}", newTime);
+        advanceTimeNanos(millisToNanos(ms));
+    }
+    
+    public static void advanceTimeNanos(long nanos) {
+        if (!simulating.get()) {
+            throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+        }
+        if (nanos < 0) {
+            throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
+        }
+        long newTime = simulatedCurrTimeNanos.addAndGet(nanos);
+        LOG.debug("Advanced simulated time to {}", newTime);
+    }
+    
+    public static void advanceTimeSecs(long secs) {
+        advanceTime(secs * 1_000);
     }
     
     public static boolean isThreadWaiting(Thread t) {
-        if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+        if(!simulating.get()) {
+            throw new IllegalStateException("Must be in simulation mode");
+        }
         AtomicLong time;
         synchronized(sleepTimesLock) {
-            time = threadSleepTimes.get(t);
+            time = threadSleepTimesNanos.get(t);
         }
-        return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
+        return !t.isAlive() || time!=null && nanoTime() < time.longValue();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a03137ed/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
index 24ccda5..9cd85f8 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -43,6 +43,7 @@ import org.apache.storm.localizer.ILocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Test;
 
 public class SlotTest {
@@ -113,8 +114,7 @@ public class SlotTest {
     
     @Test
     public void testEmptyToEmpty() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             ILocalizer localizer = mock(ILocalizer.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -125,15 +125,12 @@ public class SlotTest {
             DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
             assertTrue(Time.currentTimeMillis() > 1000);
-        } finally {
-            Time.stopSimulating();
         }
     }
     
     @Test
     public void testLaunchContainerFromEmpty() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String topoId = "NEW";
             List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
@@ -210,16 +207,13 @@ public class SlotTest {
             assertSame(newAssignment, nextState.currentAssignment);
             assertSame(container, nextState.container);
             assertTrue(Time.currentTimeMillis() > 2000);
-        } finally {
-            Time.stopSimulating();
         }
     }
 
 
     @Test
     public void testRelaunch() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String topoId = "CURRENT";
             List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
@@ -260,15 +254,12 @@ public class SlotTest {
             
             nextState = Slot.stateMachineStep(nextState, staticState);
             assertEquals(MachineState.RUNNING, nextState.state);
-        } finally {
-            Time.stopSimulating();
         }
     }
     
     @Test
     public void testReschedule() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
             List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
@@ -368,16 +359,13 @@ public class SlotTest {
             assertSame(nAssignment, nextState.currentAssignment);
             assertSame(nContainer, nextState.container);
             assertTrue(Time.currentTimeMillis() > 4000);
-        } finally {
-            Time.stopSimulating();
         }
     }
 
     
     @Test
     public void testRunningToEmpty() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
             List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
@@ -432,15 +420,12 @@ public class SlotTest {
             assertEquals(null, nextState.container);
             assertEquals(null, nextState.currentAssignment);
             assertTrue(Time.currentTimeMillis() > 3000);
-        } finally {
-            Time.stopSimulating();
         }
     }
     
     @Test
     public void testRunWithProfileActions() throws Exception {
-        Time.startSimulatingAutoAdvanceOnSleep(1010);
-        try {
+        try (SimulatedTime simulatedTime = new SimulatedTime(1010)) {
             int port = 8080;
             String cTopoId = "CURRENT";
             List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
@@ -508,8 +493,6 @@ public class SlotTest {
             assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions);
             assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions);
             assertTrue(Time.currentTimeMillis() > 5000);
-        } finally {
-            Time.stopSimulating();
         }
     }
 }

Reply via email to