Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4764#discussion_r142490404
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * This class is a guard for shared resources with the following invariants. The resource can be acquired by multiple
    + * clients in parallel through the {@link #acquireResource()} call. Clients must release the resource after usage
    + * with a matching call to {@link #releaseResource()}. The resource can only be disposed once the guard is closed, but
    + * the guard can only be closed once all clients that acquired the resource have released it. Before this is happened,
    + * the call to {@link #close()} will block until the condition is triggered. After the guard is closed, calls to
    + * {@link #acquireResource()} will fail with exception. Notice that, obviously clients are responsible to release the
    + * resource after usage. All clients are considered equal, i.e. there is only a global count maintained how many
    + * times the resource was acquired but not by whom.
    + */
    +public class ResourceGuard implements AutoCloseable, Serializable {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   /**
    +    * Introduced additional checks of invariants.
    +    */
    +   private static final boolean STRICT_CHECKS = true;
    +
    +   /**
    +    * The object that serves as lock for count and the closed-flag.
    +    */
    +   private final SerializableObject lock;
    +
    +   /**
    +    * Number of clients that have ongoing access to the resource.
    +    */
    +   private int clientCount;
    +
    +   /**
    +    * This flag indicated if it is still possible to acquire access to the resource.
    +    */
    +   private volatile boolean closed;
    +
    +   public ResourceGuard() {
    +           this.lock = new SerializableObject();
    +           this.clientCount = 0;
    +           this.closed = false;
    +   }
    +
    +   /**
    +    * Acquired access from one new client for the guarded resource.
    +    *
    +    * @throws IOException when the resource guard is already closed.
    +    */
    +   public void acquireResource() throws IOException {
    +
    +           synchronized (lock) {
    +
    +                   if (closed) {
    +                           throw new IOException("Resource guard was already closed.");
    +                   }
    +
    +                   ++clientCount;
    +           }
    +   }
    +
    +   /**
    +    * Releases access for one client of the guarded resource. This method must only be called after a matching call to
    +    * {@link #acquireResource()}.
    +    */
    +   public void releaseResource() {
    +
    +           synchronized (lock) {
    +
    +                   if (STRICT_CHECKS && clientCount <= 0) {
    +                           throw new IllegalStateException("Resource guard was released more times than it has been acquired!");
    +                   }
    +
    +                   --clientCount;
    +
    +                   if (closed && clientCount == 0) {
    +                           lock.notifyAll();
    +                   }
    +
    +           }
    +   }
    +
    +   /**
    +    * Closed the resource guard. This method will block until all calls to {@link #acquireResource()} have seen their
    +    * matching call to {@link #releaseResource()}.
    +    */
    +   @Override
    +   public void close() {
    +
    +           synchronized (lock) {
    +
    +                   closed = true;
    +
    +                   while (clientCount > 0) {
    +
    +                           try {
    +                                   lock.wait();
    +                           } catch (InterruptedException e) {
    +                                   Thread.interrupted();
    --- End diff --
    
    This line caught my eye.
    ```
    public static boolean interrupted()
    ```
    > Tests whether the current thread has been interrupted. [...]
    
    The boolean result is not used, and by the time `InterruptedException` is thrown the interrupt flag has already been cleared, so `Thread.interrupted()` should return `false` here and the call has no effect.
    
    Did you intend to call `Thread.currentThread().interrupt()` to restore the flag and exit the `while` loop prematurely?
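
To illustrate the point about the interrupt flag, here is a minimal standalone sketch. It is not code from the pull request: the class `InterruptFlagDemo` and the method `observeInterruptFlag` are hypothetical names made up for this example. It records what `Thread.interrupted()` returns inside the `catch` block, and what `isInterrupted()` returns after `Thread.currentThread().interrupt()` has been called.

```java
public class InterruptFlagDemo {

    /**
     * Hypothetical helper for illustration only.
     * Returns {flag seen right after the catch, flag seen after restoring it}.
     */
    static boolean[] observeInterruptFlag() {
        final boolean[] observed = new boolean[2];
        final Object lock = new Object();

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    // Loop so that a spurious wakeup cannot end the wait early.
                    while (true) {
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    // wait() cleared the flag before throwing the exception.
                    observed[0] = Thread.interrupted();
                    // Restore the flag so that callers can still see the interrupt.
                    Thread.currentThread().interrupt();
                    observed[1] = Thread.currentThread().isInterrupted();
                }
            }
        });

        waiter.start();
        waiter.interrupt();
        try {
            // join() also makes the waiter's writes to 'observed' visible here.
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the demo thread", e);
        }
        return observed;
    }

    public static void main(String[] args) {
        boolean[] flags = observeInterruptFlag();
        System.out.println("Thread.interrupted() inside catch: " + flags[0]);
        System.out.println("isInterrupted() after interrupt(): " + flags[1]);
    }
}
```

If `close()` is meant to stay interruptible, restoring the flag and leaving the loop would let the caller observe the interrupt. Whether that is acceptable for the guard's invariants is a separate question.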

