Repository: samza
Updated Branches:
  refs/heads/master bf4c7619f -> 403042394


SAMZA-1374: Implement Leader Election using Lease Blob in Azure

PR 1: AzureClient + AzureConfig
PR 2: LeaseBlobManager
PR 3: BlobUtils + JobModelBundle
PR 4: TableUtils + ProcessorEntity
**PR 5: AzureLeaderElector** (current PR)

Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pawas2...@gmail.com>

Reviewers: Navina Ramesh <nav...@apache.org>, Shanthoosh Venkataraman 
<svenkatara...@linkedin.com>

Closes #259 from PawasChhokra/LeaderElection


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40304239
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40304239
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40304239

Branch: refs/heads/master
Commit: 403042394945deba1affa7269c7ead3129326cbd
Parents: bf4c761
Author: Pawas Chhokra <pawas2...@gmail.com>
Authored: Tue Aug 8 18:26:50 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Tue Aug 8 18:26:50 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/AzureClient.java |  10 ++
 .../org/apache/samza/AzureLeaderElector.java    | 111 +++++++++++++++++++
 2 files changed, 121 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/40304239/samza-azure/src/main/java/org/apache/samza/AzureClient.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java 
b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
index 7c12055..2248d12 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
@@ -20,6 +20,9 @@
 package org.apache.samza;
 
 import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.RetryLinearRetry;
+import com.microsoft.azure.storage.RetryPolicy;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.table.CloudTableClient;
 import java.net.URISyntaxException;
@@ -47,7 +50,14 @@ public class AzureClient {
   AzureClient(String storageConnectionString) {
     try {
       account = CloudStorageAccount.parse(storageConnectionString);
+
       blobClient = account.createCloudBlobClient();
+      // Set retry policy for operations on the blob. In this case, every 
failed operation on the blob will be retried thrice, after 5 second intervals.
+      BlobRequestOptions options = new BlobRequestOptions();
+      RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3);
+      options.setRetryPolicyFactory(retryPolicy);
+      blobClient.setDefaultRequestOptions(options);
+
       tableClient = account.createCloudTableClient();
     } catch (IllegalArgumentException | URISyntaxException e) {
       LOG.error("Connection string {} specifies an invalid URI.", 
storageConnectionString);

http://git-wip-us.apache.org/repos/asf/samza/blob/40304239/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java 
b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
new file mode 100644
index 0000000..efa8ea1
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.coordinator.LeaderElector;
+import org.apache.samza.coordinator.LeaderElectorListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to facilitate leader election in Azure.
+ * The processor that acquires the lease on the blob becomes the leader.
+ * The lease ID is null initially. It is generated by Azure when the processor 
acquires the lease, and updated accordingly.
+ * Every processor requires a valid active lease ID in order to perform 
successful write and delete operations on the blob.
+ * Read operations from the blob are not dependent on the lease ID.
+ */
+public class AzureLeaderElector implements LeaderElector {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AzureLeaderElector.class);
+  private static final int LEASE_TIME_IN_SEC = 60;
+  private final LeaseBlobManager leaseBlobManager;
+  private LeaderElectorListener leaderElectorListener = null;
+  private final AtomicReference<String> leaseId;
+  private final AtomicBoolean isLeader;
+
+  public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
+    this.isLeader = new AtomicBoolean(false);
+    this.leaseBlobManager = leaseBlobManager;
+    this.leaseId = new AtomicReference<>(null);
+  }
+
+  @Override
+  public void setLeaderElectorListener(LeaderElectorListener listener) {
+    this.leaderElectorListener = listener;
+  }
+
+  /**
+   * Tries to become the leader by acquiring a lease on the blob.
+   * The acquireLease operation has a retry policy where upon failure, the 
operation is tried 3 times at 5 second intervals.
+   * Invokes the listener on becoming the leader.
+   */
+  @Override
+  public void tryBecomeLeader() {
+    try {
+      leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, 
leaseId.get()));
+      if (leaseId.get() != null) {
+        LOG.info("Became leader with lease ID {}.", leaseId.get());
+        isLeader.set(true);
+        if (leaderElectorListener != null) {
+          leaderElectorListener.onBecomingLeader();
+        }
+      }
+    } catch (AzureException e) {
+      LOG.error("Error while trying to acquire lease.", e);
+    }
+  }
+
+  /**
+   * Releases the lease in order to resign leadership. It also stops all 
schedulers scheduled by the leader.
+   * The releaseLease operation has a retry policy where upon failure, the 
operation is tried 3 times at 5 second intervals.
+   */
+  @Override
+  public void resignLeadership() {
+    if (isLeader.get()) {
+      leaseBlobManager.releaseLease(leaseId.get());
+      isLeader.set(false);
+      LOG.info("Resigning leadership with lease ID {}", leaseId.get());
+      leaseId.getAndSet(null);
+    } else {
+      LOG.info("Can't release the lease because it is not the leader and does 
not hold an active lease.");
+    }
+  }
+
+  /**
+   * Checks whether it's a leader
+   * @return true if it is the leader, false otherwise
+   */
+  @Override
+  public boolean amILeader() {
+    return isLeader.get();
+  }
+
+  public AtomicReference<String> getLeaseId() {
+    return leaseId;
+  }
+
+  public LeaseBlobManager getLeaseBlobManager() {
+    return this.leaseBlobManager;
+  }
+
+}

Reply via email to