Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/4764#discussion_r142493960
--- Diff: flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * This class is a guard for shared resources with the following invariants. The resource can be acquired by multiple
+ * clients in parallel through the {@link #acquireResource()} call. Clients must release the resource after usage
+ * with a matching call to {@link #releaseResource()}. The resource can only be disposed once the guard is closed, but
+ * the guard can only be closed once all clients that acquired the resource have released it. Before this is happened,
+ * the call to {@link #close()} will block until the condition is triggered. After the guard is closed, calls to
+ * {@link #acquireResource()} will fail with exception. Notice that, obviously clients are responsible to release the
+ * resource after usage. All clients are considered equal, i.e. there is only a global count maintained how many
+ * times the resource was acquired but not by whom.
+ */
+public class ResourceGuard implements AutoCloseable, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Introduced additional checks of invariants.
+ */
+ private static final boolean STRICT_CHECKS = true;
+
+ /**
+ * The object that serves as lock for count and the closed-flag.
+ */
+ private final SerializableObject lock;
+
+ /**
+ * Number of clients that have ongoing access to the resource.
+ */
+ private int clientCount;
+
+ /**
+ * This flag indicated if it is still possible to acquire access to the resource.
+ */
+ private volatile boolean closed;
+
+ public ResourceGuard() {
+ this.lock = new SerializableObject();
+ this.clientCount = 0;
+ this.closed = false;
+ }
+
+ /**
+ * Acquired access from one new client for the guarded resource.
+ *
+ * @throws IOException when the resource guard is already closed.
+ */
+ public void acquireResource() throws IOException {
+
+ synchronized (lock) {
+
+ if (closed) {
+ throw new IOException("Resource guard was already closed.");
+ }
+
+ ++clientCount;
+ }
+ }
+
+ /**
+ * Releases access for one client of the guarded resource. This method must only be called after a matching call to
+ * {@link #acquireResource()}.
+ */
+ public void releaseResource() {
+
+ synchronized (lock) {
+
+ if (STRICT_CHECKS && clientCount <= 0) {
+ throw new IllegalStateException("Resource guard was released more times than it has been acquired!");
+ }
+
+ --clientCount;
+
+ if (closed && clientCount == 0) {
+ lock.notifyAll();
+ }
+
+ }
+ }
+
+ /**
+ * Closed the resource guard. This method will block until all calls to {@link #acquireResource()} have seen their
+ * matching call to {@link #releaseResource()}.
+ */
+ @Override
+ public void close() {
+
+ synchronized (lock) {
+
+ closed = true;
+
+ while (clientCount > 0) {
+
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ Thread.interrupted();
--- End diff --
Just double-checked: it seems I remembered incorrectly. This call to
`Thread.interrupted()` is not needed here, because the thread's interrupt
flag is already cleared when the `InterruptedException` is thrown.
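
For reference, a minimal standalone sketch of that behavior. This is not code from the PR; the class name `InterruptFlagDemo` and the method `flagSeenInCatch()` are made up for illustration. It interrupts the current thread, calls `wait()` on a lock, and reads the interrupt flag inside the `catch` block:

```java
// Hypothetical demo class, not part of Flink or of this PR.
final class InterruptFlagDemo {

    /**
     * Returns the interrupt flag as observed inside the catch block,
     * or null if wait() returned without throwing.
     */
    static Boolean flagSeenInCatch() {
        final Object lock = new Object();

        // Set our own interrupt flag so that wait() throws right away.
        Thread.currentThread().interrupt();

        synchronized (lock) {
            try {
                // The timeout only bounds the wait in case no exception is thrown.
                lock.wait(1000);
                return null;
            } catch (InterruptedException e) {
                // Object.wait() clears the interrupt status when it throws,
                // so no extra Thread.interrupted() call is needed to reset it.
                return Thread.currentThread().isInterrupted();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag inside catch: " + flagSeenInCatch());
    }
}
```

The flag read inside the `catch` block should be `false`, which is why the extra `Thread.interrupted()` call in `close()` looks redundant. Whether `close()` should instead re-assert the flag after the loop is a separate design question that this sketch does not address.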
---