SAMZA-1378: Introduce and Implement Scheduler Interface for Polling in Azure
PR 1: AzureClient + AzureConfig PR 2: LeaseBlobManager PR 3: BlobUtils + JobModelBundle PR 4: TableUtils + ProcessorEntity PR 5: AzureLeaderElector PR 6: Added all schedulers (current PR) Author: PawasChhokra <Jaimatadi1$> Author: PawasChhokra <[email protected]> Reviewers: Navina Ramesh <[email protected]> Closes #261 from PawasChhokra/AzureSchedulers Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c3b447ec Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c3b447ec Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c3b447ec Branch: refs/heads/master Commit: c3b447ecb343ddc4e48296448127fda5dfafe913 Parents: 1d253c7 Author: Pawas Chhokra <[email protected]> Authored: Fri Aug 18 14:38:46 2017 -0700 Committer: navina <[email protected]> Committed: Fri Aug 18 14:38:46 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/AzureClient.java | 18 +- .../main/java/org/apache/samza/AzureConfig.java | 73 --- .../org/apache/samza/AzureLeaderElector.java | 111 ---- .../main/java/org/apache/samza/BlobUtils.java | 280 ---------- .../java/org/apache/samza/JobModelBundle.java | 61 --- .../java/org/apache/samza/LeaseBlobManager.java | 98 ---- .../java/org/apache/samza/ProcessorEntity.java | 58 --- .../main/java/org/apache/samza/TableUtils.java | 198 -------- .../org/apache/samza/config/AzureConfig.java | 68 +++ .../samza/coordinator/AzureJobCoordinator.java | 509 +++++++++++++++++++ .../samza/coordinator/AzureLeaderElector.java | 109 ++++ .../samza/coordinator/data/BarrierState.java | 27 + .../samza/coordinator/data/JobModelBundle.java | 61 +++ .../samza/coordinator/data/ProcessorEntity.java | 62 +++ .../scheduler/HeartbeatScheduler.java | 81 +++ .../scheduler/JMVersionUpgradeScheduler.java | 99 ++++ .../LeaderBarrierCompleteScheduler.java | 118 +++++ .../scheduler/LeaderLivenessCheckScheduler.java | 120 +++++ 
.../scheduler/LivenessCheckScheduler.java | 108 ++++ .../scheduler/RenewLeaseScheduler.java | 79 +++ .../scheduler/SchedulerStateChangeListener.java | 29 ++ .../coordinator/scheduler/TaskScheduler.java | 35 ++ .../java/org/apache/samza/util/BlobUtils.java | 284 +++++++++++ .../org/apache/samza/util/LeaseBlobManager.java | 99 ++++ .../java/org/apache/samza/util/TableUtils.java | 205 ++++++++ 25 files changed, 2105 insertions(+), 885 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureClient.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java index 2248d12..04f8fd3 100644 --- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java +++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java @@ -25,6 +25,7 @@ import com.microsoft.azure.storage.RetryPolicy; import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.table.CloudTableClient; +import com.microsoft.azure.storage.table.TableRequestOptions; import java.net.URISyntaxException; import java.security.InvalidKeyException; import org.slf4j.Logger; @@ -44,21 +45,26 @@ public class AzureClient { /** * Creates a reference to the Azure Storage account according to the connection string that the client passes. * Also creates references to Azure Blob Storage and Azure Table Storage. 
- * @param storageConnectionString Connection string to conenct to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>" + * @param storageConnectionString Connection string to connect to Azure Storage Account + * Format: DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key" * @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid. */ - AzureClient(String storageConnectionString) { + public AzureClient(String storageConnectionString) { try { account = CloudStorageAccount.parse(storageConnectionString); + RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3); blobClient = account.createCloudBlobClient(); // Set retry policy for operations on the blob. In this case, every failed operation on the blob will be retried thrice, after 5 second intervals. - BlobRequestOptions options = new BlobRequestOptions(); - RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3); - options.setRetryPolicyFactory(retryPolicy); - blobClient.setDefaultRequestOptions(options); + BlobRequestOptions blobOptions = new BlobRequestOptions(); + blobOptions.setRetryPolicyFactory(retryPolicy); + blobClient.setDefaultRequestOptions(blobOptions); + // Set retry policy for operations on the table. In this case, every failed operation on the table will be retried thrice, after 5 second intervals. 
tableClient = account.createCloudTableClient(); + TableRequestOptions tableOptions = new TableRequestOptions(); + tableOptions.setRetryPolicyFactory(retryPolicy); + tableClient.setDefaultRequestOptions(tableOptions); } catch (IllegalArgumentException | URISyntaxException e) { LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString); LOG.error("Please confirm the connection string is in the Azure connection string format."); http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java deleted file mode 100644 index 47873a7..0000000 --- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza; - -import org.apache.samza.config.ApplicationConfig; -import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; -import org.apache.samza.config.MapConfig; - -/** - * Config class for reading all user defined parameters for Azure driven coordination services. - */ -public class AzureConfig extends MapConfig { - - // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>" - public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect"; - public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length"; - public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000; - - private static String containerName; - private static String blobName; - private static String tableName; - - public AzureConfig(Config config) { - super(config); - ApplicationConfig appConfig = new ApplicationConfig(config); - //Remove all non-alphanumeric characters from id as table name does not allow them. 
- String id = appConfig.getGlobalAppId().replaceAll("[^A-Za-z0-9]", ""); - containerName = "samzacontainer" + id; - blobName = "samzablob" + id; - tableName = "samzatable" + id; - } - - public String getAzureConnect() { - if (!containsKey(AZURE_STORAGE_CONNECT)) { - throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!"); - } - return get(AZURE_STORAGE_CONNECT); - } - - public String getAzureContainerName() { - return containerName; - } - - public String getAzureBlobName() { - return blobName; - } - - public long getAzureBlobLength() { - return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH); - } - - public String getAzureTableName() { - return tableName; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java deleted file mode 100644 index efa8ea1..0000000 --- a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.samza.coordinator.LeaderElector; -import org.apache.samza.coordinator.LeaderElectorListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Class to facilitate leader election in Azure. - * The processor that acquires the lease on the blob becomes the leader. - * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly. - * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob. - * Read operations from the blob are not dependent on the lease ID. - */ -public class AzureLeaderElector implements LeaderElector { - - private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class); - private static final int LEASE_TIME_IN_SEC = 60; - private final LeaseBlobManager leaseBlobManager; - private LeaderElectorListener leaderElectorListener = null; - private final AtomicReference<String> leaseId; - private final AtomicBoolean isLeader; - - public AzureLeaderElector(LeaseBlobManager leaseBlobManager) { - this.isLeader = new AtomicBoolean(false); - this.leaseBlobManager = leaseBlobManager; - this.leaseId = new AtomicReference<>(null); - } - - @Override - public void setLeaderElectorListener(LeaderElectorListener listener) { - this.leaderElectorListener = listener; - } - - /** - * Tries to become the leader by acquiring a lease on the blob. - * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals. - * Invokes the listener on becoming the leader. 
- */ - @Override - public void tryBecomeLeader() { - try { - leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get())); - if (leaseId.get() != null) { - LOG.info("Became leader with lease ID {}.", leaseId.get()); - isLeader.set(true); - if (leaderElectorListener != null) { - leaderElectorListener.onBecomingLeader(); - } - } - } catch (AzureException e) { - LOG.error("Error while trying to acquire lease.", e); - } - } - - /** - * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader. - * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals. - */ - @Override - public void resignLeadership() { - if (isLeader.get()) { - leaseBlobManager.releaseLease(leaseId.get()); - isLeader.set(false); - LOG.info("Resigning leadership with lease ID {}", leaseId.get()); - leaseId.getAndSet(null); - } else { - LOG.info("Can't release the lease because it is not the leader and does not hold an active lease."); - } - } - - /** - * Checks whether it's a leader - * @return true if it is the leader, false otherwise - */ - @Override - public boolean amILeader() { - return isLeader.get(); - } - - public AtomicReference<String> getLeaseId() { - return leaseId; - } - - public LeaseBlobManager getLeaseBlobManager() { - return this.leaseBlobManager; - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/BlobUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/BlobUtils.java deleted file mode 100644 index a798384..0000000 --- a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza; - -import com.microsoft.azure.storage.AccessCondition; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudPageBlob; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.List; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.serializers.model.SamzaObjectMapper; -import org.eclipse.jetty.http.HttpStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Client side class that has reference to Azure blob storage. - * Used for writing and reading from the blob. - * Every write requires a valid lease ID. 
- */ -public class BlobUtils { - - private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class); - private static final long JOB_MODEL_BLOCK_SIZE = 1024000; - private static final long BARRIER_STATE_BLOCK_SIZE = 1024; - private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024; - private CloudBlobClient blobClient; - private CloudBlobContainer container; - private CloudPageBlob blob; - - /** - * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already. - * @param client Client handle for access to Azure Storage account. - * @param containerName Name of container inside which we want the blob to reside. - * @param blobName Name of the blob to be managed. - * @param length Length of the page blob. - * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid. - */ - public BlobUtils(AzureClient client, String containerName, String blobName, long length) { - this.blobClient = client.getBlobClient(); - try { - this.container = blobClient.getContainerReference(containerName); - container.createIfNotExists(); - this.blob = container.getPageBlobReference(blobName); - if (!blob.exists()) { - blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null); - } - } catch (URISyntaxException e) { - LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e); - throw new AzureException(e); - } catch (StorageException e) { - int httpStatusCode = e.getHttpStatusCode(); - if (httpStatusCode == HttpStatus.CONFLICT_409) { - LOG.info("The blob you're trying to create exists already.", e); - } else { - LOG.error("Azure Storage Exception!", e); - throw new AzureException(e); - } - } - } - - /** - * Writes the job model to the blob. - * Write is successful only if the lease ID passed is valid and the processor holds the lease. - * Called by the leader. 
- * @param prevJM Previous job model version that the processor was operating on. - * @param currJM Current job model version that the processor is operating on. - * @param prevJMV Previous job model version that the processor was operating on. - * @param currJMV Current job model version that the processor is operating on. - * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease. - * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. - */ - public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) { - try { - if (leaseId == null) { - return false; - } - JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV); - byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle); - byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE); - InputStream is = new ByteArrayInputStream(pageData); - blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); - LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel()); - return true; - } catch (StorageException | IOException e) { - LOG.error("JobModel publish failed for version = " + currJMV, e); - return false; - } - } - - /** - * Reads the current job model from the blob. - * @return The current job model published on the blob. Returns null when job model details not found on the blob. - * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred. - * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper. 
- */ - public JobModel getJobModel() { - LOG.info("Reading the job model from blob."); - JobModelBundle jmBundle = getJobModelBundle(); - if (jmBundle == null) { - LOG.error("Job Model details don't exist on the blob."); - return null; - } - JobModel jm = jmBundle.getCurrJobModel(); - return jm; - } - - /** - * Reads the current job model version from the blob . - * @return Current job model version published on the blob. Returns null when job model details not found on the blob. - * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred. - * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper. - */ - public String getJobModelVersion() { - LOG.info("Reading the job model version from blob."); - JobModelBundle jmBundle = getJobModelBundle(); - if (jmBundle == null) { - LOG.error("Job Model details don't exist on the blob."); - return null; - } - String jmVersion = jmBundle.getCurrJobModelVersion(); - return jmVersion; - } - - /** - * Writes the barrier state to the blob. - * Write is successful only if the lease ID passed is valid and the processor holds the lease. - * Called only by the leader. - * @param state Barrier state to be published to the blob. - * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease. - * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. - */ - public boolean publishBarrierState(String state, String leaseId) { - try { - if (leaseId == null) { - return false; - } - byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state); - byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE); - InputStream is = new ByteArrayInputStream(pageData); - - //uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. 
It fails otherwise. - blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); - LOG.info("Uploaded barrier state {} to blob", state); - return true; - } catch (StorageException | IOException e) { - LOG.error("Barrier state " + state + " publish failed", e); - return false; - } - } - - /** - * Reads the current barrier state from the blob. - * @return Barrier state published on the blob. - * @throws AzureException If an Azure storage service error occurred. - * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper. - */ - public String getBarrierState() { - LOG.info("Reading the barrier state from blob."); - byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE]; - try { - blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, data, 0); - } catch (StorageException e) { - LOG.error("Failed to read barrier state from blob.", e); - throw new AzureException(e); - } - String state; - try { - state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class); - } catch (IOException e) { - LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e); - throw new SamzaException(e); - } - return state; - } - - /** - * Writes the list of live processors in the system to the blob. - * Write is successful only if the lease ID passed is valid and the processor holds the lease. - * Called only by the leader. - * @param processors List of live processors to be published on the blob. - * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease. - * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. 
- */ - public boolean publishLiveProcessorList(List<String> processors, String leaseId) { - try { - if (leaseId == null) { - return false; - } - byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors); - byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE); - InputStream is = new ByteArrayInputStream(pageData); - blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); - LOG.info("Uploaded list of live processors to blob."); - return true; - } catch (StorageException | IOException e) { - LOG.error("Processor list: " + processors + "publish failed", e); - return false; - } - } - - /** - * Reads the list of live processors published on the blob. - * @return String list of live processors. - * @throws AzureException If an Azure storage service error occurred. - * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper. 
- */ - public List<String> getLiveProcessorList() { - LOG.info("Read the the list of live processors from blob."); - byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE]; - try { - blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0); - } catch (StorageException e) { - LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e)); - throw new AzureException(e); - } - List<String> list; - try { - list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class); - } catch (IOException e) { - LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e)); - throw new SamzaException(e); - } - return list; - } - - public CloudBlobClient getBlobClient() { - return this.blobClient; - } - - public CloudBlobContainer getBlobContainer() { - return this.container; - } - - public CloudPageBlob getBlob() { - return this.blob; - } - - private JobModelBundle getJobModelBundle() { - byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE]; - try { - blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0); - } catch (StorageException e) { - LOG.error("Failed to read JobModel details from the blob.", e); - throw new AzureException(e); - } - try { - JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class); - return jmBundle; - } catch (IOException e) { - LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e); - throw new SamzaException(e); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java b/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java deleted file mode 100644 
index 3ff971f..0000000 --- a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza; - -import org.apache.samza.job.model.JobModel; - - -/** - * Bundle class for current and previous - job model and job model version. - * Used for publishing updated data to the blob in one go. 
- */ -public class JobModelBundle { - - private JobModel prevJobModel; - private JobModel currJobModel; - private String prevJobModelVersion; - private String currJobModelVersion; - - public JobModelBundle() {} - - public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) { - prevJobModel = prevJM; - currJobModel = currJM; - prevJobModelVersion = prevJMV; - currJobModelVersion = currJMV; - } - - public JobModel getCurrJobModel() { - return currJobModel; - } - - public JobModel getPrevJobModel() { - return prevJobModel; - } - - public String getCurrJobModelVersion() { - return currJobModelVersion; - } - - public String getPrevJobModelVersion() { - return prevJobModelVersion; - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java deleted file mode 100644 index 5375662..0000000 --- a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza; - -import com.microsoft.azure.storage.AccessCondition; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudPageBlob; -import org.eclipse.jetty.http.HttpStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Helper class for lease blob operations. - */ -public class LeaseBlobManager { - - private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class); - private CloudPageBlob leaseBlob; - - public LeaseBlobManager(CloudPageBlob leaseBlob) { - this.leaseBlob = leaseBlob; - } - - /** - * Acquires a lease on a blob. The lease ID is NULL initially. - * @param leaseTimeInSec The time in seconds you want to acquire the lease for. - * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed. - * @return String that represents lease ID. Null if acquireLease is unsuccessful because the blob is leased already. - * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist. 
- */ - public String acquireLease(int leaseTimeInSec, String leaseId) { - try { - String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId); - LOG.info("Acquired lease with lease id = " + id); - return id; - } catch (StorageException storageException) { - int httpStatusCode = storageException.getHttpStatusCode(); - if (httpStatusCode == HttpStatus.CONFLICT_409) { - LOG.info("The blob you're trying to acquire is leased already.", storageException); - } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) { - LOG.error("The blob you're trying to lease does not exist.", storageException); - throw new AzureException(storageException); - } else { - LOG.error("Error acquiring lease!", storageException); - throw new AzureException(storageException); - } - } - return null; - } - - /** - * Renews the lease on the blob. - * @param leaseId ID of the lease to be renewed. - * @return True if lease was renewed successfully, false otherwise. - */ - public boolean renewLease(String leaseId) { - try { - leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId)); - return true; - } catch (StorageException storageException) { - LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException); - return false; - } - } - - /** - * Releases the lease on the blob. - * @param leaseId ID of the lease to be released. - * @return True if released successfully, false otherwise. 
- */ - public boolean releaseLease(String leaseId) { - try { - leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId)); - return true; - } catch (StorageException storageException) { - LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException); - return false; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java deleted file mode 100644 index 5145821..0000000 --- a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza; - -import com.microsoft.azure.storage.table.TableServiceEntity; - - -/** - * Table schema for Azure processor table. 
- * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID - * Other fields include integer liveness value to which each processor heartbeats, - * and boolean isLeader value which denotes whether the processor is a leader or not. - */ -public class ProcessorEntity extends TableServiceEntity { - private int liveness; - private boolean isLeader; - - public ProcessorEntity() {} - - public ProcessorEntity(String jobModelVersion, String processorId) { - this.partitionKey = jobModelVersion; - this.rowKey = processorId; - this.isLeader = false; - } - - /** - * Updates heartbeat by updating the liveness value in the table. - * @param value - */ - public void setLiveness(int value) { - liveness = value; - } - - public void setIsLeader(boolean leader) { - isLeader = leader; - } - - public boolean getIsLeader() { - return isLeader; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/TableUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/TableUtils.java deleted file mode 100644 index e49fd90..0000000 --- a/samza-azure/src/main/java/org/apache/samza/TableUtils.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza; - -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.table.CloudTable; -import com.microsoft.azure.storage.table.CloudTableClient; -import com.microsoft.azure.storage.table.TableOperation; -import com.microsoft.azure.storage.table.TableQuery; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Client side class that has a reference to Azure Table Storage. - * Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table. - * Every row in a table is uniquely identified by a combination of the PARTITION KEY and ROW KEY. - * PARTITION KEY = Group ID = Job Model Version (for this case). - * ROW KEY = Unique entity ID for a group = Processor ID (for this case). 
- */ -public class TableUtils { - - private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class); - private static final String PARTITION_KEY = "PartitionKey"; - private static final long CHECK_LIVENESS_DELAY = 30; - private static final String INITIAL_STATE = "unassigned"; - private CloudTableClient tableClient; - private CloudTable table; - - public TableUtils(AzureClient client, String tableName) { - tableClient = client.getTableClient(); - try { - table = tableClient.getTableReference(tableName); - table.createIfNotExists(); - } catch (URISyntaxException e) { - LOG.error("\nConnection string specifies an invalid URI.", new SamzaException(e)); - } catch (StorageException e) { - LOG.error("Azure storage exception.", new SamzaException(e)); - } - } - - /** - * Add a row which denotes an active processor to the processor table. - * @param jmVersion Job model version that the processor is operating on. - * @param pid Unique processor ID. - * @param liveness Random heartbeat value. - * @param isLeader Denotes whether the processor is a leader or not. - * @throws AzureException If an Azure storage service error occurred. - */ - public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) { - ProcessorEntity entity = new ProcessorEntity(jmVersion, pid); - entity.setIsLeader(isLeader); - entity.setLiveness(liveness); - TableOperation add = TableOperation.insert(entity); - try { - table.execute(add); - } catch (StorageException e) { - LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e); - throw new AzureException(e); - } - } - - /** - * Retrieve a particular row in the processor table, given the partition key and the row key. - * @param jmVersion Job model version of the processor row to be retrieved. - * @param pid Unique processor ID of the processor row to be retrieved. - * @return An instance of required processor entity. Null if does not exist. 
- * @throws AzureException If an Azure storage service error occurred. - */ - public ProcessorEntity getEntity(String jmVersion, String pid) { - try { - TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); - ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); - return entity; - } catch (StorageException e) { - LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e); - throw new AzureException(e); - } - } - - /** - * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row. - * @param jmVersion Job model version of the processor row to be updated. - * @param pid Unique processor ID of the processor row to be updated. - */ - public void updateHeartbeat(String jmVersion, String pid) { - try { - Random rand = new Random(); - int value = rand.nextInt(10000) + 2; - TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); - ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); - entity.setLiveness(value); - TableOperation update = TableOperation.replace(entity); - table.execute(update); - } catch (StorageException e) { - LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e); - } - } - - /** - * Updates the isLeader value when the processor starts or stops being a leader. - * @param jmVersion Job model version of the processor row to be updated. - * @param pid Unique processor ID of the processor row to be updated. - * @param isLeader Denotes whether the processor is a leader or not. - * @throws AzureException If an Azure storage service error occurred. 
- */ - public void updateIsLeader(String jmVersion, String pid, boolean isLeader) { - try { - TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); - ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); - entity.setIsLeader(isLeader); - TableOperation update = TableOperation.replace(entity); - table.execute(update); - } catch (StorageException e) { - LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e); - throw new AzureException(e); - } - } - - /** - * Deletes a specified row in the processor table. - * @param jmVersion Job model version of the processor row to be deleted. - * @param pid Unique processor ID of the processor row to be deleted. - * @throws AzureException If an Azure storage service error occurred. - */ - public void deleteProcessorEntity(String jmVersion, String pid) { - try { - TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); - ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); - TableOperation remove = TableOperation.delete(entity); - table.execute(remove); - } catch (StorageException e) { - LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e); - throw new AzureException(e); - } - } - - /** - * Retrieve all rows in a table with the given partition key. - * @param partitionKey Job model version of the processors to be retrieved. - * @return Iterable list of processor entities. 
- */ - public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) { - String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey); - TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter); - return table.execute(partitionQuery); - } - - /** - * Gets the list of all active processors that are heartbeating to the processor table. - * @param currentJMVersion Current job model version that the processors in the application are operating on. - * @return List of ids of currently active processors in the application, retrieved from the processor table. - */ - public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) { - Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get()); - Set<String> activeProcessorsList = new HashSet<>(); - for (ProcessorEntity entity: tableList) { - if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) { - activeProcessorsList.add(entity.getRowKey()); - } - } - - Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(INITIAL_STATE); - for (ProcessorEntity entity: unassignedList) { - if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) { - activeProcessorsList.add(entity.getRowKey()); - } - } - return activeProcessorsList; - } - - public CloudTable getTable() { - return table; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java new file mode 100644 index 0000000..dc96d2d --- /dev/null +++ 
b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +/** + * Config class for reading all user defined parameters for Azure driven coordination services. + */ +public class AzureConfig extends MapConfig { + + // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>" + public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect"; + public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length"; + public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000; + + private static String containerName; + private static String blobName; + private static String tableName; + + public AzureConfig(Config config) { + super(config); + ApplicationConfig appConfig = new ApplicationConfig(config); + //Remove all non-alphanumeric characters from id as table name does not allow them. 
+ String id = appConfig.getGlobalAppId().replaceAll("[^A-Za-z0-9]", ""); + containerName = "samzacontainer" + id; + blobName = "samzablob" + id; + tableName = "samzatable" + id; + } + + public String getAzureConnect() { + if (!containsKey(AZURE_STORAGE_CONNECT)) { + throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!"); + } + return get(AZURE_STORAGE_CONNECT); + } + + public String getAzureContainerName() { + return containerName; + } + + public String getAzureBlobName() { + return blobName; + } + + public long getAzureBlobLength() { + return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH); + } + + public String getAzureTableName() { + return tableName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java new file mode 100644 index 0000000..9438690 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.AzureClient; +import org.apache.samza.config.AzureConfig; +import org.apache.samza.coordinator.data.BarrierState; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper; +import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.runtime.ProcessorIdGenerator; +import org.apache.samza.coordinator.scheduler.HeartbeatScheduler; +import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler; +import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler; +import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler; +import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler; +import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler; +import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import 
org.apache.samza.util.BlobUtils; +import org.apache.samza.util.ClassLoaderHelper; +import org.apache.samza.util.LeaseBlobManager; +import org.apache.samza.util.TableUtils; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * Class that provides coordination mechanism for Samza standalone in Azure. + * Handles processor lifecycle through Azure blob and table storage. Orchestrates leader election. + * The leader job coordinator generates partition mapping, writes shared data to the blob and manages rebalancing. + */ +public class AzureJobCoordinator implements JobCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(AzureJobCoordinator.class); + private static final int METADATA_CACHE_TTL_MS = 5000; + private static final String INITIAL_STATE = "UNASSIGNED"; + private final Consumer<String> errorHandler; + private final AzureLeaderElector azureLeaderElector; + private final BlobUtils leaderBlob; + private final TableUtils table; + private final Config config; + private final String processorId; + private final AzureClient client; + private final AtomicReference<String> currentJMVersion; + private final AtomicBoolean versionUpgradeDetected; + private final HeartbeatScheduler heartbeat; + private final JMVersionUpgradeScheduler versionUpgrade; + private final LeaderLivenessCheckScheduler leaderAlive; + private LivenessCheckScheduler liveness; + private RenewLeaseScheduler renewLease; + private LeaderBarrierCompleteScheduler leaderBarrierScheduler; + private StreamMetadataCache streamMetadataCache = null; + private JobCoordinatorListener coordinatorListener = null; + private JobModel jobModel = null; + + /** + * Creates an instance of Azure job coordinator, along with references to Azure leader elector, Azure Blob and Azure Table. 
+ * @param config User defined config + */ + public AzureJobCoordinator(Config config) { + //TODO: Cleanup previous values in the table when barrier times out. + this.config = config; + processorId = createProcessorId(config); + currentJMVersion = new AtomicReference<>(INITIAL_STATE); + AzureConfig azureConfig = new AzureConfig(config); + client = new AzureClient(azureConfig.getAzureConnect()); + leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength()); + errorHandler = (errorMsg) -> { + LOG.error(errorMsg); + stop(); + }; + table = new TableUtils(client, azureConfig.getAzureTableName(), INITIAL_STATE); + azureLeaderElector = new AzureLeaderElector(new LeaseBlobManager(leaderBlob.getBlob())); + azureLeaderElector.setLeaderElectorListener(new AzureLeaderElectorListener()); + versionUpgradeDetected = new AtomicBoolean(false); + heartbeat = new HeartbeatScheduler(errorHandler, table, currentJMVersion, processorId); + versionUpgrade = new JMVersionUpgradeScheduler(errorHandler, leaderBlob, currentJMVersion, versionUpgradeDetected, processorId); + leaderAlive = new LeaderLivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, INITIAL_STATE); + leaderBarrierScheduler = null; + renewLease = null; + liveness = null; + } + + @Override + public void start() { + + LOG.info("Starting Azure job coordinator."); + streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); + table.addProcessorEntity(INITIAL_STATE, processorId, false); + + // Start scheduler for heartbeating + LOG.info("Starting scheduler for heartbeating."); + heartbeat.scheduleTask(); + + azureLeaderElector.tryBecomeLeader(); + + // Start scheduler to check for job model version upgrades + LOG.info("Starting scheduler to check for job model version upgrades."); + versionUpgrade.setStateChangeListener(createJMVersionUpgradeListener()); + versionUpgrade.scheduleTask(); + + // Start scheduler to 
check for leader liveness + LOG.info("Starting scheduler to check for leader liveness."); + leaderAlive.setStateChangeListener(createLeaderLivenessListener()); + leaderAlive.scheduleTask(); + } + + @Override + public void stop() { + LOG.info("Shutting down Azure job coordinator."); + + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + + // Resign leadership + if (azureLeaderElector.amILeader()) { + azureLeaderElector.resignLeadership(); + } + + // Shutdown all schedulers + shutdownSchedulers(); + + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorStop(); + } + } + + @Override + public String getProcessorId() { + return processorId; + } + + @Override + public void setListener(JobCoordinatorListener listener) { + this.coordinatorListener = listener; + } + + @Override + public JobModel getJobModel() { + return jobModel; + } + + private void shutdownSchedulers() { + if (renewLease != null) { + renewLease.shutdown(); + } + if (leaderBarrierScheduler != null) { + leaderBarrierScheduler.shutdown(); + } + if (liveness != null) { + liveness.shutdown(); + } + heartbeat.shutdown(); + leaderAlive.shutdown(); + versionUpgrade.shutdown(); + } + + /** + * Creates a listener for LeaderBarrierCompleteScheduler class. + * Invoked by the leader when it detects that rebalancing has completed by polling the processor table. + * Updates the barrier state on the blob to denote that the barrier has completed. + * Cancels all future tasks scheduled by the LeaderBarrierComplete scheduler to check if barrier has completed. + * @return an instance of SchedulerStateChangeListener. 
+ */ + private SchedulerStateChangeListener createLeaderBarrierCompleteListener(String nextJMVersion, AtomicBoolean barrierTimeout) { + return () -> { + versionUpgradeDetected.getAndSet(false); + String state; + if (barrierTimeout.get()) { + LOG.error("Barrier timed out for version {}", nextJMVersion); + state = BarrierState.TIMEOUT.name() + " " + nextJMVersion; + } else { + LOG.info("Leader detected barrier completion."); + state = BarrierState.END.name() + " " + nextJMVersion; + } + if (!leaderBlob.publishBarrierState(state, azureLeaderElector.getLeaseId().get())) { + LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId); + stop(); + table.deleteProcessorEntity(currentJMVersion.get(), processorId); + } + leaderBarrierScheduler.shutdown(); + }; + } + + /** + * Creates a listener for LivenessCheckScheduler class. + * Invoked by the leader when the list of active processors in the system changes. + * @return an instance of SchedulerStateChangeListener. + */ + private SchedulerStateChangeListener createLivenessListener(AtomicReference<List<String>> liveProcessors) { + return () -> { + LOG.info("Leader detected change in list of live processors."); + doOnProcessorChange(liveProcessors.get()); + }; + } + + /** + * Creates a listener for JMVersionUpgradeScheduler class. + * Invoked when the processor detects a job model version upgrade on the blob. + * Stops listening for job model version upgrades until rebalancing achieved. + * @return an instance of SchedulerStateChangeListener. + */ + private SchedulerStateChangeListener createJMVersionUpgradeListener() { + return () -> { + LOG.info("Job model version upgrade detected."); + versionUpgradeDetected.getAndSet(true); + onNewJobModelAvailable(leaderBlob.getJobModelVersion()); + }; + } + + /** + * Creates a listener for LeaderLivenessCheckScheduler class. + * Invoked when an existing leader dies. Enables the JC to participate in leader election again. 
+ * @return an instance of SchedulerStateChangeListener. + */ + private SchedulerStateChangeListener createLeaderLivenessListener() { + return () -> { + LOG.info("Existing leader died."); + azureLeaderElector.tryBecomeLeader(); + }; + } + + /** + * For each input stream specified in config, exactly determine its + * partitions, returning a set of SystemStreamPartitions containing them all. + */ + private Set<SystemStreamPartition> getInputStreamPartitions() { + TaskConfig taskConfig = new TaskConfig(config); + scala.collection.immutable.Set<SystemStream> inputSystemStreams = taskConfig.getInputStreams(); + + // Get the set of partitions for each SystemStream from the stream metadata + Set<SystemStreamPartition> + sspSet = JavaConverters.mapAsJavaMapConverter(streamMetadataCache.getStreamMetadata(inputSystemStreams, true)).asJava() + .entrySet() + .stream() + .flatMap(this::mapSSMToSSP) + .collect(Collectors.toSet()); + + return sspSet; + } + + private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, SystemStreamMetadata> ssMs) { + return ssMs.getValue() + .getSystemStreamPartitionMetadata() + .keySet() + .stream() + .map(partition -> new SystemStreamPartition(ssMs.getKey(), partition)); + } + + /** + * Gets a SystemStreamPartitionGrouper object from the configuration. 
+ */ + private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() { + JobConfig jobConfig = new JobConfig(config); + String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory(); + SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig); + return grouper; + } + + private int getMaxNumTasks() { + // Do grouping to fetch TaskName to SSP mapping + Set<SystemStreamPartition> allSystemStreamPartitions = getInputStreamPartitions(); + SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper(); + Map<TaskName, Set<SystemStreamPartition>> groups = grouper.group(allSystemStreamPartitions); + LOG.info("SystemStreamPartitionGrouper " + grouper.toString() + " has grouped the SystemStreamPartitions into " + Integer.toString(groups.size()) + + " tasks with the following taskNames: {}", groups.keySet()); + return groups.size(); + } + + /** + * Called only by the leader, either when the processor becomes the leader, or when the list of live processors changes. + * @param currentProcessorIds New updated list of processor IDs which caused the rebalancing. 
+ */ + private void doOnProcessorChange(List<String> currentProcessorIds) { + // if list of processors is empty - it means we are called from 'onBecomeLeader' + + // Check if number of processors is greater than number of tasks + List<String> initialProcessorIds = new ArrayList<>(currentProcessorIds); + int numTasks = getMaxNumTasks(); + if (currentProcessorIds.size() > numTasks) { + int iterator = 0; + while (currentProcessorIds.size() != numTasks) { + if (!currentProcessorIds.get(iterator).equals(processorId)) { + currentProcessorIds.remove(iterator); + iterator++; + } + } + } + LOG.info("currentProcessorIds = {}", currentProcessorIds); + LOG.info("initialProcessorIds = {}", initialProcessorIds); + + String nextJMVersion; + String prevJMVersion = currentJMVersion.get(); + JobModel prevJobModel = jobModel; + AtomicBoolean barrierTimeout = new AtomicBoolean(false); + + if (currentProcessorIds.isEmpty()) { + if (currentJMVersion.get().equals(INITIAL_STATE)) { + nextJMVersion = "1"; + } else { + nextJMVersion = Integer.toString(Integer.valueOf(prevJMVersion) + 1); + } + currentProcessorIds = new ArrayList<>(table.getActiveProcessorsList(currentJMVersion)); + initialProcessorIds = currentProcessorIds; + } else { + //Check if previous barrier not reached, then previous barrier times out. 
+ String blobJMV = leaderBlob.getJobModelVersion(); + nextJMVersion = Integer.toString(Integer.valueOf(prevJMVersion) + 1); + if (blobJMV != null && Integer.valueOf(blobJMV) > Integer.valueOf(prevJMVersion)) { + prevJMVersion = blobJMV; + prevJobModel = leaderBlob.getJobModel(); + nextJMVersion = Integer.toString(Integer.valueOf(blobJMV) + 1); + versionUpgradeDetected.getAndSet(false); + leaderBarrierScheduler.shutdown(); + leaderBlob.publishBarrierState(BarrierState.TIMEOUT.name() + " " + blobJMV, azureLeaderElector.getLeaseId().get()); + } + } + + // Generate the new JobModel + JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), + null, streamMetadataCache, currentProcessorIds); + LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion); + + // Publish the new job model + boolean jmWrite = leaderBlob.publishJobModel(prevJobModel, newJobModel, prevJMVersion, nextJMVersion, azureLeaderElector.getLeaseId().get()); + // Publish barrier state + boolean barrierWrite = leaderBlob.publishBarrierState(BarrierState.START.name() + " " + nextJMVersion, azureLeaderElector.getLeaseId().get()); + barrierTimeout.set(false); + // Publish list of processors this function was called with + boolean processorWrite = leaderBlob.publishLiveProcessorList(initialProcessorIds, azureLeaderElector.getLeaseId().get()); + + //Shut down processor if write fails even after retries. These writes have an inherent retry policy. + if (!jmWrite || !barrierWrite || !processorWrite) { + LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId); + stop(); + table.deleteProcessorEntity(currentJMVersion.get(), processorId); + } + + LOG.info("pid=" + processorId + "Published new Job Model. 
Version = " + nextJMVersion); + + // Start scheduler to check if barrier reached + long startTime = System.currentTimeMillis(); + leaderBarrierScheduler = new LeaderBarrierCompleteScheduler(errorHandler, table, nextJMVersion, initialProcessorIds, startTime, barrierTimeout, currentJMVersion, processorId); + leaderBarrierScheduler.setStateChangeListener(createLeaderBarrierCompleteListener(nextJMVersion, barrierTimeout)); + leaderBarrierScheduler.scheduleTask(); + } + + /** + * Called when the JC detects a job model version upgrade on the shared blob. + * @param nextJMVersion The new job model version after rebalancing. + */ + private void onNewJobModelAvailable(final String nextJMVersion) { + LOG.info("pid=" + processorId + "new JobModel available with job model version {}", nextJMVersion); + + //Get the new job model from blob + jobModel = leaderBlob.getJobModel(); + LOG.info("pid=" + processorId + ": new JobModel available. ver=" + nextJMVersion + "; jm = " + jobModel); + + if (!jobModel.getContainers().containsKey(processorId)) { + LOG.info("JobModel: {} does not contain the processorId: {}. 
Stopping the processor.", jobModel, processorId); + stop(); + table.deleteProcessorEntity(currentJMVersion.get(), processorId); + } else { + //Stop current work + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + // Add entry with new job model version to the processor table + table.addProcessorEntity(nextJMVersion, processorId, azureLeaderElector.amILeader()); + + // Start polling blob to check if barrier reached + Random random = new Random(); + String blobBarrierState = leaderBlob.getBarrierState(); + while (true) { + if (blobBarrierState.equals(BarrierState.END.name() + " " + nextJMVersion)) { + LOG.info("Barrier completion detected by the worker for barrier version {}.", nextJMVersion); + versionUpgradeDetected.getAndSet(false); + onNewJobModelConfirmed(nextJMVersion); + break; + } else if (blobBarrierState.equals(BarrierState.TIMEOUT.name() + " " + nextJMVersion) || + (Integer.valueOf(leaderBlob.getJobModelVersion()) > Integer.valueOf(nextJMVersion))) { + LOG.info("Barrier timed out for version number {}", nextJMVersion); + versionUpgradeDetected.getAndSet(false); + break; + } else { + try { + Thread.sleep(random.nextInt(5000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Checking for barrier state on the blob again..."); + blobBarrierState = leaderBlob.getBarrierState(); + } + } + } + } + + /** + * Called when the JC detects that the barrier has completed by checking the barrier state on the blob. + * @param nextJMVersion The new job model version after rebalancing. 
+ */ + private void onNewJobModelConfirmed(final String nextJMVersion) { + LOG.info("pid=" + processorId + "new version " + nextJMVersion + " of the job model got confirmed"); + + // Delete previous value + if (table.getEntity(currentJMVersion.get(), processorId) != null) { + table.deleteProcessorEntity(currentJMVersion.get(), processorId); + } + if (table.getEntity(INITIAL_STATE, processorId) != null) { + table.deleteProcessorEntity(INITIAL_STATE, processorId); + } + + //Start heartbeating to new entry only when barrier reached. + //Changing the current job model version enables that since we are heartbeating to a row identified by the current job model version. + currentJMVersion.getAndSet(nextJMVersion); + + //Start the container with the new model + if (coordinatorListener != null) { + coordinatorListener.onNewJobModel(processorId, jobModel); + } + } + + private String createProcessorId(Config config) { + // TODO: This check to be removed after 0.13+ + ApplicationConfig appConfig = new ApplicationConfig(config); + if (appConfig.getProcessorId() != null) { + return appConfig.getProcessorId(); + } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) { + ProcessorIdGenerator idGenerator = + ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class); + return idGenerator.generateProcessorId(config); + } else { + throw new ConfigException(String + .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID, + ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS)); + } + } + + public class AzureLeaderElectorListener implements LeaderElectorListener { + /** + * Keep renewing the lease and do the required tasks as a leader. + */ + @Override + public void onBecomingLeader() { + // Update table to denote that it is a leader. 
+ table.updateIsLeader(currentJMVersion.get(), processorId, true); + + // Schedule a task to renew the lease after a fixed time interval + LOG.info("Starting scheduler to keep renewing lease held by the leader."); + renewLease = new RenewLeaseScheduler((errorMsg) -> { + LOG.error(errorMsg); + table.updateIsLeader(currentJMVersion.get(), processorId, false); + azureLeaderElector.resignLeadership(); + renewLease.shutdown(); + liveness.shutdown(); + }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId()); + renewLease.scheduleTask(); + + doOnProcessorChange(new ArrayList<>()); + + // Start scheduler to check for change in list of live processors + LOG.info("Starting scheduler to check for change in list of live processors in the system."); + liveness = new LivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, processorId); + liveness.setStateChangeListener(createLivenessListener(liveness.getLiveProcessors())); + liveness.scheduleTask(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java new file mode 100644 index 0000000..c93f1d0 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.samza.AzureException; +import org.apache.samza.util.LeaseBlobManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class to facilitate leader election in Azure. + * The processor that acquires the lease on the blob becomes the leader. + * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly. + * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob. + * Read operations from the blob are not dependent on the lease ID. + */ +public class AzureLeaderElector implements LeaderElector { + + private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class); + private static final int LEASE_TIME_IN_SEC = 60; + private final LeaseBlobManager leaseBlobManager; + private LeaderElectorListener leaderElectorListener = null; + private final AtomicReference<String> leaseId; + private final AtomicBoolean isLeader; + + public AzureLeaderElector(LeaseBlobManager leaseBlobManager) { + this.isLeader = new AtomicBoolean(false); + this.leaseBlobManager = leaseBlobManager; + this.leaseId = new AtomicReference<>(null); + } + + @Override + public void setLeaderElectorListener(LeaderElectorListener listener) { + this.leaderElectorListener = listener; + } + + /** + * Tries to become the leader by acquiring a lease on the blob. 
+ * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals. + * Invokes the listener on becoming the leader. + * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist. + */ + @Override + public void tryBecomeLeader() throws AzureException { + leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get())); + if (leaseId.get() != null) { + LOG.info("Became leader with lease ID {}.", leaseId.get()); + isLeader.set(true); + if (leaderElectorListener != null) { + leaderElectorListener.onBecomingLeader(); + } + } else { + LOG.info("Unable to become the leader. Continuing as a worker."); + } + } + + /** + * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader. + * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals. 
+ */ + @Override + public void resignLeadership() { + if (isLeader.get()) { + leaseBlobManager.releaseLease(leaseId.get()); + isLeader.set(false); + LOG.info("Resigning leadership with lease ID {}", leaseId.get()); + leaseId.getAndSet(null); + } else { + LOG.info("Can't release the lease because it is not the leader and does not hold an active lease."); + } + } + + /** + * Checks whether it's a leader + * @return true if it is the leader, false otherwise + */ + @Override + public boolean amILeader() { + return isLeader.get(); + } + + public AtomicReference<String> getLeaseId() { + return leaseId; + } + + public LeaseBlobManager getLeaseBlobManager() { + return this.leaseBlobManager; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java new file mode 100644 index 0000000..1c144de --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
/**
 * The states a job model barrier can be in.
 * The constant names are used as text (via {@code name()}), so neither the names nor their
 * order should be changed.
 */
public enum BarrierState {
  /** Presumably marks a barrier that has been opened and is still pending - confirm with the leader-side code. */
  START,
  /** The barrier completed. */
  END,
  /** The barrier timed out. */
  TIMEOUT
}
+ */ +public class JobModelBundle { + + private JobModel prevJobModel; + private JobModel currJobModel; + private String prevJobModelVersion; + private String currJobModelVersion; + + public JobModelBundle() {} + + public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) { + prevJobModel = prevJM; + currJobModel = currJM; + prevJobModelVersion = prevJMV; + currJobModelVersion = currJMV; + } + + public JobModel getCurrJobModel() { + return currJobModel; + } + + public JobModel getPrevJobModel() { + return prevJobModel; + } + + public String getCurrJobModelVersion() { + return currJobModelVersion; + } + + public String getPrevJobModelVersion() { + return prevJobModelVersion; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java new file mode 100644 index 0000000..9323bde --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.data; + +import com.microsoft.azure.storage.table.TableServiceEntity; +import java.util.Random; + + +/** + * Table schema for Azure processor table. + * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID + * Other fields include integer liveness value to which each processor heartbeats, + * and boolean isLeader value which denotes whether the processor is a leader or not. + */ +public class ProcessorEntity extends TableServiceEntity { + private Random rand = new Random(); + private int liveness; + private boolean isLeader; + + public ProcessorEntity() {} + + public ProcessorEntity(String jobModelVersion, String processorId) { + this.partitionKey = jobModelVersion; + this.rowKey = processorId; + this.isLeader = false; + this.liveness = rand.nextInt(10000) + 2; + } + + /** + * Updates heartbeat by updating the liveness value in the table. + * Sets the liveness field to a random integer value in order to update the last modified since timestamp of the row in the table. + * This asserts to the leader that the processor is alive. 
+ */ + public void updateLiveness() { + liveness = rand.nextInt(10000) + 2; + } + + public void setIsLeader(boolean leader) { + isLeader = leader; + } + + public boolean getIsLeader() { + return isLeader; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java new file mode 100644 index 0000000..2abb380 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.coordinator.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.samza.util.TableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Scheduler invoked by each processor for heartbeating to a row of the table. + * Heartbeats every 5 seconds. + * The row is determined by the job model version and processor id passed to the scheduler. + * All time units are in SECONDS. + */ +public class HeartbeatScheduler implements TaskScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(HeartbeatScheduler.class); + private static final long HEARTBEAT_DELAY_SEC = 5; + private static final ThreadFactory PROCESSOR_THREAD_FACTORY = + new ThreadFactoryBuilder().setNameFormat("HeartbeatScheduler-%d").build(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY); + private final String processorId; + private final TableUtils table; + private final AtomicReference<String> currentJMVersion; + private final Consumer<String> errorHandler; + + public HeartbeatScheduler(Consumer<String> errorHandler, TableUtils table, AtomicReference<String> currentJMVersion, final String pid) { + this.table = table; + this.currentJMVersion = currentJMVersion; + processorId = pid; + this.errorHandler = errorHandler; + } + + @Override + public ScheduledFuture scheduleTask() { + return scheduler.scheduleWithFixedDelay(() -> { + try { + String currJVM = currentJMVersion.get(); + LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM); + table.updateHeartbeat(currJVM, 
processorId); + } catch (Exception e) { + errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor..."); + } + }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS); + } + + @Override + public void setStateChangeListener(SchedulerStateChangeListener listener) {} + + @Override + public void shutdown() { + LOG.info("Shutting down HeartbeatScheduler"); + scheduler.shutdownNow(); + } +} \ No newline at end of file
