This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new de2b3d7 Support utils.Time of kafka-client-spout (#3116)
de2b3d7 is described below
commit de2b3d71ba160c38ef614d7575c3da831fb03b40
Author: se choi <[email protected]>
AuthorDate: Thu Nov 22 13:55:55 2018 +0900
Support utils.Time of kafka-client-spout (#3116)
* Support utils.Time of kafka-client-spout
---
.../src/java/org/apache/storm/utils/Time.java | 241 +++++++++++++++++++++
1 file changed, 241 insertions(+)
diff --git a/storm-compatibility/src/java/org/apache/storm/utils/Time.java
b/storm-compatibility/src/java/org/apache/storm/utils/Time.java
new file mode 100644
index 0000000..3117e0a
--- /dev/null
+++ b/storm-compatibility/src/java/org/apache/storm/utils/Time.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class implements time simulation support. When time simulation is
enabled,
+ * methods on this class will use fixed time. When time simulation is disabled,
+ * methods will pass through to relevant java.lang.System/java.lang.Thread
calls.
+ * Methods using units higher than nanoseconds will pass through to
System.currentTimeMillis().
+ * Methods supporting nanoseconds will pass through to System.nanoTime().
+ */
+public final class Time {
+ private static final Logger LOG = Logger.getLogger(Time.class.getName());
+ private static final AtomicBoolean SIMULATING = new AtomicBoolean(false);
+ private static final AtomicLong AUTO_ADVANCE_NANOS_ON_SLEEP = new
AtomicLong(0);
+ private static final Map<Thread, AtomicLong> THREAD_SLEEP_TIMES_NANOS = new
ConcurrentHashMap<>();
+ private static final Object SLEEP_TIMES_LOCK = new Object();
+ private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new
AtomicLong(0);
+
+ private Time() {
+ }
+
+ public static boolean isSimulating() {
+ return SIMULATING.get();
+ }
+
+ public static void sleepUntil(long targetTimeMs) throws InterruptedException
{
+ if (SIMULATING.get()) {
+ simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
+ } else {
+ long sleepTimeMs = targetTimeMs - currentTimeMillis();
+ if (sleepTimeMs > 0) {
+ Thread.sleep(sleepTimeMs);
+ }
+ }
+ }
+
+ public static void sleepUntilNanos(long targetTimeNanos) throws
InterruptedException {
+ if (SIMULATING.get()) {
+ simulatedSleepUntilNanos(targetTimeNanos);
+ } else {
+ long sleepTimeNanos = targetTimeNanos - nanoTime();
+ long sleepTimeMs = nanosToMillis(sleepTimeNanos);
+ int sleepTimeNanosSansMs = (int) (sleepTimeNanos % 1_000_000);
+ if (sleepTimeNanos > 0) {
+ Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
+ }
+ }
+ }
+
+ private static void simulatedSleepUntilNanos(long targetTimeNanos) throws
InterruptedException {
+ try {
+ synchronized (SLEEP_TIMES_LOCK) {
+ if (!SIMULATING.get()) {
+ LOG.log(Level.FINER, Thread.currentThread()
+ + " is still sleeping after simulated time disabled.",
+ new RuntimeException("STACK TRACE"));
+ throw new InterruptedException();
+ }
+ THREAD_SLEEP_TIMES_NANOS.put(Thread.currentThread(), new
AtomicLong(targetTimeNanos));
+ }
+ while (SIMULATED_CURR_TIME_NANOS.get() < targetTimeNanos) {
+ synchronized (SLEEP_TIMES_LOCK) {
+ if (!SIMULATING.get()) {
+ LOG.log(Level.FINER, Thread.currentThread()
+ + " is still sleeping after simulated time disabled.",
+ new RuntimeException("STACK TRACE"));
+ throw new InterruptedException();
+ }
+ long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get();
+ if (autoAdvance > 0) {
+ advanceTimeNanos(autoAdvance);
+ }
+ }
+ Thread.sleep(10);
+ }
+ } finally {
+ THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread());
+ }
+ }
+
+ public static void sleep(long ms) throws InterruptedException {
+ if (ms > 0) {
+ if (SIMULATING.get()) {
+ simulatedSleepUntilNanos(millisToNanos(currentTimeMillis() + ms));
+ } else {
+ Thread.sleep(ms);
+ }
+ }
+ }
+
+ public static void parkNanos(long nanos) throws InterruptedException {
+ if (nanos > 0) {
+ if (SIMULATING.get()) {
+ simulatedSleepUntilNanos(nanoTime() + nanos);
+ } else {
+ LockSupport.parkNanos(nanos);
+ }
+ }
+ }
+
+ public static void sleepSecs(long secs) throws InterruptedException {
+ if (secs > 0) {
+ sleep(secs * 1000);
+ }
+ }
+
+ public static long nanoTime() {
+ if (SIMULATING.get()) {
+ return SIMULATED_CURR_TIME_NANOS.get();
+ } else {
+ return System.nanoTime();
+ }
+ }
+
+ public static long currentTimeMillis() {
+ if (SIMULATING.get()) {
+ return nanosToMillis(SIMULATED_CURR_TIME_NANOS.get());
+ } else {
+ return System.currentTimeMillis();
+ }
+ }
+
+ public static long nanosToMillis(long nanos) {
+ return nanos / 1_000_000;
+ }
+
+ public static long millisToNanos(long millis) {
+ return millis * 1_000_000;
+ }
+
+ public static long secsToMillis(int secs) {
+ return 1000 * (long) secs;
+ }
+
+ public static long secsToMillisLong(double secs) {
+ return (long) (1000 * secs);
+ }
+
+ public static int currentTimeSecs() {
+ return (int) (currentTimeMillis() / 1000);
+ }
+
+ public static int deltaSecs(int timeInSeconds) {
+ return Time.currentTimeSecs() - timeInSeconds;
+ }
+
+ public static long deltaMs(long timeInMilliseconds) {
+ return Time.currentTimeMillis() - timeInMilliseconds;
+ }
+
+ public static void advanceTime(long ms) {
+ advanceTimeNanos(millisToNanos(ms));
+ }
+
+ public static void advanceTimeNanos(long nanos) {
+ if (!SIMULATING.get()) {
+ throw new IllegalStateException("Cannot simulate time unless in
simulation mode");
+ }
+ if (nanos < 0) {
+ throw new IllegalArgumentException("advanceTime only accepts positive
time as an argument");
+ }
+ synchronized (SLEEP_TIMES_LOCK) {
+ long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos);
+ Iterator<AtomicLong> sleepTimesIter =
THREAD_SLEEP_TIMES_NANOS.values().iterator();
+ while (sleepTimesIter.hasNext()) {
+ AtomicLong curr = sleepTimesIter.next();
+ if (SIMULATED_CURR_TIME_NANOS.get() >= curr.get()) {
+ sleepTimesIter.remove();
+ }
+ }
+ LOG.log(Level.FINER, "Advanced simulated time to " + newTime);
+ }
+ }
+
+ public static void advanceTimeSecs(long secs) {
+ advanceTime(secs * 1_000);
+ }
+
+ public static boolean isThreadWaiting(Thread t) {
+ if (!SIMULATING.get()) {
+ throw new IllegalStateException("Must be in simulation mode");
+ }
+ AtomicLong time = THREAD_SLEEP_TIMES_NANOS.get(t);
+ return !t.isAlive() || time != null && nanoTime() < time.longValue();
+ }
+
+ public static class SimulatedTime implements AutoCloseable {
+
+ public SimulatedTime() {
+ this(null);
+ }
+
+ public SimulatedTime(Number advanceTimeMs) {
+ synchronized (Time.SLEEP_TIMES_LOCK) {
+ Time.SIMULATING.set(true);
+ Time.SIMULATED_CURR_TIME_NANOS.set(0);
+ Time.THREAD_SLEEP_TIMES_NANOS.clear();
+ if (advanceTimeMs != null) {
+
Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(millisToNanos(advanceTimeMs.longValue()));
+ } else {
+ Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(0);
+ }
+ LOG.warning("AutoCloseable Simulated Time Starting...");
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (Time.SLEEP_TIMES_LOCK) {
+ Time.SIMULATING.set(false);
+ LOG.warning("AutoCloseable Simulated Time Ending...");
+ }
+ }
+ }
+}