Repository: samza
Updated Branches:
  refs/heads/master 1d253c757 -> c3b447ecb


http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
new file mode 100644
index 0000000..ded014f
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.util.BlobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor to check for job model version upgrades 
on the blob.
+ * Checks every 5 seconds.
+ * The processor polls the leader blob in order to track this.
+ * All time units are in SECONDS.
+ */
+public class JMVersionUpgradeScheduler implements TaskScheduler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JMVersionUpgradeScheduler.class);
+  private static final long JMV_UPGRADE_DELAY_SEC = 5;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new 
ThreadFactoryBuilder().setNameFormat("JMVersionUpgradeScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final BlobUtils blob;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicBoolean versionUpgradeDetected;
+  private final String processorId;
+  private final Consumer<String> errorHandler;
+  private SchedulerStateChangeListener listener = null;
+
+  public JMVersionUpgradeScheduler(Consumer<String> errorHandler, BlobUtils 
blob,
+      AtomicReference<String> currentJMVersion, AtomicBoolean 
versionUpgradeDetected, String processorId) {
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.versionUpgradeDetected = versionUpgradeDetected;
+    this.processorId = processorId;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Checking for job model version upgrade");
+          // Read job model version from the blob.
+          String blobJMV = blob.getJobModelVersion();
+          LOG.info("Job Model Version seen on the blob: {}", blobJMV);
+          String blobBarrierState = blob.getBarrierState();
+          String currentJMV = currentJMVersion.get();
+          LOG.info("Current Job Model Version that the job coordinator is 
working on: {}", currentJMV);
+          String expectedBarrierState = BarrierState.START.toString() + " " + 
blobJMV;
+          List<String> processorList = blob.getLiveProcessorList();
+          // Check if the job model version on the blob is consistent with the 
job model version that the processor is operating on.
+          if (processorList != null && processorList.contains(processorId) && 
!currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && 
!versionUpgradeDetected.get()) {
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Job Model Version Upgrade 
Scheduler. Stopping the processor...");
+        }
+      }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down JMVersionUpgradeScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
new file mode 100644
index 0000000..7386fa9
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check if the barrier has completed.
+ * Checks every 15 seconds.
+ * The leader polls the Azure processor table in order to track this.
+ * The barrier is completed if all processors that are listed alive on the 
blob, have entries in the Azure table with the new job model version.
+ * All time units are in SECONDS.
+ */
+public class LeaderBarrierCompleteScheduler implements TaskScheduler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeaderBarrierCompleteScheduler.class);
+  private static final long BARRIER_REACHED_DELAY_SEC = 5;
+  private static final long BARRIER_TIMEOUT_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new 
ThreadFactoryBuilder().setNameFormat("LeaderBarrierCompleteScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final String nextJMVersion;
+  private final Set<String> blobProcessorSet;
+  private final long startTime;
+  private final AtomicBoolean barrierTimeout;
+  private final Consumer<String> errorHandler;
+  private final String processorId;
+  private final AtomicReference<String> currentJMVersion;
+  private SchedulerStateChangeListener listener = null;
+
+  public LeaderBarrierCompleteScheduler(Consumer<String> errorHandler, 
TableUtils table, String nextJMVersion,
+      List<String> blobProcessorList, long startTime, AtomicBoolean 
barrierTimeout, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.nextJMVersion = nextJMVersion;
+    this.blobProcessorSet = new HashSet<>(blobProcessorList);
+    this.startTime = startTime;
+    this.barrierTimeout = barrierTimeout;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+    this.currentJMVersion = currentJMVersion;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          if (!table.getEntity(currentJMVersion.get(), 
processorId).getIsLeader()) {
+            LOG.info("Not the leader anymore. Shutting down 
LeaderBarrierCompleteScheduler.");
+            barrierTimeout.getAndSet(true);
+            listener.onStateChange();
+          } else {
+            LOG.info("Leader checking for barrier state");
+            // Get processor IDs listed in the table that have the new job 
model verion.
+            Iterable<ProcessorEntity> tableList = 
table.getEntitiesWithPartition(nextJMVersion);
+            Set<String> tableProcessors = new HashSet<>();
+            for (ProcessorEntity entity : tableList) {
+              tableProcessors.add(entity.getRowKey());
+            }
+            LOG.info("List of live processors as seen on the blob = {}", 
blobProcessorSet);
+            LOG.info("List of live processors as seen in the table = {}", 
tableProcessors);
+            if ((System.currentTimeMillis() - startTime) > 
(BARRIER_TIMEOUT_SEC * 1000)) {
+              barrierTimeout.getAndSet(true);
+              listener.onStateChange();
+            } else if (blobProcessorSet.equals(tableProcessors)) {
+              listener.onStateChange();
+            }
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. 
Stopping the processor...");
+        }
+      }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, 
TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderBarrierCompleteScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
new file mode 100644
index 0000000..e0fa448
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class invoked by each processor to check if the leader is alive.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes 
that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LeaderLivenessCheckScheduler implements TaskScheduler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeaderLivenessCheckScheduler.class);
+  private static final long LIVENESS_CHECK_DELAY_SEC = 10;
+  private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new 
ThreadFactoryBuilder().setNameFormat("LeaderLivenessCheckScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final AtomicReference<String> currentJMVersion;
+  private final BlobUtils blob;
+  private final Consumer<String> errorHandler;
+  private final String initialState;
+  private SchedulerStateChangeListener listener = null;
+
+  public LeaderLivenessCheckScheduler(Consumer<String> errorHandler, 
TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, 
String initialState) {
+    this.table = table;
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.initialState = initialState;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Checking for leader liveness");
+          if (!checkIfLeaderAlive()) {
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Leader Liveness Check Scheduler. 
Stopping the processor...");
+        }
+      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  private boolean checkIfLeaderAlive() {
+    String currJMV = currentJMVersion.get();
+    String blobJMV = blob.getJobModelVersion();
+    //Get the leader processor row from the table.
+    Iterable<ProcessorEntity> tableList = 
table.getEntitiesWithPartition(currJMV);
+    ProcessorEntity leader = null, nextLeader = null;
+    for (ProcessorEntity entity: tableList) {
+      if (entity.getIsLeader()) {
+        leader = entity;
+        break;
+      }
+    }
+    int currJMVInt = 0;
+    if (!currJMV.equals(initialState)) {
+      currJMVInt = Integer.valueOf(currJMV);
+    }
+    if (Integer.valueOf(blobJMV) > currJMVInt) {
+      for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) {
+        if (entity.getIsLeader()) {
+          nextLeader = entity;
+          break;
+        }
+      }
+    }
+    // Check if row hasn't been updated since 30 seconds.
+    if ((leader == null || (System.currentTimeMillis() - 
leader.getTimestamp().getTime() >= (
+        LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
new file mode 100644
index 0000000..d4715f3
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check for changes in the list of 
live processors.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes 
that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LivenessCheckScheduler implements TaskScheduler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LivenessCheckScheduler.class);
+  private static final long LIVENESS_CHECK_DELAY_SEC = 5;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new 
ThreadFactoryBuilder().setNameFormat("LivenessCheckScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final BlobUtils blob;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicReference<List<String>> liveProcessorsList = new 
AtomicReference<>(null);
+  private final Consumer<String> errorHandler;
+  private SchedulerStateChangeListener listener = null;
+  private final String processorId;
+
+  public LivenessCheckScheduler(Consumer<String> errorHandler, TableUtils 
table, BlobUtils blob, AtomicReference<String> currentJMVersion, final String 
pid) {
+    this.table = table;
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          if (!table.getEntity(currentJMVersion.get(), 
processorId).getIsLeader()) {
+            LOG.info("Not the leader anymore. Shutting down 
LivenessCheckScheduler.");
+            scheduler.shutdownNow();
+            return;
+          }
+          LOG.info("Checking for list of live processors");
+          //Get the list of live processors published on the blob.
+          Set<String> currProcessors = new 
HashSet<>(blob.getLiveProcessorList());
+          //Get the list of live processors from the table. This is the 
current system state.
+          Set<String> liveProcessors = 
table.getActiveProcessorsList(currentJMVersion);
+          //Invoke listener if the table list is not consistent with the blob 
list.
+          if (!liveProcessors.equals(currProcessors)) {
+            liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Liveness Check Scheduler. Stopping 
the processor...");
+        }
+      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  public AtomicReference<List<String>> getLiveProcessors() {
+    return liveProcessorsList;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LivenessCheckScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
new file mode 100644
index 0000000..f158122
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class to keep renewing the lease once an entity has acquired it.
+ * Renews every 45 seconds.
+ * All time units are in SECONDS.
+ */
+public class RenewLeaseScheduler implements TaskScheduler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RenewLeaseScheduler.class);
+  private static final long RENEW_LEASE_DELAY_SEC = 45;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new 
ThreadFactoryBuilder().setNameFormat("RenewLeaseScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final LeaseBlobManager leaseBlobManager;
+  private final AtomicReference<String> leaseId;
+  private final Consumer<String> errorHandler;
+
+  public RenewLeaseScheduler(Consumer<String> errorHandler, LeaseBlobManager 
leaseBlobManager, AtomicReference<String> leaseId) {
+    this.leaseBlobManager = leaseBlobManager;
+    this.leaseId = leaseId;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Renewing lease");
+          boolean status = leaseBlobManager.renewLease(leaseId.get());
+          if (!status) {
+            errorHandler.accept("Unable to renew lease. Continuing as 
non-leader.");
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Renew Lease Scheduler. Continuing 
as non-leader.");
+        }
+      }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down RenewLeaseScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
new file mode 100644
index 0000000..95fc4e1
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
/**
 * Listener interface for Azure Job Coordinator, to track state changes and take necessary actions.
 * Registered on a {@link TaskScheduler} via {@code setStateChangeListener}.
 */
@FunctionalInterface
public interface SchedulerStateChangeListener {

  /** Invoked by a scheduler when the state it monitors has changed. */
  void onStateChange();

}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
new file mode 100644
index 0000000..63d6e24
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import java.util.concurrent.ScheduledFuture;
+
+
+/**
+ * Interface for scheduling tasks for Azure Job Coordinator.
+ */
+public interface TaskScheduler {
+
+  ScheduledFuture scheduleTask();
+
+  void setStateChangeListener(SchedulerStateChangeListener listener);
+
+  void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java 
b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
new file mode 100644
index 0000000..85e4273
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.JobModelBundle;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client side class that has reference to Azure blob storage.
+ * Used for writing and reading from the blob.
+ * Every write requires a valid lease ID.
+ */
+public class BlobUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);
+  private static final long JOB_MODEL_BLOCK_SIZE = 1024000;
+  private static final long BARRIER_STATE_BLOCK_SIZE = 1024;
+  private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024;
+  private CloudBlobClient blobClient;
+  private CloudBlobContainer container;
+  private CloudPageBlob blob;
+
+  /**
+   * Creates an object of BlobUtils. It creates the container and page blob if 
they don't exist already.
+   * @param client Client handle for access to Azure Storage account.
+   * @param containerName Name of container inside which we want the blob to 
reside.
+   * @param blobName Name of the blob to be managed.
+   * @param length Length of the page blob.
+   * @throws AzureException If an Azure storage service error occurred, or 
when the container name or blob name is invalid.
+   */
+  public BlobUtils(AzureClient client, String containerName, String blobName, 
long length) {
+    this.blobClient = client.getBlobClient();
+    try {
+      this.container = blobClient.getContainerReference(containerName);
+      container.createIfNotExists();
+      this.blob = container.getPageBlobReference(blobName);
+      if (!blob.exists()) {
+        blob.create(length, AccessCondition.generateIfNotExistsCondition(), 
null, null);
+      }
+    } catch (URISyntaxException e) {
+      LOG.error("Container name: " + containerName + " or blob name: " + 
blobName + " invalid.", e);
+      throw new AzureException(e);
+    } catch (StorageException e) {
+      int httpStatusCode = e.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        LOG.info("The blob you're trying to create exists already.", e);
+      } else {
+        LOG.error("Azure Storage Exception!", e);
+        throw new AzureException(e);
+      }
+    }
+  }
+
+  /**
+   * Writes the job model to the blob.
+   * Write is successful only if the lease ID passed is valid and the 
processor holds the lease.
+   * Called by the leader.
+   * @param prevJM Previous job model version that the processor was operating 
on.
+   * @param currJM Current job model version that the processor is operating 
on.
+   * @param prevJMV Previous job model version that the processor was 
operating on.
+   * @param currJMV Current job model version that the processor is operating 
on.
+   * @param leaseId LeaseID of the lease that the processor holds on the blob. 
Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null 
or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishJobModel(JobModel prevJM, JobModel currJM, String 
prevJMV, String currJMV, String leaseId) {
+    try {
+      if (leaseId == null) {
+        return false;
+      }
+      JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, 
currJMV);
+      byte[] data = 
SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle);
+      byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE);
+      InputStream is = new ByteArrayInputStream(pageData);
+      blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, 
AccessCondition.generateLeaseCondition(leaseId), null, null);
+      LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel());
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("JobModel publish failed for version = " + currJMV, e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the current job model from the blob.
+   * @return The current job model published on the blob. Returns null when 
job model details not found on the blob.
+   * @throws AzureException in getJobModelBundle() if an Azure storage service 
error occurred.
+   * @throws SamzaException in getJobModelBundle() if data retrieved from blob 
could not be parsed by SamzaObjectMapper.
+   */
+  public JobModel getJobModel() {
+    LOG.info("Reading the job model from blob.");
+    JobModelBundle jmBundle = getJobModelBundle();
+    if (jmBundle == null) {
+      LOG.error("Job Model details don't exist on the blob.");
+      return null;
+    }
+    JobModel jm = jmBundle.getCurrJobModel();
+    return jm;
+  }
+
+  /**
+   * Reads the current job model version from the blob .
+   * @return Current job model version published on the blob. Returns null 
when job model details not found on the blob.
+   * @throws AzureException in getJobModelBundle() if an Azure storage service 
error occurred.
+   * @throws SamzaException in getJobModelBundle() if data retrieved from blob 
could not be parsed by SamzaObjectMapper.
+   */
+  public String getJobModelVersion() {
+    LOG.info("Reading the job model version from blob.");
+    JobModelBundle jmBundle = getJobModelBundle();
+    if (jmBundle == null) {
+      LOG.error("Job Model details don't exist on the blob.");
+      return null;
+    }
+    String jmVersion = jmBundle.getCurrJobModelVersion();
+    return jmVersion;
+  }
+
+  /**
+   * Writes the barrier state to the blob.
+   * Write is successful only if the lease ID passed is valid and the 
processor holds the lease.
+   * Called only by the leader.
+   * @param state Barrier state to be published to the blob.
+   * @param leaseId LeaseID of the valid lease that the processor holds on the 
blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null 
or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishBarrierState(String state, String leaseId) {
+    try {
+      if (leaseId == null) {
+        return false;
+      }
+      byte[] data = 
SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state);
+      byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
+      InputStream is = new ByteArrayInputStream(pageData);
+
+      //uploadPages is only successful when the AccessCondition provided has 
an active and valid lease ID. It fails otherwise.
+      blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, 
AccessCondition.generateLeaseCondition(leaseId), null, null);
+      LOG.info("Uploaded barrier state {} to blob", state);
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("Barrier state " + state + " publish failed", e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the current barrier state from the blob.
+   * @return Barrier state published on the blob.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If data retrieved from blob could not be parsed by 
SamzaObjectMapper.
+   */
+  public String getBarrierState() {
+    LOG.info("Reading the barrier state from blob.");
+    byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE];
+    try {
+      blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, 
BARRIER_STATE_BLOCK_SIZE, data, 0);
+    } catch (StorageException e) {
+      LOG.error("Failed to read barrier state from blob.", e);
+      throw new AzureException(e);
+    }
+    String state;
+    try {
+      state = SamzaObjectMapper.getObjectMapper().readValue(data, 
String.class);
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data: " + data + " for barrier state 
retrieved from the blob.", e);
+      throw new SamzaException(e);
+    }
+    return state;
+  }
+
+  /**
+   * Writes the list of live processors in the system to the blob.
+   * Write is successful only if the lease ID passed is valid and the 
processor holds the lease.
+   * Called only by the leader.
+   * @param processors List of live processors to be published on the blob.
+   * @param leaseId LeaseID of the valid lease that the processor holds on the 
blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null 
or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishLiveProcessorList(List<String> processors, String 
leaseId) {
+    try {
+      if (leaseId == null) {
+        return false;
+      }
+      byte[] data = 
SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors);
+      byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
+      InputStream is = new ByteArrayInputStream(pageData);
+      blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, 
PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), 
null, null);
+      LOG.info("Uploaded list of live processors to blob.");
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("Processor list: " + processors + "publish failed", e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the list of live processors published on the blob.
+   * @return String list of live processors.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If data retrieved from blob could not be parsed by 
SamzaObjectMapper.
+   */
+  public List<String> getLiveProcessorList() {
+    LOG.info("Read the the list of live processors from blob.");
+    byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE];
+    try {
+      blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + 
BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0);
+    } catch (StorageException e) {
+      LOG.error("Failed to read the list of live processors from the blob.", 
new AzureException(e));
+      throw new AzureException(e);
+    }
+    List<String> list;
+    try {
+      list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data: " + data + " for live processor 
list retrieved from the blob", new SamzaException(e));
+      throw new SamzaException(e);
+    }
+    return list;
+  }
+
+  public CloudBlobClient getBlobClient() {
+    return this.blobClient;
+  }
+
+  public CloudBlobContainer getBlobContainer() {
+    return this.container;
+  }
+
+  public CloudPageBlob getBlob() {
+    return this.blob;
+  }
+
+  private JobModelBundle getJobModelBundle() {
+    byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE];
+    try {
+      blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0);
+    } catch (StorageException e) {
+      LOG.error("Failed to read JobModel details from the blob.", e);
+      throw new AzureException(e);
+    }
+    try {
+      JobModelBundle jmBundle = 
SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
+      return jmBundle;
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data: " + data + " for JobModel details 
retrieved from the blob", e);
+      throw new SamzaException(e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java 
b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
new file mode 100644
index 0000000..dffb7ae
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.apache.samza.AzureException;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class for lease blob operations.
+ */
+public class LeaseBlobManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeaseBlobManager.class);
+  private final CloudPageBlob leaseBlob;
+
+  public LeaseBlobManager(CloudPageBlob leaseBlob) {
+    this.leaseBlob = leaseBlob;
+  }
+
+  /**
+   * Acquires a lease on a blob. The lease ID is NULL initially.
+   * @param leaseTimeInSec The time in seconds you want to acquire the lease 
for.
+   * @param leaseId Proposed ID you want to acquire the lease with, null if 
not proposed.
+   * @return String that represents lease ID.  Null if acquireLease is 
unsuccessful because the blob is leased already.
+   * @throws AzureException If a Azure storage service error occurred. This 
includes the case where the blob you're trying to lease does not exist.
+   */
+  public String acquireLease(int leaseTimeInSec, String leaseId) {
+    try {
+      String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
+      LOG.info("Acquired lease with lease id = " + id);
+      return id;
+    } catch (StorageException storageException) {
+      int httpStatusCode = storageException.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        LOG.info("The blob you're trying to acquire is leased already.", 
storageException.getMessage());
+      } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
+        LOG.error("The blob you're trying to lease does not exist.", 
storageException);
+        throw new AzureException(storageException);
+      } else {
+        LOG.error("Error acquiring lease!", storageException);
+        throw new AzureException(storageException);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Renews the lease on the blob.
+   * @param leaseId ID of the lease to be renewed.
+   * @return True if lease was renewed successfully, false otherwise.
+   */
+  public boolean renewLease(String leaseId) {
+    try {
+      leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to renew lease with lease id: " + leaseId, 
storageException);
+      return false;
+    }
+  }
+
+  /**
+   * Releases the lease on the blob.
+   * @param leaseId ID of the lease to be released.
+   * @return True if released successfully, false otherwise.
+   */
+  public boolean releaseLease(String leaseId) {
+    try {
+      leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to release lease with lease id: " + leaseId, 
storageException);
+      return false;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java 
b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
new file mode 100644
index 0000000..f49ce27
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableOperation;
+import com.microsoft.azure.storage.table.TableQuery;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *  Client side class that has a reference to Azure Table Storage.
+ *  Enables the user to add or delete information from the table, make updates 
to the table and retrieve information from the table.
+ *  Every row in a table is uniquely identified by a combination of the 
PARTIITON KEY and ROW KEY.
+ *  PARTITION KEY = Group ID = Job Model Version (for this case).
+ *  ROW KEY = Unique entity ID for a group = Processor ID (for this case).
+ */
+public class TableUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
+  private static final String PARTITION_KEY = "PartitionKey";
+  private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+  private final String initialState;
+  private final CloudTableClient tableClient;
+  private final CloudTable table;
+
+  public TableUtils(AzureClient client, String tableName, String initialState) 
{
+    this.initialState = initialState;
+    tableClient = client.getTableClient();
+    try {
+      table = tableClient.getTableReference(tableName);
+      table.createIfNotExists();
+    } catch (URISyntaxException e) {
+      LOG.error("\nConnection string specifies an invalid URI.", e);
+      throw new AzureException(e);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception.", e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Add a row which denotes an active processor to the processor table.
+   * @param jmVersion Job model version that the processor is operating on.
+   * @param pid Unique processor ID.
+   * @param isLeader Denotes whether the processor is a leader or not.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void addProcessorEntity(String jmVersion, String pid, boolean 
isLeader) {
+    ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
+    entity.setIsLeader(isLeader);
+    entity.updateLiveness();
+    TableOperation add = TableOperation.insert(entity);
+    try {
+      table.execute(add);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while adding processor entity with 
job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Retrieve a particular row in the processor table, given the partition key 
and the row key.
+   * @param jmVersion Job model version of the processor row to be retrieved.
+   * @param pid Unique processor ID of the processor row to be retrieved.
+   * @return An instance of required processor entity. Null if does not exist.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public ProcessorEntity getEntity(String jmVersion, String pid) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      return entity;
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while retrieving processor entity 
with job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Updates the liveness value of a particular processor with a randomly 
generated integer, which in turn updates the last modified since timestamp of 
the row.
+   * @param jmVersion Job model version of the processor row to be updated.
+   * @param pid Unique processor ID of the processor row to be updated.
+   */
+  public void updateHeartbeat(String jmVersion, String pid) {
+    try {
+      Random rand = new Random();
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      entity.updateLiveness();
+      TableOperation update = TableOperation.replace(entity);
+      table.execute(update);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while updating heartbeat for job 
model version: " + jmVersion + "and pid: " + pid, e);
+    }
+  }
+
+  /**
+   * Updates the isLeader value when the processor starts or stops being a 
leader.
+   * @param jmVersion Job model version of the processor row to be updated.
+   * @param pid Unique processor ID of the processor row to be updated.
+   * @param isLeader Denotes whether the processor is a leader or not.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      entity.setIsLeader(isLeader);
+      TableOperation update = TableOperation.replace(entity);
+      table.execute(update);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while updating isLeader value for job 
model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Deletes a specified row in the processor table.
+   * @param jmVersion Job model version of the processor row to be deleted.
+   * @param pid Unique processor ID of the processor row to be deleted.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void deleteProcessorEntity(String jmVersion, String pid) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, 
ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      TableOperation remove = TableOperation.delete(entity);
+      table.execute(remove);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while deleting processor entity with 
job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Retrieve all rows in a table with the given partition key.
+   * @param partitionKey Job model version of the processors to be retrieved.
+   * @return Iterable list of processor entities.
+   */
+  public Iterable<ProcessorEntity> getEntitiesWithPartition(String 
partitionKey) {
+    String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, 
TableQuery.QueryComparisons.EQUAL, partitionKey);
+    TableQuery<ProcessorEntity> partitionQuery = 
TableQuery.from(ProcessorEntity.class).where(partitionFilter);
+    return table.execute(partitionQuery);
+  }
+
+  /**
+   * Gets the list of all active processors that are heartbeating to the 
processor table.
+   * @param currentJMVersion Current job model version that the processors in 
the application are operating on.
+   * @return List of ids of currently active processors in the application, 
retrieved from the processor table.
+   */
+  public Set<String> getActiveProcessorsList(AtomicReference<String> 
currentJMVersion) {
+    Iterable<ProcessorEntity> tableList = 
getEntitiesWithPartition(currentJMVersion.get());
+    Set<String> activeProcessorsList = new HashSet<>();
+    for (ProcessorEntity entity: tableList) {
+      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= 
(LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
+        activeProcessorsList.add(entity.getRowKey());
+      }
+    }
+
+    Iterable<ProcessorEntity> unassignedList = 
getEntitiesWithPartition(initialState);
+    for (ProcessorEntity entity: unassignedList) {
+      long temp = System.currentTimeMillis() - entity.getTimestamp().getTime();
+      LOG.info("Time elapsed since last heartbeat: {}", temp);
+      if (temp <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
+        activeProcessorsList.add(entity.getRowKey());
+      }
+    }
+    LOG.info("Active processors list: {}", activeProcessorsList);
+    return activeProcessorsList;
+  }
+
+  public CloudTable getTable() {
+    return table;
+  }
+
+}
\ No newline at end of file

Reply via email to