[ 
https://issues.apache.org/jira/browse/FLINK-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16190167#comment-16190167
 ] 

ASF GitHub Bot commented on FLINK-7757:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4764#discussion_r142493960
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java 
---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * This class is a guard for shared resources with the following 
invariants. The resource can be acquired by multiple
    + * clients in parallel through the {@link #acquireResource()} call. 
Clients must release the resource after usage
    + * with a matching call to {@link #releaseResource()}. The resource can 
only be disposed once the guard is closed, but
    + * the guard can only be closed once all clients that acquired the 
resource have released it. Before this is happened,
    + * the call to {@link #close()} will block until the condition is 
triggered. After the guard is closed, calls to
    + * {@link #acquireResource()} will fail with exception. Notice that, 
obviously clients are responsible to release the
    + * resource after usage. All clients are considered equal, i.e. there is 
only a global count maintained how many
    + * times the resource was acquired but not by whom.
    + */
    +public class ResourceGuard implements AutoCloseable, Serializable {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   /**
    +    * Introduced additional checks of invariants.
    +    */
    +   private static final boolean STRICT_CHECKS = true;
    +
    +   /**
    +    * The object that serves as lock for count and the closed-flag.
    +    */
    +   private final SerializableObject lock;
    +
    +   /**
    +    * Number of clients that have ongoing access to the resource.
    +    */
    +   private int clientCount;
    +
    +   /**
    +    * This flag indicated if it is still possible to acquire access to the 
resource.
    +    */
    +   private volatile boolean closed;
    +
    +   public ResourceGuard() {
    +           this.lock = new SerializableObject();
    +           this.clientCount = 0;
    +           this.closed = false;
    +   }
    +
    +   /**
    +    * Acquired access from one new client for the guarded resource.
    +    *
    +    * @throws IOException when the resource guard is already closed.
    +    */
    +   public void acquireResource() throws IOException {
    +
    +           synchronized (lock) {
    +
    +                   if (closed) {
    +                           throw new IOException("Resource guard was 
already closed.");
    +                   }
    +
    +                   ++clientCount;
    +           }
    +   }
    +
    +   /**
    +    * Releases access for one client of the guarded resource. This method 
must only be called after a matching call to
    +    * {@link #acquireResource()}.
    +    */
    +   public void releaseResource() {
    +
    +           synchronized (lock) {
    +
    +                   if (STRICT_CHECKS && clientCount <= 0) {
    +                           throw new IllegalStateException("Resource guard 
was released more times than it has been acquired!");
    +                   }
    +
    +                   --clientCount;
    +
    +                   if (closed && clientCount == 0) {
    +                           lock.notifyAll();
    +                   }
    +
    +           }
    +   }
    +
    +   /**
    +    * Closed the resource guard. This method will block until all calls to 
{@link #acquireResource()} have seen their
    +    * matching call to {@link #releaseResource()}.
    +    */
    +   @Override
    +   public void close() {
    +
    +           synchronized (lock) {
    +
    +                   closed = true;
    +
    +                   while (clientCount > 0) {
    +
    +                           try {
    +                                   lock.wait();
    +                           } catch (InterruptedException e) {
    +                                   Thread.interrupted();
    --- End diff --
    
    Just double checked, seems like I remembered incorrectly and this call is 
not needed here because the flag is reset already by the triggered exception.


> RocksDB lock is too strict and can block snapshots in synchronous phase
> -----------------------------------------------------------------------
>
>                 Key: FLINK-7757
>                 URL: https://issues.apache.org/jira/browse/FLINK-7757
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.2, 1.3.2
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> {{RocksDBKeyedStateBackend}} uses a lock to guard the db instance against 
> disposal of the native resources while some parallel threads might still 
> access db, which might otherwise lead to segfaults.
> Unfortunately, this locking is a bit to strict and can lead to situations 
> where snapshots block the pipeline. This can happen when a snapshot s1 is 
> running and somewhere blocking in IO while holding the guarding lock. A 
> second snapshot s2 can be triggered in parallel and requires to hold the lock 
> in the synchronous part to get a snapshot from db. As s1 is still holding on 
> to the lock, s2 can block here and stop the operator from processing further 
> elements.
> A simple solution could remove lock acquisition from the synchronous phase, 
> because both, synchronous phase and disposing the backend are only allowed to 
> be triggered from the thread that also drives element processing.
> A better solution would be to remove long sections under the lock all 
> together, because as of now they will always prevent the possibility of 
> parallel checkpointing. I think a guard for the rocksdb instance would be 
> sufficient that blocks disposal for as long as there are still clients 
> potentially accessing the instance in parallel. This could be realized by 
> keeping a synchronized counter for active clients and block disposal until 
> the client count drops to zero.
> This approach could also be integrated with triggering timers, which have 
> always been problematic in the disposal phase are currently unregulated. In 
> the new model, they could register as yet another client.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to