This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0b2453e49401e49e386f18be67aa9b7e60c3e8d Author: huangxingbo <[email protected]> AuthorDate: Wed Nov 25 19:38:56 2020 +0800 [FLINK-20284][python] Port Grpc SharedResourceHolder class to flink-python module This closes #14217. --- .../io/grpc/internal/SharedResourceHolder.java | 184 +++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p26p0/io/grpc/internal/SharedResourceHolder.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p26p0/io/grpc/internal/SharedResourceHolder.java new file mode 100644 index 0000000..4f85220 --- /dev/null +++ b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p26p0/io/grpc/internal/SharedResourceHolder.java @@ -0,0 +1,184 @@ +/* + * Copyright 2014 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal; + +import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions; +import java.util.IdentityHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A holder for shared resource singletons. + * + * <p>Components like client channels and servers need certain resources, e.g. a thread pool, to + * run. If the user has not provided such resources, these components will use a default one, which + * is shared as a static resource. This class holds these default resources and manages their + * life-cycles. + * + * <p>A resource is identified by the reference of a {@link Resource} object, which is typically a + * singleton, provided to the get() and release() methods. Each Resource object (not its class) maps + * to an object cached in the holder. + * + * <p>Resources are ref-counted and shut down after a delay when the ref-count reaches zero. + */ +@ThreadSafe +public final class SharedResourceHolder { + static final long DESTROY_DELAY_SECONDS = 1; + + // The sole holder instance. + private static final SharedResourceHolder holder = new SharedResourceHolder( + new ScheduledExecutorFactory() { + @Override + public ScheduledExecutorService createScheduledExecutor() { + return Executors.newSingleThreadScheduledExecutor( + GrpcUtil.getThreadFactory("grpc-shared-destroyer-%d", true)); + } + }); + + private final IdentityHashMap<Resource<?>, Instance> instances = + new IdentityHashMap<>(); + + private final ScheduledExecutorFactory destroyerFactory; + + private ScheduledExecutorService destroyer; + + // Visible to tests that would need to create instances of the holder. + SharedResourceHolder(ScheduledExecutorFactory destroyerFactory) { + this.destroyerFactory = destroyerFactory; + } + + /** + * Try to get an existing instance of the given resource. If an instance does not exist, create a + * new one with the given factory. + * + * @param resource the singleton object that identifies the requested static resource + */ + public static <T> T get(Resource<T> resource) { + return holder.getInternal(resource); + } + + /** + * Releases an instance of the given resource. + * + * <p>The instance must have been obtained from {@link #get(Resource)}. Otherwise will throw + * IllegalArgumentException. + * + * <p>Caller must not release a reference more than once. It's advisory that you clear the + * reference to the instance with the null returned by this method. + * + * @param resource the singleton Resource object that identifies the released static resource + * @param instance the released static resource + * + * @return a null which the caller can use to clear the reference to that instance. + */ + public static <T> T release(final Resource<T> resource, final T instance) { + return holder.releaseInternal(resource, instance); + } + + /** + * Visible to unit tests. + * + * @see #get(Resource) + */ + @SuppressWarnings("unchecked") + synchronized <T> T getInternal(Resource<T> resource) { + Instance instance = instances.get(resource); + if (instance == null) { + instance = new Instance(resource.create()); + instances.put(resource, instance); + } + if (instance.destroyTask != null) { + instance.destroyTask.cancel(false); + instance.destroyTask = null; + } + instance.refcount++; + return (T) instance.payload; + } + + /** + * Visible to unit tests. + */ + synchronized <T> T releaseInternal(final Resource<T> resource, final T instance) { + final Instance cached = instances.get(resource); + if (cached == null) { + throw new IllegalArgumentException("No cached instance found for " + resource); + } + Preconditions.checkArgument(instance == cached.payload, "Releasing the wrong instance"); + Preconditions.checkState(cached.refcount > 0, "Refcount has already reached zero"); + cached.refcount--; + if (cached.refcount == 0) { + Preconditions.checkState(cached.destroyTask == null, "Destroy task already scheduled"); + // Schedule a delayed task to destroy the resource. + if (destroyer == null) { + destroyer = destroyerFactory.createScheduledExecutor(); + } + cached.destroyTask = destroyer.schedule(new LogExceptionRunnable(new Runnable() { + @Override + public void run() { + synchronized (SharedResourceHolder.this) { + // Refcount may have gone up since the task was scheduled. Re-check it. + if (cached.refcount == 0) { + try { + resource.close(instance); + } finally { + instances.remove(resource); + if (instances.isEmpty()) { + destroyer.shutdown(); + destroyer = null; + } + } + } + } + } + }), DESTROY_DELAY_SECONDS, TimeUnit.SECONDS); + } + // Always returning null + return null; + } + + /** + * Defines a resource, and the way to create and destroy instances of it. + */ + public interface Resource<T> { + /** + * Create a new instance of the resource. + */ + T create(); + + /** + * Destroy the given instance. + */ + void close(T instance); + } + + interface ScheduledExecutorFactory { + ScheduledExecutorService createScheduledExecutor(); + } + + private static class Instance { + final Object payload; + int refcount; + ScheduledFuture<?> destroyTask; + + Instance(Object payload) { + this.payload = payload; + } + } +}
