johnjcasey commented on code in PR #23545:
URL: https://github.com/apache/beam/pull/23545#discussion_r991715756


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.KeyForBottom;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded {@link ScheduledExecutorService} based upon the {@link 
ScheduledThreadPoolExecutor}
+ * API contract.
+ *
+ * <p>Note that this implementation differs from a {@link 
ScheduledThreadPoolExecutor} in the
+ * following ways:
+ *
+ * <ul>
+ *   <li>The core pool size is always 0.
+ *   <li>Any work that is immediately executable is given to a thread before 
returning from the
+ *       corresponding {@code execute}, {@code submit}, {@code schedule*} 
methods.
+ *   <li>An unbounded number of threads can be started.
+ * </ul>
+ */
+public final class UnboundedScheduledExecutorService implements 
ScheduledExecutorService {
+
+  /**
+   * A {@link FutureTask} that handles periodically rescheduling tasks.
+   *
+   * <p>Note that it is important that this class extends {@link FutureTask} 
and {@link
+   * RunnableScheduledFuture} to be compatible with the types of objects 
returned by a {@link
+   * ScheduledThreadPoolExecutor}.
+   */
+  @VisibleForTesting
+  @SuppressFBWarnings(
+      value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification =
+          "Default equals/hashCode is what we want since two scheduled tasks 
are only equivalent if they point to the same instance.")
+  final class ScheduledFutureTask<@Nullable @KeyForBottom V> extends 
FutureTask<V>
+      implements RunnableScheduledFuture<V> {
+
+    /** Sequence number to break ties FIFO. */
+    private final long sequenceNumber;
+
+    /** The time the task is enabled to execute in nanoTime units. */
+    private long time;
+
+    /**
+     * Period in nanoseconds for repeating tasks. A positive value indicates 
fixed-rate execution. A
+     * negative value indicates fixed-delay execution. A value of 0 indicates 
a non-repeating
+     * (one-shot) task.
+     */
+    private final long period;
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime) {
+      this(r, result, triggerTime, 0);
+    }
+
+    /** Creates a periodic action with given nanoTime-based initial trigger 
time and period. */
+    @SuppressWarnings("argument.type.incompatible")
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime, long 
period) {
+      super(r, result);
+      this.time = triggerTime;
+      this.period = period;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Callable<V> callable, long triggerTime) {
+      super(callable);
+      this.time = triggerTime;
+      this.period = 0;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(LongMath.saturatedSubtract(time, clock.nanoTime()), 
NANOSECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      if (other == this) // compare zero if same object
+      {
+        return 0;
+      }
+      if (other instanceof ScheduledFutureTask) {
+        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
+        int diff = Longs.compare(time, x.time);
+        if (diff != 0) {
+          return diff;
+        }
+        if (sequenceNumber < x.sequenceNumber) {
+          return -1;
+        }
+        return 1;
+      }
+      long diff = LongMath.saturatedSubtract(getDelay(NANOSECONDS), 
other.getDelay(NANOSECONDS));
+      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+    }
+
+    @Override
+    public boolean isPeriodic() {
+      return period != 0;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      boolean cancelled = super.cancel(mayInterruptIfRunning);
+      synchronized (tasks) {
+        tasks.remove(this);
+      }
+      return cancelled;
+    }
+
+    /** Overrides {@link FutureTask} so as to reset/requeue if periodic. */
+    @Override
+    public void run() {
+      boolean periodic = isPeriodic();
+      if (!periodic) {
+        super.run();
+      } else if (super.runAndReset()) {
+        // Set the next runtime
+        if (period > 0) {
+          time = LongMath.saturatedAdd(time, period);
+        } else {
+          time = triggerTime(-period);
+        }
+        synchronized (tasks) {
+          tasks.add(this);
+          tasks.notify();
+        }
+      }
+    }
+  }
+
+  // Used to break ties in ordering of future tasks that are scheduled for the 
same time
+  // so that they have a consistent ordering based upon their insertion order 
into
+  // this ScheduledExecutorService.
+  private final AtomicLong sequencer = new AtomicLong();
+
+  private final NanoClock clock;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  @VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks;
+  private final AbstractExecutorService invokeMethodsAdapter;
+  private final Future<?> launchTasks;
+
+  public UnboundedScheduledExecutorService() {
+    this(NanoClock.SYSTEM);
+  }
+
+  @VisibleForTesting
+  UnboundedScheduledExecutorService(NanoClock clock) {
+    this.clock = clock;
+    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+    
threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+    threadFactoryBuilder.setDaemon(true);
+
+    this.threadPoolExecutor =
+        new ThreadPoolExecutor(
+            0,
+            Integer.MAX_VALUE, // Allow an unlimited number of re-usable 
threads.
+            Long.MAX_VALUE,
+            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
+            new SynchronousQueue<>(),
+            threadFactoryBuilder.build());
+
+    // Create an internal adapter so that execute does not re-wrap the 
ScheduledFutureTask again
+    this.invokeMethodsAdapter =
+        new AbstractExecutorService() {
+
+          @Override
+          protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Runnable 
runnable, T value) {
+            return new ScheduledFutureTask<>(runnable, value, 0);
+          }
+
+          @Override
+          protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Callable<T> 
callable) {
+            return new ScheduledFutureTask<>(callable, 0);
+          }
+
+          @Override
+          public void shutdown() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public List<Runnable> shutdownNow() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isShutdown() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isTerminated() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          /* UnboundedScheduledExecutorService is the only caller after it has 
been initialized.*/
+          @SuppressWarnings("method.invocation.invalid")
+          public void execute(Runnable command) {
+            // These are already guaranteed to be a ScheduledFutureTask so 
there is no need to wrap
+            // it in another ScheduledFutureTask.
+            threadPoolExecutor.execute(command);
+          }
+        };
+    this.tasks = new PriorityQueue<>();
+    this.launchTasks =
+        threadPoolExecutor.submit(new TaskLauncher(tasks, threadPoolExecutor, 
clock));
+  }
+
+  private static class TaskLauncher implements Callable<Void> {
+    private final PriorityQueue<ScheduledFutureTask<?>> tasks;
+    private final ThreadPoolExecutor threadPoolExecutor;
+    private final NanoClock clock;
+
+    private TaskLauncher(
+        PriorityQueue<ScheduledFutureTask<?>> tasks,
+        ThreadPoolExecutor threadPoolExecutor,
+        NanoClock clock) {
+      this.tasks = tasks;
+      this.threadPoolExecutor = threadPoolExecutor;
+      this.clock = clock;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      while (true) {
+        synchronized (tasks) {

Review Comment:
   Because we synchronize on `tasks` here, is it possible for a task that is
submitted while this lock is held to be executed late?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.KeyForBottom;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded {@link ScheduledExecutorService} based upon the {@link 
ScheduledThreadPoolExecutor}
+ * API contract.
+ *
+ * <p>Note that this implementation differs from a {@link 
ScheduledThreadPoolExecutor} in the
+ * following ways:
+ *
+ * <ul>
+ *   <li>The core pool size is always 0.
+ *   <li>Any work that is immediately executable is given to a thread before 
returning from the
+ *       corresponding {@code execute}, {@code submit}, {@code schedule*} 
methods.
+ *   <li>An unbounded number of threads can be started.
+ * </ul>
+ */
+public final class UnboundedScheduledExecutorService implements 
ScheduledExecutorService {
+
+  /**
+   * A {@link FutureTask} that handles periodically rescheduling tasks.
+   *
+   * <p>Note that it is important that this class extends {@link FutureTask} 
and {@link
+   * RunnableScheduledFuture} to be compatible with the types of objects 
returned by a {@link
+   * ScheduledThreadPoolExecutor}.
+   */
+  @VisibleForTesting
+  @SuppressFBWarnings(
+      value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification =
+          "Default equals/hashCode is what we want since two scheduled tasks 
are only equivalent if they point to the same instance.")
+  final class ScheduledFutureTask<@Nullable @KeyForBottom V> extends 
FutureTask<V>
+      implements RunnableScheduledFuture<V> {
+
+    /** Sequence number to break ties FIFO. */
+    private final long sequenceNumber;
+
+    /** The time the task is enabled to execute in nanoTime units. */
+    private long time;
+
+    /**
+     * Period in nanoseconds for repeating tasks. A positive value indicates 
fixed-rate execution. A
+     * negative value indicates fixed-delay execution. A value of 0 indicates 
a non-repeating
+     * (one-shot) task.
+     */
+    private final long period;
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime) {
+      this(r, result, triggerTime, 0);
+    }
+
+    /** Creates a periodic action with given nanoTime-based initial trigger 
time and period. */
+    @SuppressWarnings("argument.type.incompatible")
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime, long 
period) {
+      super(r, result);
+      this.time = triggerTime;
+      this.period = period;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Callable<V> callable, long triggerTime) {
+      super(callable);
+      this.time = triggerTime;
+      this.period = 0;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(LongMath.saturatedSubtract(time, clock.nanoTime()), 
NANOSECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      if (other == this) // compare zero if same object
+      {
+        return 0;
+      }
+      if (other instanceof ScheduledFutureTask) {
+        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
+        int diff = Longs.compare(time, x.time);
+        if (diff != 0) {
+          return diff;
+        }
+        if (sequenceNumber < x.sequenceNumber) {
+          return -1;
+        }
+        return 1;
+      }
+      long diff = LongMath.saturatedSubtract(getDelay(NANOSECONDS), 
other.getDelay(NANOSECONDS));
+      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+    }
+
+    @Override
+    public boolean isPeriodic() {
+      return period != 0;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      boolean cancelled = super.cancel(mayInterruptIfRunning);
+      synchronized (tasks) {
+        tasks.remove(this);
+      }
+      return cancelled;
+    }
+
+    /** Overrides {@link FutureTask} so as to reset/requeue if periodic. */
+    @Override
+    public void run() {
+      boolean periodic = isPeriodic();
+      if (!periodic) {
+        super.run();
+      } else if (super.runAndReset()) {
+        // Set the next runtime
+        if (period > 0) {
+          time = LongMath.saturatedAdd(time, period);
+        } else {
+          time = triggerTime(-period);
+        }
+        synchronized (tasks) {
+          tasks.add(this);
+          tasks.notify();
+        }
+      }
+    }
+  }
+
+  // Used to break ties in ordering of future tasks that are scheduled for the 
same time
+  // so that they have a consistent ordering based upon their insertion order 
into
+  // this ScheduledExecutorService.
+  private final AtomicLong sequencer = new AtomicLong();
+
+  private final NanoClock clock;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  @VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks;
+  private final AbstractExecutorService invokeMethodsAdapter;
+  private final Future<?> launchTasks;
+
+  public UnboundedScheduledExecutorService() {
+    this(NanoClock.SYSTEM);
+  }
+
+  @VisibleForTesting
+  UnboundedScheduledExecutorService(NanoClock clock) {
+    this.clock = clock;
+    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+    
threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+    threadFactoryBuilder.setDaemon(true);
+
+    this.threadPoolExecutor =
+        new ThreadPoolExecutor(
+            0,
+            Integer.MAX_VALUE, // Allow an unlimited number of re-usable 
threads.
+            Long.MAX_VALUE,
+            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
+            new SynchronousQueue<>(),
+            threadFactoryBuilder.build());
+
+    // Create an internal adapter so that execute does not re-wrap the 
ScheduledFutureTask again
+    this.invokeMethodsAdapter =
+        new AbstractExecutorService() {
+
+          @Override
+          protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Runnable 
runnable, T value) {
+            return new ScheduledFutureTask<>(runnable, value, 0);
+          }
+
+          @Override
+          protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Callable<T> 
callable) {
+            return new ScheduledFutureTask<>(callable, 0);
+          }
+
+          @Override
+          public void shutdown() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public List<Runnable> shutdownNow() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isShutdown() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isTerminated() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          /* UnboundedScheduledExecutorService is the only caller after it has 
been initialized.*/
+          @SuppressWarnings("method.invocation.invalid")
+          public void execute(Runnable command) {
+            // These are already guaranteed to be a ScheduledFutureTask so 
there is no need to wrap
+            // it in another ScheduledFutureTask.
+            threadPoolExecutor.execute(command);
+          }
+        };
+    this.tasks = new PriorityQueue<>();
+    this.launchTasks =
+        threadPoolExecutor.submit(new TaskLauncher(tasks, threadPoolExecutor, 
clock));
+  }
+
+  private static class TaskLauncher implements Callable<Void> {
+    private final PriorityQueue<ScheduledFutureTask<?>> tasks;
+    private final ThreadPoolExecutor threadPoolExecutor;
+    private final NanoClock clock;
+
+    private TaskLauncher(
+        PriorityQueue<ScheduledFutureTask<?>> tasks,
+        ThreadPoolExecutor threadPoolExecutor,
+        NanoClock clock) {
+      this.tasks = tasks;
+      this.threadPoolExecutor = threadPoolExecutor;
+      this.clock = clock;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      while (true) {
+        synchronized (tasks) {
+          if (threadPoolExecutor.isShutdown()) {
+            return null;
+          }
+          ScheduledFutureTask<?> task = tasks.peek();
+          if (task == null) {
+            tasks.wait();
+            continue;
+          }
+          long nanosToWait = LongMath.saturatedSubtract(task.time, 
clock.nanoTime());
+          if (nanosToWait > 0) {
+            long millisToWait = nanosToWait / 1_000_000;
+            int nanosRemainder = (int) (nanosToWait % 1_000_000);
+            tasks.wait(millisToWait, nanosRemainder);
+            continue;
+          }
+          // Remove the task from the queue since it is ready to be scheduled 
now
+          task = tasks.remove();
+          threadPoolExecutor.execute(task);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    threadPoolExecutor.shutdown();
+    synchronized (tasks) {
+      // Notify tasks which checks to see if the ThreadPoolExecutor is 
shutdown and exits cleanly.
+      tasks.notify();
+    }
+
+    // Re-throw any errors during shutdown of the launchTasks thread.
+    try {
+      launchTasks.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    shutdown();
+    synchronized (tasks) {
+      List<Runnable> rval = new ArrayList<>(tasks);
+      tasks.clear();
+      rval.addAll(threadPoolExecutor.shutdownNow());
+      return rval;
+    }
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return threadPoolExecutor.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return threadPoolExecutor.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return threadPoolExecutor.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command) {

Review Comment:
   I assume we can't use a `@NonNull` annotation on the parameter here because
we are overriding an interface method?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.KeyForBottom;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded {@link ScheduledExecutorService} based upon the {@link 
ScheduledThreadPoolExecutor}
+ * API contract.
+ *
+ * <p>Note that this implementation differs from a {@link 
ScheduledThreadPoolExecutor} in the
+ * following ways:
+ *
+ * <ul>
+ *   <li>The core pool size is always 0.
+ *   <li>Any work that is immediately executable is given to a thread before 
returning from the
+ *       corresponding {@code execute}, {@code submit}, {@code schedule*} 
methods.
+ *   <li>An unbounded number of threads can be started.
+ * </ul>
+ */
+public final class UnboundedScheduledExecutorService implements 
ScheduledExecutorService {
+
+  /**
+   * A {@link FutureTask} that handles periodically rescheduling tasks.
+   *
+   * <p>Note that it is important that this class extends {@link FutureTask} 
and {@link
+   * RunnableScheduledFuture} to be compatible with the types of objects 
returned by a {@link
+   * ScheduledThreadPoolExecutor}.
+   */
+  @VisibleForTesting
+  @SuppressFBWarnings(
+      value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification =
+          "Default equals/hashCode is what we want since two scheduled tasks 
are only equivalent if they point to the same instance.")
+  final class ScheduledFutureTask<@Nullable @KeyForBottom V> extends 
FutureTask<V>
+      implements RunnableScheduledFuture<V> {
+
+    /** Sequence number to break ties FIFO. */
+    private final long sequenceNumber;
+
+    /** The time the task is enabled to execute in nanoTime units. */
+    private long time;
+
+    /**
+     * Period in nanoseconds for repeating tasks. A positive value indicates 
fixed-rate execution. A
+     * negative value indicates fixed-delay execution. A value of 0 indicates 
a non-repeating
+     * (one-shot) task.
+     */
+    private final long period;
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime) {
+      this(r, result, triggerTime, 0);
+    }
+
+    /** Creates a periodic action with given nanoTime-based initial trigger 
time and period. */
+    @SuppressWarnings("argument.type.incompatible")
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime, long 
period) {
+      super(r, result);
+      this.time = triggerTime;
+      this.period = period;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Callable<V> callable, long triggerTime) {
+      super(callable);
+      this.time = triggerTime;
+      this.period = 0;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(LongMath.saturatedSubtract(time, clock.nanoTime()), 
NANOSECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      if (other == this) // compare zero if same object
+      {
+        return 0;
+      }
+      if (other instanceof ScheduledFutureTask) {
+        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
+        int diff = Longs.compare(time, x.time);
+        if (diff != 0) {
+          return diff;
+        }
+        if (sequenceNumber < x.sequenceNumber) {
+          return -1;
+        }
+        return 1;
+      }
+      long diff = LongMath.saturatedSubtract(getDelay(NANOSECONDS), 
other.getDelay(NANOSECONDS));
+      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+    }
+
+    @Override
+    public boolean isPeriodic() {
+      return period != 0;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      boolean cancelled = super.cancel(mayInterruptIfRunning);
+      synchronized (tasks) {
+        tasks.remove(this);
+      }
+      return cancelled;
+    }
+
+    /** Overrides {@link FutureTask} so as to reset/requeue if periodic. */
+    @Override
+    public void run() {
+      boolean periodic = isPeriodic();
+      if (!periodic) {
+        super.run();
+      } else if (super.runAndReset()) {
+        // Set the next runtime
+        if (period > 0) {
+          time = LongMath.saturatedAdd(time, period);
+        } else {
+          time = triggerTime(-period);
+        }
+        synchronized (tasks) {
+          tasks.add(this);
+          tasks.notify();
+        }
+      }
+    }
+  }
+
+  // Used to break ties in ordering of future tasks that are scheduled for the 
same time
+  // so that they have a consistent ordering based upon their insertion order 
into
+  // this ScheduledExecutorService.
+  private final AtomicLong sequencer = new AtomicLong();
+
+  private final NanoClock clock;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  @VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks;
+  private final AbstractExecutorService invokeMethodsAdapter;
+  private final Future<?> launchTasks;
+
+  public UnboundedScheduledExecutorService() {
+    this(NanoClock.SYSTEM);
+  }
+
+  @VisibleForTesting
+  UnboundedScheduledExecutorService(NanoClock clock) {
+    this.clock = clock;
+    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+    
threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+    threadFactoryBuilder.setDaemon(true);
+
+    this.threadPoolExecutor =

Review Comment:
   Given that we are basically wrapping a `ThreadPoolExecutor` here, does it
make sense to allow users to supply a differently configured
`ThreadPoolExecutor`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to