Repository: samza Updated Branches: refs/heads/master a9866d629 -> bf4c7619f
SAMZA-1381: Create Utility Class for interacting with Azure Table Storage PR 1: AzureClient + AzureConfig PR 2: LeaseBlobManager PR 3: BlobUtils + JobModelBundle **PR 4: TableUtils + ProcessorEntity** (Current PR) Author: PawasChhokra <Jaimatadi1$> Author: PawasChhokra <[email protected]> Reviewers: Navina Ramesh <[email protected]> Closes #258 from PawasChhokra/TableUtils Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf4c7619 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf4c7619 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf4c7619 Branch: refs/heads/master Commit: bf4c7619f0c750b351ed28926eca284d93bdbd53 Parents: a9866d6 Author: Pawas Chhokra <[email protected]> Authored: Mon Aug 7 17:17:33 2017 -0700 Committer: navina <[email protected]> Committed: Mon Aug 7 17:17:33 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/samza/AzureConfig.java | 1 - .../java/org/apache/samza/ProcessorEntity.java | 58 ++++++ .../main/java/org/apache/samza/TableUtils.java | 198 +++++++++++++++++++ 3 files changed, 256 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/bf4c7619/samza-azure/src/main/java/org/apache/samza/AzureConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java index 32b8082..47873a7 100644 --- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java @@ -24,7 +24,6 @@ import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.MapConfig; - /** * Config class for reading all user defined parameters for Azure driven coordination services. */ http://git-wip-us.apache.org/repos/asf/samza/blob/bf4c7619/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java new file mode 100644 index 0000000..5145821 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +import com.microsoft.azure.storage.table.TableServiceEntity; + + +/** + * Table schema for Azure processor table. + * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID + * Other fields include integer liveness value to which each processor heartbeats, + * and boolean isLeader value which denotes whether the processor is a leader or not. + */ +public class ProcessorEntity extends TableServiceEntity { + private int liveness; + private boolean isLeader; + + public ProcessorEntity() {} + + public ProcessorEntity(String jobModelVersion, String processorId) { + this.partitionKey = jobModelVersion; + this.rowKey = processorId; + this.isLeader = false; + } + + /** + * Updates heartbeat by updating the liveness value in the table. + * @param value + */ + public void setLiveness(int value) { + liveness = value; + } + + public void setIsLeader(boolean leader) { + isLeader = leader; + } + + public boolean getIsLeader() { + return isLeader; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/bf4c7619/samza-azure/src/main/java/org/apache/samza/TableUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/TableUtils.java new file mode 100644 index 0000000..e49fd90 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/TableUtils.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.table.CloudTable; +import com.microsoft.azure.storage.table.CloudTableClient; +import com.microsoft.azure.storage.table.TableOperation; +import com.microsoft.azure.storage.table.TableQuery; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Client side class that has a reference to Azure Table Storage. + * Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table. + * Every row in a table is uniquely identified by a combination of the PARTIITON KEY and ROW KEY. + * PARTITION KEY = Group ID = Job Model Version (for this case). + * ROW KEY = Unique entity ID for a group = Processor ID (for this case). + */ +public class TableUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class); + private static final String PARTITION_KEY = "PartitionKey"; + private static final long CHECK_LIVENESS_DELAY = 30; + private static final String INITIAL_STATE = "unassigned"; + private CloudTableClient tableClient; + private CloudTable table; + + public TableUtils(AzureClient client, String tableName) { + tableClient = client.getTableClient(); + try { + table = tableClient.getTableReference(tableName); + table.createIfNotExists(); + } catch (URISyntaxException e) { + LOG.error("\nConnection string specifies an invalid URI.", new SamzaException(e)); + } catch (StorageException e) { + LOG.error("Azure storage exception.", new SamzaException(e)); + } + } + + /** + * Add a row which denotes an active processor to the processor table. + * @param jmVersion Job model version that the processor is operating on. + * @param pid Unique processor ID. + * @param liveness Random heartbeat value. + * @param isLeader Denotes whether the processor is a leader or not. + * @throws AzureException If an Azure storage service error occurred. + */ + public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) { + ProcessorEntity entity = new ProcessorEntity(jmVersion, pid); + entity.setIsLeader(isLeader); + entity.setLiveness(liveness); + TableOperation add = TableOperation.insert(entity); + try { + table.execute(add); + } catch (StorageException e) { + LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Retrieve a particular row in the processor table, given the partition key and the row key. + * @param jmVersion Job model version of the processor row to be retrieved. + * @param pid Unique processor ID of the processor row to be retrieved. + * @return An instance of required processor entity. Null if does not exist. + * @throws AzureException If an Azure storage service error occurred. + */ + public ProcessorEntity getEntity(String jmVersion, String pid) { + try { + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + return entity; + } catch (StorageException e) { + LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row. + * @param jmVersion Job model version of the processor row to be updated. + * @param pid Unique processor ID of the processor row to be updated. + */ + public void updateHeartbeat(String jmVersion, String pid) { + try { + Random rand = new Random(); + int value = rand.nextInt(10000) + 2; + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + entity.setLiveness(value); + TableOperation update = TableOperation.replace(entity); + table.execute(update); + } catch (StorageException e) { + LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e); + } + } + + /** + * Updates the isLeader value when the processor starts or stops being a leader. + * @param jmVersion Job model version of the processor row to be updated. + * @param pid Unique processor ID of the processor row to be updated. + * @param isLeader Denotes whether the processor is a leader or not. + * @throws AzureException If an Azure storage service error occurred. + */ + public void updateIsLeader(String jmVersion, String pid, boolean isLeader) { + try { + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + entity.setIsLeader(isLeader); + TableOperation update = TableOperation.replace(entity); + table.execute(update); + } catch (StorageException e) { + LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Deletes a specified row in the processor table. + * @param jmVersion Job model version of the processor row to be deleted. + * @param pid Unique processor ID of the processor row to be deleted. + * @throws AzureException If an Azure storage service error occurred. + */ + public void deleteProcessorEntity(String jmVersion, String pid) { + try { + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + TableOperation remove = TableOperation.delete(entity); + table.execute(remove); + } catch (StorageException e) { + LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Retrieve all rows in a table with the given partition key. + * @param partitionKey Job model version of the processors to be retrieved. + * @return Iterable list of processor entities. + */ + public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) { + String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey); + TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter); + return table.execute(partitionQuery); + } + + /** + * Gets the list of all active processors that are heartbeating to the processor table. + * @param currentJMVersion Current job model version that the processors in the application are operating on. + * @return List of ids of currently active processors in the application, retrieved from the processor table. + */ + public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) { + Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get()); + Set<String> activeProcessorsList = new HashSet<>(); + for (ProcessorEntity entity: tableList) { + if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) { + activeProcessorsList.add(entity.getRowKey()); + } + } + + Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(INITIAL_STATE); + for (ProcessorEntity entity: unassignedList) { + if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) { + activeProcessorsList.add(entity.getRowKey()); + } + } + return activeProcessorsList; + } + + public CloudTable getTable() { + return table; + } + +} \ No newline at end of file
