johnjcasey commented on code in PR #23545: URL: https://github.com/apache/beam/pull/23545#discussion_r993679494
########## sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java: ########## @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.checkerframework.checker.nullness.qual.KeyForBottom; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * An unbounded {@link ScheduledExecutorService} based upon the {@link ScheduledThreadPoolExecutor} + * API contract. + * + * <p>Note that this implementation differs from a {@link ScheduledThreadPoolExecutor} in the + * following ways: + * + * <ul> + * <li>The core pool size is always 0. + * <li>Any work that is immediately executable is given to a thread before returning from the + * corresponding {@code execute}, {@code submit}, {@code schedule*} methods. + * <li>An unbounded number of threads can be started. + * </ul> + */ +public final class UnboundedScheduledExecutorService implements ScheduledExecutorService { + + /** + * A {@link FutureTask} that handles periodically rescheduling tasks. + * + * <p>Note that it is important that this class extends {@link FutureTask} and {@link + * RunnableScheduledFuture} to be compatible with the types of objects returned by a {@link + * ScheduledThreadPoolExecutor}. + */ + @VisibleForTesting + @SuppressFBWarnings( + value = "EQ_COMPARETO_USE_OBJECT_EQUALS", + justification = + "Default equals/hashCode is what we want since two scheduled tasks are only equivalent if they point to the same instance.") + final class ScheduledFutureTask<@Nullable @KeyForBottom V> extends FutureTask<V> + implements RunnableScheduledFuture<V> { + + /** Sequence number to break ties FIFO. */ + private final long sequenceNumber; + + /** The time the task is enabled to execute in nanoTime units. */ + private long time; + + /** + * Period in nanoseconds for repeating tasks. A positive value indicates fixed-rate execution. A + * negative value indicates fixed-delay execution. A value of 0 indicates a non-repeating + * (one-shot) task. + */ + private final long period; + + /** Creates a one-shot action with given nanoTime-based trigger time. */ + ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime) { + this(r, result, triggerTime, 0); + } + + /** Creates a periodic action with given nanoTime-based initial trigger time and period. */ + @SuppressWarnings("argument.type.incompatible") + ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime, long period) { + super(r, result); + this.time = triggerTime; + this.period = period; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + /** Creates a one-shot action with given nanoTime-based trigger time. */ + ScheduledFutureTask(Callable<V> callable, long triggerTime) { + super(callable); + this.time = triggerTime; + this.period = 0; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(LongMath.saturatedSubtract(time, clock.nanoTime()), NANOSECONDS); + } + + @Override + public int compareTo(Delayed other) { + if (other == this) // compare zero if same object + { + return 0; + } + if (other instanceof ScheduledFutureTask) { + ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other; + int diff = Longs.compare(time, x.time); + if (diff != 0) { + return diff; + } + if (sequenceNumber < x.sequenceNumber) { + return -1; + } + return 1; + } + long diff = LongMath.saturatedSubtract(getDelay(NANOSECONDS), other.getDelay(NANOSECONDS)); + return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; + } + + @Override + public boolean isPeriodic() { + return period != 0; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + synchronized (tasks) { + tasks.remove(this); + } + return cancelled; + } + + /** Overrides {@link FutureTask} so as to reset/requeue if periodic. */ + @Override + public void run() { + boolean periodic = isPeriodic(); + if (!periodic) { + super.run(); + } else if (super.runAndReset()) { + // Set the next runtime + if (period > 0) { + time = LongMath.saturatedAdd(time, period); + } else { + time = triggerTime(-period); + } + synchronized (tasks) { + tasks.add(this); + tasks.notify(); + } + } + } + } + + // Used to break ties in ordering of future tasks that are scheduled for the same time + // so that they have a consistent ordering based upon their insertion order into + // this ScheduledExecutorService. + private final AtomicLong sequencer = new AtomicLong(); + + private final NanoClock clock; + private final ThreadPoolExecutor threadPoolExecutor; + @VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks; + private final AbstractExecutorService invokeMethodsAdapter; + private final Future<?> launchTasks; + + public UnboundedScheduledExecutorService() { + this(NanoClock.SYSTEM); + } + + @VisibleForTesting + UnboundedScheduledExecutorService(NanoClock clock) { + this.clock = clock; + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); + threadFactoryBuilder.setDaemon(true); + + this.threadPoolExecutor = + new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + Long.MAX_VALUE, + TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + new SynchronousQueue<>(), + threadFactoryBuilder.build()); + + // Create an internal adapter so that execute does not re-wrap the ScheduledFutureTask again + this.invokeMethodsAdapter = + new AbstractExecutorService() { + + @Override + protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { + return new ScheduledFutureTask<>(runnable, value, 0); + } + + @Override + protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return new ScheduledFutureTask<>(callable, 0); + } + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public List<Runnable> shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + /* UnboundedScheduledExecutorService is the only caller after it has been initialized.*/ + @SuppressWarnings("method.invocation.invalid") + public void execute(Runnable command) { + // These are already guaranteed to be a ScheduledFutureTask so there is no need to wrap + // it in another ScheduledFutureTask. + threadPoolExecutor.execute(command); + } + }; + this.tasks = new PriorityQueue<>(); + this.launchTasks = + threadPoolExecutor.submit(new TaskLauncher(tasks, threadPoolExecutor, clock)); + } + + private static class TaskLauncher implements Callable<Void> { + private final PriorityQueue<ScheduledFutureTask<?>> tasks; + private final ThreadPoolExecutor threadPoolExecutor; + private final NanoClock clock; + + private TaskLauncher( + PriorityQueue<ScheduledFutureTask<?>> tasks, + ThreadPoolExecutor threadPoolExecutor, + NanoClock clock) { + this.tasks = tasks; + this.threadPoolExecutor = threadPoolExecutor; + this.clock = clock; + } + + @Override + public Void call() throws Exception { + while (true) { + synchronized (tasks) { + if (threadPoolExecutor.isShutdown()) { + return null; + } + ScheduledFutureTask<?> task = tasks.peek(); + if (task == null) { + tasks.wait(); + continue; + } + long nanosToWait = LongMath.saturatedSubtract(task.time, clock.nanoTime()); + if (nanosToWait > 0) { + long millisToWait = nanosToWait / 1_000_000; + int nanosRemainder = (int) (nanosToWait % 1_000_000); + tasks.wait(millisToWait, nanosRemainder); + continue; + } + // Remove the task from the queue since it is ready to be scheduled now + task = tasks.remove(); + threadPoolExecutor.execute(task); + } + } + } + } + + @Override + public void shutdown() { + threadPoolExecutor.shutdown(); + synchronized (tasks) { + // Notify tasks which checks to see if the ThreadPoolExecutor is shutdown and exits cleanly. + tasks.notify(); + } + + // Re-throw any errors during shutdown of the launchTasks thread. + try { + launchTasks.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + } + + @Override + public List<Runnable> shutdownNow() { + shutdown(); + synchronized (tasks) { + List<Runnable> rval = new ArrayList<>(tasks); + tasks.clear(); + rval.addAll(threadPoolExecutor.shutdownNow()); + return rval; + } + } + + @Override + public boolean isShutdown() { + return threadPoolExecutor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return threadPoolExecutor.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return threadPoolExecutor.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { Review Comment: makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
