[ 
https://issues.apache.org/jira/browse/HADOOP-16948?focusedWorklogId=567439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-567439
 ]

ASF GitHub Bot logged work on HADOOP-16948:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Mar/21 06:04
            Start Date: 17/Mar/21 06:04
    Worklog Time Spent: 10m 
      Work Description: snehavarma commented on a change in pull request #1925:
URL: https://github.com/apache/hadoop/pull/1925#discussion_r595728811



##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SelfRenewingLease.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+
+/**
+ * An Azure blob lease that automatically renews itself indefinitely
+ * using a background thread. Use it to synchronize distributed processes,
+ * or to prevent writes to the blob by other processes that don't
+ * have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is
+ * acquired.
+ *
+ * Call free() to release the Lease.
+ *
+ * You can use this Lease like a distributed lock. If the holder process
+ * dies, the lease will time out since it won't be renewed.
+ *
+ * See also {@link org.apache.hadoop.fs.azure.SelfRenewingLease}.
+ */
+public class SelfRenewingLease {
+
+  private final AbfsClient client;
+  private final Path path;
+  private Thread renewer;
+  private volatile boolean leaseFreed;
+  private String leaseID = null;
+  private static final int LEASE_TIMEOUT = 60;  // Lease timeout in seconds
+
+  // Time to wait to renew lease in milliseconds
+  public static final int LEASE_RENEWAL_PERIOD = 40000;
+  public static final Logger LOG = 
LoggerFactory.getLogger(SelfRenewingLease.class);
+
+  // Used to allocate thread serial numbers in thread name
+  private static AtomicInteger threadNumber = new AtomicInteger(0);
+
+
+  // Time to wait to retry getting the lease in milliseconds
+  static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+  static final int LEASE_MAX_RETRIES = 5;
+
+  public static class LeaseException extends AzureBlobFileSystemException {
+    public LeaseException(Exception innerException) {
+      super(ERR_ACQUIRING_LEASE, innerException);
+    }
+  }
+
+  public SelfRenewingLease(AbfsClient client, Path path) throws 
AzureBlobFileSystemException {
+
+    this.leaseFreed = false;
+    this.client = client;
+    this.path = path;
+
+    // Try to get the lease a specified number of times, else throw an error
+    int numRetries = 0;
+    while (leaseID == null && numRetries < LEASE_MAX_RETRIES) {
+      numRetries++;
+      try {
+        LOG.debug("lease path: {}", path);
+        final AbfsRestOperation op =
+            client.acquireLease(getRelativePath(path),

Review comment:
       this is still 60 seconds

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SelfRenewingLease.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+
+/**
+ * An Azure blob lease that automatically renews itself indefinitely
+ * using a background thread. Use it to synchronize distributed processes,
+ * or to prevent writes to the blob by other processes that don't
+ * have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is
+ * acquired.
+ *
+ * Call free() to release the Lease.
+ *
+ * You can use this Lease like a distributed lock. If the holder process
+ * dies, the lease will time out since it won't be renewed.
+ *
+ * See also {@link org.apache.hadoop.fs.azure.SelfRenewingLease}.
+ */
+public class SelfRenewingLease {
+
+  private final AbfsClient client;
+  private final Path path;
+  private Thread renewer;
+  private volatile boolean leaseFreed;
+  private String leaseID = null;
+  private static final int LEASE_TIMEOUT = 60;  // Lease timeout in seconds
+
+  // Time to wait to renew lease in milliseconds
+  public static final int LEASE_RENEWAL_PERIOD = 40000;
+  public static final Logger LOG = 
LoggerFactory.getLogger(SelfRenewingLease.class);
+
+  // Used to allocate thread serial numbers in thread name
+  private static AtomicInteger threadNumber = new AtomicInteger(0);
+
+
+  // Time to wait to retry getting the lease in milliseconds
+  static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+  static final int LEASE_MAX_RETRIES = 5;
+
+  public static class LeaseException extends AzureBlobFileSystemException {
+    public LeaseException(Exception innerException) {
+      super(ERR_ACQUIRING_LEASE, innerException);
+    }
+  }
+
+  public SelfRenewingLease(AbfsClient client, Path path) throws 
AzureBlobFileSystemException {
+
+    this.leaseFreed = false;
+    this.client = client;
+    this.path = path;
+
+    // Try to get the lease a specified number of times, else throw an error
+    int numRetries = 0;
+    while (leaseID == null && numRetries < LEASE_MAX_RETRIES) {
+      numRetries++;
+      try {
+        LOG.debug("lease path: {}", path);
+        final AbfsRestOperation op =
+            client.acquireLease(getRelativePath(path),
+                LEASE_TIMEOUT);
+
+        leaseID = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
+      } catch (IOException e) {
+        if (numRetries < LEASE_MAX_RETRIES) {
+          LOG.info("Caught exception when trying to acquire lease on blob {}, 
retrying: {}", path,
+              e.getMessage());
+          LOG.debug("Exception acquiring lease", e);
+        } else {
+          throw new LeaseException(e);
+        }
+      }
+      if (leaseID == null) {
+        try {
+          Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL);
+        } catch (InterruptedException e) {
+
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    renewer = new Thread(new Renewer());
+
+    // A Renewer running should not keep JVM from exiting, so make it a daemon.
+    renewer.setDaemon(true);
+    renewer.setName("AzureBFSLeaseRenewer-" + threadNumber.getAndIncrement());
+    renewer.start();
+    LOG.debug("Acquired lease {} on {} managed by thread {}", leaseID, path, 
renewer.getName());
+  }
+
+  /**
+   * Free the lease and stop the keep-alive thread.
+   */
+  public void free() {
+    try {
+      LOG.debug("lease path: {}, release lease id: {}", path, leaseID);
+      client.releaseLease(getRelativePath(path), leaseID);
+    } catch (IOException e) {
+      LOG.info("Exception when trying to release lease {} on {}. Lease will be 
left to expire: {}",
+          leaseID, path, e.getMessage());
+      LOG.debug("Exception releasing lease", e);
+    } finally {
+
+      // Even if releasing the lease fails (e.g. because the file was deleted),
+      // make sure to record that we freed the lease, to terminate the
+      // keep-alive thread.
+      leaseFreed = true;
+      LOG.debug("Freed lease {} on {} managed by thread {}", leaseID, path, 
renewer.getName());
+    }
+  }
+
+  public boolean isFreed() {
+    return leaseFreed;
+  }
+
+  public String getLeaseID() {
+    return leaseID;
+  }
+
+  private class Renewer implements Runnable {

Review comment:
       Do we need renewer now?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 567439)
    Time Spent: 8h 20m  (was: 8h 10m)

> ABFS: Support single writer dirs
> --------------------------------
>
>                 Key: HADOOP-16948
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16948
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Billie Rinaldi
>            Assignee: Billie Rinaldi
>            Priority: Minor
>              Labels: abfsactive, pull-request-available
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> This would allow some directories to be configured as single writer 
> directories. The ABFS driver would obtain a lease when creating or opening a 
> file for writing and would automatically renew the lease and release the 
> lease when closing the file.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to