steveloughran commented on a change in pull request #1925: HADOOP-16948. 
Support single writer dirs.
URL: https://github.com/apache/hadoop/pull/1925#discussion_r401585150
 
 

 ##########
 File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SelfRenewingLease.java
 ##########
 @@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An Azure blob lease that automatically renews itself indefinitely
+ * using a background thread. Use it to synchronize distributed processes,
+ * or to prevent writes to the blob by other processes that don't
+ * have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is
+ * acquired.
+ *
+ * Call free() to release the Lease.
+ *
+ * You can use this Lease like a distributed lock. If the holder process
+ * dies, the lease will time out since it won't be renewed.
+ *
+ * See also {@link org.apache.hadoop.fs.azure.SelfRenewingLease}.
+ */
+public class SelfRenewingLease {
+
+  // Store through which the lease is acquired and released.
+  private final AzureBlobFileSystemStore store;
+  // Path of the blob the lease is held on.
+  private final Path path;
+  // Background keep-alive thread; created once the lease has been acquired.
+  private Thread renewer;
+  // Set once free() has run; volatile because it is read across threads.
+  private volatile boolean leaseFreed;
+  // ID returned by the store when the lease was acquired; null until then.
+  private String leaseID = null;
+  private static final int LEASE_TIMEOUT = 60;  // Lease timeout in seconds
+
+  // Time to wait to renew lease in milliseconds
+  public static final int LEASE_RENEWAL_PERIOD = 40000;
+  private static final Log LOG = LogFactory.getLog(SelfRenewingLease.class);
+
+  // Used to allocate thread serial numbers in thread name. The reference is
+  // final: only the counter value changes, never the counter object itself.
+  private static final AtomicInteger threadNumber = new AtomicInteger(0);
+
+
+  // Time to wait to retry getting the lease in milliseconds
+  @VisibleForTesting
+  static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+
+  /**
+   * Acquire a lease on the given path, blocking until it is obtained, and
+   * then start a daemon thread to manage it.
+   *
+   * Acquisition is retried every {@link #LEASE_ACQUIRE_RETRY_INTERVAL}
+   * milliseconds for as long as it fails. If the calling thread is
+   * interrupted while waiting between attempts, retrying continues and the
+   * interrupt status is restored just before the retry loop is left.
+   *
+   * @param store store used to acquire the lease
+   * @param path path of the blob to lease
+   */
+  public SelfRenewingLease(AzureBlobFileSystemStore store, Path path) {
+
+    this.leaseFreed = false;
+    this.store = store;
+    this.path = path;
+
+    // Thread.sleep() throws immediately while the interrupt flag is set, so
+    // re-asserting the flag inside the loop would turn the retry delay into
+    // a busy spin. Remember the interrupt and restore it after the loop.
+    boolean interrupted = false;
+    try {
+
+      // Keep trying to get the lease until you get it.
+      while (leaseID == null) {
+        try {
+          leaseID = store.acquireLease(this.path, LEASE_TIMEOUT);
+        } catch (IOException e) {
+          LOG.info("Caught exception when trying to get lease on blob " + path
+              + ". " + e.getMessage());
+        }
+        if (leaseID == null) {
+          try {
+            Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL);
+          } catch (InterruptedException e) {
+            interrupted = true;
+          }
+        }
+      }
+    } finally {
+      if (interrupted) {
+
+        // Restore the interrupted status for the caller
+        Thread.currentThread().interrupt();
+      }
+    }
+    renewer = new Thread(new Renewer());
+
+    // A Renewer running should not keep JVM from exiting, so make it a daemon.
+    renewer.setDaemon(true);
+    renewer.setName("AzureLeaseRenewer-" + threadNumber.getAndIncrement());
+    renewer.start();
+    LOG.debug("Acquired lease " + leaseID + " on " + path
+        + " managed by thread " + renewer.getName());
+  }
+
+  /**
+   * Release the lease and signal the keep-alive thread to stop.
+   *
+   * An IOException raised while releasing the lease is logged rather than
+   * rethrown; the lease is marked as freed either way.
+   */
+  public void free() {
+    try {
+      store.releaseLease(path, leaseID);
+    } catch (IOException ioe) {
+      final String failure = "Exception when trying to free lease " + leaseID
+          + " on " + path + ". " + ioe.getMessage();
+      LOG.info(failure);
+    } finally {
+
+      // Flag the lease as freed whether or not the release succeeded (the
+      // file may have been deleted, for instance), so that the keep-alive
+      // thread terminates.
+      leaseFreed = true;
+      LOG.debug("Freed lease " + leaseID + " on " + path
+          + " managed by thread " + renewer.getName());
+    }
+  }
+
+  /**
+   * Report whether this lease has been marked as freed.
+   *
+   * @return the current value of the freed flag
+   */
+  public boolean isFreed() {
+    return this.leaseFreed;
+  }
+
+  /**
+   * Get the ID of this lease.
+   *
+   * @return the lease ID returned by the store when the lease was acquired
+   */
+  public String getLeaseID() {
+    return this.leaseID;
+  }
+
+  private class Renewer implements Runnable {
+
+    /**
+     * Start a keep-alive thread that will continue to renew
+     * the lease until it is freed or the process dies.
 
 Review comment:
   This should be tied into the FileSystem instance lifecycle too: an FS 
instance should really hold a weak reference to every lease created under it, 
and fs.close() should stop them all.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to