Repository: samza Updated Branches: refs/heads/master bf4c7619f -> 403042394
SAMZA-1374: Implement Leader Election using Lease Blob in Azure PR 1: AzureClient + AzureConfig PR 2: LeaseBlobManager PR 3: BlobUtils + JobModelBundle PR 4: TableUtils + ProcessorEntity **PR 5: AzureLeaderElector** (current PR) Author: PawasChhokra <Jaimatadi1$> Author: PawasChhokra <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Shanthoosh Venkataraman <[email protected]> Closes #259 from PawasChhokra/LeaderElection Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40304239 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40304239 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40304239 Branch: refs/heads/master Commit: 403042394945deba1affa7269c7ead3129326cbd Parents: bf4c761 Author: Pawas Chhokra <[email protected]> Authored: Tue Aug 8 18:26:50 2017 -0700 Committer: navina <[email protected]> Committed: Tue Aug 8 18:26:50 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/AzureClient.java | 10 ++ .../org/apache/samza/AzureLeaderElector.java | 111 +++++++++++++++++++ 2 files changed, 121 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/40304239/samza-azure/src/main/java/org/apache/samza/AzureClient.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java index 7c12055..2248d12 100644 --- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java +++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java @@ -20,6 +20,9 @@ package org.apache.samza; import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.RetryLinearRetry; +import com.microsoft.azure.storage.RetryPolicy; +import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.table.CloudTableClient; import java.net.URISyntaxException; @@ -47,7 +50,14 @@ public class AzureClient { AzureClient(String storageConnectionString) { try { account = CloudStorageAccount.parse(storageConnectionString); + blobClient = account.createCloudBlobClient(); + // Set retry policy for operations on the blob. In this case, every failed operation on the blob will be retried thrice, after 5 second intervals. + BlobRequestOptions options = new BlobRequestOptions(); + RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3); + options.setRetryPolicyFactory(retryPolicy); + blobClient.setDefaultRequestOptions(options); + tableClient = account.createCloudTableClient(); } catch (IllegalArgumentException | URISyntaxException e) { LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString); http://git-wip-us.apache.org/repos/asf/samza/blob/40304239/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java new file mode 100644 index 0000000..efa8ea1 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.samza.coordinator.LeaderElector; +import org.apache.samza.coordinator.LeaderElectorListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class to facilitate leader election in Azure. + * The processor that acquires the lease on the blob becomes the leader. + * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly. + * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob. + * Read operations from the blob are not dependent on the lease ID. + */ +public class AzureLeaderElector implements LeaderElector { + + private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class); + private static final int LEASE_TIME_IN_SEC = 60; + private final LeaseBlobManager leaseBlobManager; + private LeaderElectorListener leaderElectorListener = null; + private final AtomicReference<String> leaseId; + private final AtomicBoolean isLeader; + + public AzureLeaderElector(LeaseBlobManager leaseBlobManager) { + this.isLeader = new AtomicBoolean(false); + this.leaseBlobManager = leaseBlobManager; + this.leaseId = new AtomicReference<>(null); + } + + @Override + public void setLeaderElectorListener(LeaderElectorListener listener) { + this.leaderElectorListener = listener; + } + + /** + * Tries to become the leader by acquiring a lease on the blob. + * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals. + * Invokes the listener on becoming the leader. + */ + @Override + public void tryBecomeLeader() { + try { + leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get())); + if (leaseId.get() != null) { + LOG.info("Became leader with lease ID {}.", leaseId.get()); + isLeader.set(true); + if (leaderElectorListener != null) { + leaderElectorListener.onBecomingLeader(); + } + } + } catch (AzureException e) { + LOG.error("Error while trying to acquire lease.", e); + } + } + + /** + * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader. + * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals. + */ + @Override + public void resignLeadership() { + if (isLeader.get()) { + leaseBlobManager.releaseLease(leaseId.get()); + isLeader.set(false); + LOG.info("Resigning leadership with lease ID {}", leaseId.get()); + leaseId.getAndSet(null); + } else { + LOG.info("Can't release the lease because it is not the leader and does not hold an active lease."); + } + } + + /** + * Checks whether it's a leader + * @return true if it is the leader, false otherwise + */ + @Override + public boolean amILeader() { + return isLeader.get(); + } + + public AtomicReference<String> getLeaseId() { + return leaseId; + } + + public LeaseBlobManager getLeaseBlobManager() { + return this.leaseBlobManager; + } + +}
