SAMZA-1381: Create Utility Class for interacting with Azure Table Storage

PR 1: AzureClient + AzureConfig
PR 2: LeaseBlobManager
PR 3: BlobUtils + JobModelBundle
**PR 4: TableUtils + ProcessorEntity** (Current PR)

Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pawas2...@gmail.com>

Reviewers: Navina Ramesh <nav...@apache.org>

Closes #258 from PawasChhokra/TableUtils


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf4c7619
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf4c7619
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf4c7619

Branch: refs/heads/0.14.0
Commit: bf4c7619f0c750b351ed28926eca284d93bdbd53
Parents: a9866d6
Author: Pawas Chhokra <pawas2...@gmail.com>
Authored: Mon Aug 7 17:17:33 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Mon Aug 7 17:17:33 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/AzureConfig.java |   1 -
 .../java/org/apache/samza/ProcessorEntity.java  |  58 ++++++
 .../main/java/org/apache/samza/TableUtils.java  | 198 +++++++++++++++++++
 3 files changed, 256 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bf4c7619/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java 
b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
index 32b8082..47873a7 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
@@ -24,7 +24,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.MapConfig;
 
-
 /**
  * Config class for reading all user defined parameters for Azure driven 
coordination services.
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/bf4c7619/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java 
b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
new file mode 100644
index 0000000..5145821
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+import com.microsoft.azure.storage.table.TableServiceEntity;
+
+
+/**
+ * Table schema for Azure processor table.
+ * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW 
KEY = Processor ID
+ * Other fields include integer liveness value to which each processor 
heartbeats,
+ * and boolean isLeader value which denotes whether the processor is a leader 
or not.
+ */
+public class ProcessorEntity extends TableServiceEntity {
+  private int liveness;
+  private boolean isLeader;
+
+  public ProcessorEntity() {}
+
+  public ProcessorEntity(String jobModelVersion, String processorId) {
+    this.partitionKey = jobModelVersion;
+    this.rowKey = processorId;
+    this.isLeader = false;
+  }
+
+  /**
+   * Updates heartbeat by updating the liveness value in the table.
+   * @param value
+   */
+  public void setLiveness(int value) {
+    liveness = value;
+  }
+
+  public void setIsLeader(boolean leader) {
+    isLeader = leader;
+  }
+
+  public boolean getIsLeader() {
+    return isLeader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bf4c7619/samza-azure/src/main/java/org/apache/samza/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/TableUtils.java 
b/samza-azure/src/main/java/org/apache/samza/TableUtils.java
new file mode 100644
index 0000000..e49fd90
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/TableUtils.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableOperation;
+import com.microsoft.azure.storage.table.TableQuery;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *  Client side class that has a reference to Azure Table Storage.
+ *  Enables the user to add or delete information from the table, make updates 
to the table and retrieve information from the table.
+ *  Every row in a table is uniquely identified by a combination of the 
PARTIITON KEY and ROW KEY.
+ *  PARTITION KEY = Group ID = Job Model Version (for this case).
+ *  ROW KEY = Unique entity ID for a group = Processor ID (for this case).
+ */
+public class TableUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
+  private static final String PARTITION_KEY = "PartitionKey";
+  private static final long CHECK_LIVENESS_DELAY = 30;
+  private static final String INITIAL_STATE = "unassigned";
+  private CloudTableClient tableClient;
+  private CloudTable table;
+
+  public TableUtils(AzureClient client, String tableName) {
+    tableClient = client.getTableClient();
+    try {
+      table = tableClient.getTableReference(tableName);
+      table.createIfNotExists();
+    } catch (URISyntaxException e) {
+      LOG.error("\nConnection string specifies an invalid URI.", new 
SamzaException(e));
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception.", new SamzaException(e));
+    }
+  }
+
+  /**
+   * Add a row which denotes an active processor to the processor table.
+   * @param jmVersion Job model version that the processor is operating on.
+   * @param pid Unique processor ID.
+   * @param liveness Random heartbeat value.
+   * @param isLeader Denotes whether the processor is a leader or not.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void addProcessorEntity(String jmVersion, String pid, int liveness, 
boolean isLeader) {
+    ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
+    entity.setIsLeader(isLeader);
+    entity.setLiveness(liveness);
+    TableOperation add = TableOperation.insert(entity);
+    try {
+      table.execute(add);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while adding processor entity with 
job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Retrieve a particular row in the processor table, given the partition key 
and the row key.
+   * @param jmVersion Job model version of the processor row to be retrieved.
+   * @param pid Unique processor ID of the processor row to be retrieved.
+   * @return An instance of required processor entity. Null if does not exist.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public ProcessorEntity getEntity(String jmVersion, String pid) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      return entity;
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while retrieving processor entity 
with job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Updates the liveness value of a particular processor with a randomly 
generated integer, which in turn updates the last modified since timestamp of 
the row.
+   * @param jmVersion Job model version of the processor row to be updated.
+   * @param pid Unique processor ID of the processor row to be updated.
+   */
+  public void updateHeartbeat(String jmVersion, String pid) {
+    try {
+      Random rand = new Random();
+      int value = rand.nextInt(10000) + 2;
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      entity.setLiveness(value);
+      TableOperation update = TableOperation.replace(entity);
+      table.execute(update);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while updating heartbeat for job 
model version: " + jmVersion + "and pid: " + pid, e);
+    }
+  }
+
+  /**
+   * Updates the isLeader value when the processor starts or stops being a 
leader.
+   * @param jmVersion Job model version of the processor row to be updated.
+   * @param pid Unique processor ID of the processor row to be updated.
+   * @param isLeader Denotes whether the processor is a leader or not.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      entity.setIsLeader(isLeader);
+      TableOperation update = TableOperation.replace(entity);
+      table.execute(update);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while updating isLeader value for job 
model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Deletes a specified row in the processor table.
+   * @param jmVersion Job model version of the processor row to be deleted.
+   * @param pid Unique processor ID of the processor row to be deleted.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void deleteProcessorEntity(String jmVersion, String pid) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      TableOperation remove = TableOperation.delete(entity);
+      table.execute(remove);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while deleting processor entity with 
job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Retrieve all rows in a table with the given partition key.
+   * @param partitionKey Job model version of the processors to be retrieved.
+   * @return Iterable list of processor entities.
+   */
+  public Iterable<ProcessorEntity> getEntitiesWithPartition(String 
partitionKey) {
+    String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, 
TableQuery.QueryComparisons.EQUAL, partitionKey);
+    TableQuery<ProcessorEntity> partitionQuery = 
TableQuery.from(ProcessorEntity.class).where(partitionFilter);
+    return table.execute(partitionQuery);
+  }
+
+  /**
+   * Gets the list of all active processors that are heartbeating to the 
processor table.
+   * @param currentJMVersion Current job model version that the processors in 
the application are operating on.
+   * @return List of ids of currently active processors in the application, 
retrieved from the processor table.
+   */
+  public Set<String> getActiveProcessorsList(AtomicReference<String> 
currentJMVersion) {
+    Iterable<ProcessorEntity> tableList = 
getEntitiesWithPartition(currentJMVersion.get());
+    Set<String> activeProcessorsList = new HashSet<>();
+    for (ProcessorEntity entity: tableList) {
+      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= 
CHECK_LIVENESS_DELAY * 1000) {
+        activeProcessorsList.add(entity.getRowKey());
+      }
+    }
+
+    Iterable<ProcessorEntity> unassignedList = 
getEntitiesWithPartition(INITIAL_STATE);
+    for (ProcessorEntity entity: unassignedList) {
+      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= 
CHECK_LIVENESS_DELAY * 1000) {
+        activeProcessorsList.add(entity.getRowKey());
+      }
+    }
+    return activeProcessorsList;
+  }
+
+  public CloudTable getTable() {
+    return table;
+  }
+
+}
\ No newline at end of file

Reply via email to