Repository: samza Updated Branches: refs/heads/master 1d253c757 -> c3b447ecb
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java new file mode 100644 index 0000000..ded014f --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.coordinator.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.samza.coordinator.data.BarrierState; +import org.apache.samza.util.BlobUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Scheduler invoked by each processor to check for job model version upgrades on the blob. + * Checks every 5 seconds. + * The processor polls the leader blob in order to track this. + * All time units are in SECONDS. + */ +public class JMVersionUpgradeScheduler implements TaskScheduler { + private static final Logger LOG = LoggerFactory.getLogger(JMVersionUpgradeScheduler.class); + private static final long JMV_UPGRADE_DELAY_SEC = 5; + private static final ThreadFactory + PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("JMVersionUpgradeScheduler-%d").build(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY); + private final BlobUtils blob; + private final AtomicReference<String> currentJMVersion; + private final AtomicBoolean versionUpgradeDetected; + private final String processorId; + private final Consumer<String> errorHandler; + private SchedulerStateChangeListener listener = null; + + public JMVersionUpgradeScheduler(Consumer<String> errorHandler, BlobUtils blob, + AtomicReference<String> currentJMVersion, AtomicBoolean versionUpgradeDetected, String processorId) { + this.blob = blob; + this.currentJMVersion = currentJMVersion; + this.versionUpgradeDetected = versionUpgradeDetected; + this.processorId = 
processorId; + this.errorHandler = errorHandler; + } + + @Override + public ScheduledFuture scheduleTask() { + return scheduler.scheduleWithFixedDelay(() -> { + try { + LOG.info("Checking for job model version upgrade"); + // Read job model version from the blob. + String blobJMV = blob.getJobModelVersion(); + LOG.info("Job Model Version seen on the blob: {}", blobJMV); + String blobBarrierState = blob.getBarrierState(); + String currentJMV = currentJMVersion.get(); + LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV); + String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV; + List<String> processorList = blob.getLiveProcessorList(); + // Check if the job model version on the blob is consistent with the job model version that the processor is operating on. + if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) { + listener.onStateChange(); + } + } catch (Exception e) { + errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. 
Stopping the processor..."); + } + }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS); + } + + @Override + public void setStateChangeListener(SchedulerStateChangeListener listener) { + this.listener = listener; + } + + @Override + public void shutdown() { + LOG.info("Shutting down JMVersionUpgradeScheduler Scheduler."); + scheduler.shutdownNow(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java new file mode 100644 index 0000000..7386fa9 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.coordinator.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.samza.coordinator.data.ProcessorEntity; +import org.apache.samza.util.TableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Scheduler class invoked by the leader to check if the barrier has completed. + * Checks every 15 seconds. + * The leader polls the Azure processor table in order to track this. + * The barrier is completed if all processors that are listed alive on the blob, have entries in the Azure table with the new job model version. + * All time units are in SECONDS. 
+ */ +public class LeaderBarrierCompleteScheduler implements TaskScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(LeaderBarrierCompleteScheduler.class); + private static final long BARRIER_REACHED_DELAY_SEC = 5; + private static final long BARRIER_TIMEOUT_SEC = 30; + private static final ThreadFactory + PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderBarrierCompleteScheduler-%d").build(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY); + private final TableUtils table; + private final String nextJMVersion; + private final Set<String> blobProcessorSet; + private final long startTime; + private final AtomicBoolean barrierTimeout; + private final Consumer<String> errorHandler; + private final String processorId; + private final AtomicReference<String> currentJMVersion; + private SchedulerStateChangeListener listener = null; + + public LeaderBarrierCompleteScheduler(Consumer<String> errorHandler, TableUtils table, String nextJMVersion, + List<String> blobProcessorList, long startTime, AtomicBoolean barrierTimeout, AtomicReference<String> currentJMVersion, final String pid) { + this.table = table; + this.nextJMVersion = nextJMVersion; + this.blobProcessorSet = new HashSet<>(blobProcessorList); + this.startTime = startTime; + this.barrierTimeout = barrierTimeout; + this.errorHandler = errorHandler; + this.processorId = pid; + this.currentJMVersion = currentJMVersion; + } + + @Override + public ScheduledFuture scheduleTask() { + return scheduler.scheduleWithFixedDelay(() -> { + try { + if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) { + LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler."); + barrierTimeout.getAndSet(true); + listener.onStateChange(); + } else { + LOG.info("Leader checking for barrier state"); + // Get processor IDs listed in the table that have the new job model verion. 
+ Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion); + Set<String> tableProcessors = new HashSet<>(); + for (ProcessorEntity entity : tableList) { + tableProcessors.add(entity.getRowKey()); + } + LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet); + LOG.info("List of live processors as seen in the table = {}", tableProcessors); + if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) { + barrierTimeout.getAndSet(true); + listener.onStateChange(); + } else if (blobProcessorSet.equals(tableProcessors)) { + listener.onStateChange(); + } + } + } catch (Exception e) { + errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor..."); + } + }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS); + } + + @Override + public void setStateChangeListener(SchedulerStateChangeListener listener) { + this.listener = listener; + } + + @Override + public void shutdown() { + LOG.info("Shutting down LeaderBarrierCompleteScheduler Scheduler."); + scheduler.shutdownNow(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java new file mode 100644 index 0000000..e0fa448 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.samza.util.BlobUtils; +import org.apache.samza.coordinator.data.ProcessorEntity; +import org.apache.samza.util.TableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Scheduler class invoked by each processor to check if the leader is alive. + * Checks every 30 seconds. + * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died. + * All time units are in SECONDS. 
+ */ +public class LeaderLivenessCheckScheduler implements TaskScheduler { + private static final Logger LOG = LoggerFactory.getLogger(LeaderLivenessCheckScheduler.class); + private static final long LIVENESS_CHECK_DELAY_SEC = 10; + private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30; + private static final ThreadFactory + PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderLivenessCheckScheduler-%d").build(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY); + private final TableUtils table; + private final AtomicReference<String> currentJMVersion; + private final BlobUtils blob; + private final Consumer<String> errorHandler; + private final String initialState; + private SchedulerStateChangeListener listener = null; + + public LeaderLivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, String initialState) { + this.table = table; + this.blob = blob; + this.currentJMVersion = currentJMVersion; + this.initialState = initialState; + this.errorHandler = errorHandler; + } + + @Override + public ScheduledFuture scheduleTask() { + return scheduler.scheduleWithFixedDelay(() -> { + try { + LOG.info("Checking for leader liveness"); + if (!checkIfLeaderAlive()) { + listener.onStateChange(); + } + } catch (Exception e) { + errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor..."); + } + }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS); + } + + @Override + public void setStateChangeListener(SchedulerStateChangeListener listener) { + this.listener = listener; + } + + private boolean checkIfLeaderAlive() { + String currJMV = currentJMVersion.get(); + String blobJMV = blob.getJobModelVersion(); + //Get the leader processor row from the table. 
+ Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(currJMV); + ProcessorEntity leader = null, nextLeader = null; + for (ProcessorEntity entity: tableList) { + if (entity.getIsLeader()) { + leader = entity; + break; + } + } + int currJMVInt = 0; + if (!currJMV.equals(initialState)) { + currJMVInt = Integer.valueOf(currJMV); + } + if (Integer.valueOf(blobJMV) > currJMVInt) { + for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) { + if (entity.getIsLeader()) { + nextLeader = entity; + break; + } + } + } + // Check if row hasn't been updated since 30 seconds. + if ((leader == null || (System.currentTimeMillis() - leader.getTimestamp().getTime() >= ( + LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) { + return false; + } + return true; + } + + @Override + public void shutdown() { + LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler."); + scheduler.shutdownNow(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java new file mode 100644 index 0000000..d4715f3 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.samza.util.BlobUtils; +import org.apache.samza.util.TableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Scheduler class invoked by the leader to check for changes in the list of live processors. + * Checks every 30 seconds. + * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died. + * All time units are in SECONDS. 
+ */ +public class LivenessCheckScheduler implements TaskScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(LivenessCheckScheduler.class); + private static final long LIVENESS_CHECK_DELAY_SEC = 5; + private static final ThreadFactory + PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LivenessCheckScheduler-%d").build(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY); + private final TableUtils table; + private final BlobUtils blob; + private final AtomicReference<String> currentJMVersion; + private final AtomicReference<List<String>> liveProcessorsList = new AtomicReference<>(null); + private final Consumer<String> errorHandler; + private SchedulerStateChangeListener listener = null; + private final String processorId; + + public LivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, final String pid) { + this.table = table; + this.blob = blob; + this.currentJMVersion = currentJMVersion; + this.errorHandler = errorHandler; + this.processorId = pid; + } + + @Override + public ScheduledFuture scheduleTask() { + return scheduler.scheduleWithFixedDelay(() -> { + try { + if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) { + LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler."); + scheduler.shutdownNow(); + return; + } + LOG.info("Checking for list of live processors"); + //Get the list of live processors published on the blob. + Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList()); + //Get the list of live processors from the table. This is the current system state. + Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion); + //Invoke listener if the table list is not consistent with the blob list. 
+ if (!liveProcessors.equals(currProcessors)) { + liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors)); + listener.onStateChange(); + } + } catch (Exception e) { + errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor..."); + } + }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS); + } + + @Override + public void setStateChangeListener(SchedulerStateChangeListener listener) { + this.listener = listener; + } + + public AtomicReference<List<String>> getLiveProcessors() { + return liveProcessorsList; + } + + @Override + public void shutdown() { + LOG.info("Shutting down LivenessCheckScheduler Scheduler."); + scheduler.shutdownNow(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java new file mode 100644 index 0000000..f158122 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.samza.util.LeaseBlobManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Scheduler class to keep renewing the lease once an entity has acquired it. + * Renews every 45 seconds. + * All time units are in SECONDS. 
+ */ +public class RenewLeaseScheduler implements TaskScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(RenewLeaseScheduler.class); + private static final long RENEW_LEASE_DELAY_SEC = 45; + private static final ThreadFactory + PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("RenewLeaseScheduler-%d").build(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY); + private final LeaseBlobManager leaseBlobManager; + private final AtomicReference<String> leaseId; + private final Consumer<String> errorHandler; + + public RenewLeaseScheduler(Consumer<String> errorHandler, LeaseBlobManager leaseBlobManager, AtomicReference<String> leaseId) { + this.leaseBlobManager = leaseBlobManager; + this.leaseId = leaseId; + this.errorHandler = errorHandler; + } + + @Override + public ScheduledFuture scheduleTask() { + return scheduler.scheduleWithFixedDelay(() -> { + try { + LOG.info("Renewing lease"); + boolean status = leaseBlobManager.renewLease(leaseId.get()); + if (!status) { + errorHandler.accept("Unable to renew lease. Continuing as non-leader."); + } + } catch (Exception e) { + errorHandler.accept("Exception in Renew Lease Scheduler. 
Continuing as non-leader."); + } + }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS); + } + + @Override + public void setStateChangeListener(SchedulerStateChangeListener listener) {} + + @Override + public void shutdown() { + LOG.info("Shutting down RenewLeaseScheduler Scheduler."); + scheduler.shutdownNow(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java new file mode 100644 index 0000000..95fc4e1 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.scheduler; + +/** + * Listener interface for Azure Job Coordinator, to track state changes and take necessary actions. 
/**
 * Listener interface for the Azure Job Coordinator, used by {@link TaskScheduler}
 * implementations to notify about state changes so the coordinator can react.
 */
@FunctionalInterface
public interface SchedulerStateChangeListener {

  /**
   * Invoked by a scheduler when the condition it monitors has changed
   * (e.g. barrier complete, leader death, job model version upgrade detected).
   */
  void onStateChange();

}
+ */ +public interface TaskScheduler { + + ScheduledFuture scheduleTask(); + + void setStateChangeListener(SchedulerStateChangeListener listener); + + void shutdown(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java new file mode 100644 index 0000000..85e4273 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.util; + +import com.microsoft.azure.storage.AccessCondition; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudPageBlob; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; +import org.apache.samza.AzureClient; +import org.apache.samza.AzureException; +import org.apache.samza.coordinator.data.JobModelBundle; +import org.apache.samza.SamzaException; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Client side class that has reference to Azure blob storage. + * Used for writing and reading from the blob. + * Every write requires a valid lease ID. + */ +public class BlobUtils { + + private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class); + private static final long JOB_MODEL_BLOCK_SIZE = 1024000; + private static final long BARRIER_STATE_BLOCK_SIZE = 1024; + private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024; + private CloudBlobClient blobClient; + private CloudBlobContainer container; + private CloudPageBlob blob; + + /** + * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already. + * @param client Client handle for access to Azure Storage account. + * @param containerName Name of container inside which we want the blob to reside. + * @param blobName Name of the blob to be managed. + * @param length Length of the page blob. + * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid. 
+ */ + public BlobUtils(AzureClient client, String containerName, String blobName, long length) { + this.blobClient = client.getBlobClient(); + try { + this.container = blobClient.getContainerReference(containerName); + container.createIfNotExists(); + this.blob = container.getPageBlobReference(blobName); + if (!blob.exists()) { + blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null); + } + } catch (URISyntaxException e) { + LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e); + throw new AzureException(e); + } catch (StorageException e) { + int httpStatusCode = e.getHttpStatusCode(); + if (httpStatusCode == HttpStatus.CONFLICT_409) { + LOG.info("The blob you're trying to create exists already.", e); + } else { + LOG.error("Azure Storage Exception!", e); + throw new AzureException(e); + } + } + } + + /** + * Writes the job model to the blob. + * Write is successful only if the lease ID passed is valid and the processor holds the lease. + * Called by the leader. + * @param prevJM Previous job model version that the processor was operating on. + * @param currJM Current job model version that the processor is operating on. + * @param prevJMV Previous job model version that the processor was operating on. + * @param currJMV Current job model version that the processor is operating on. + * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease. + * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. 
+ */ + public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) { + try { + if (leaseId == null) { + return false; + } + JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV); + byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle); + byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE); + InputStream is = new ByteArrayInputStream(pageData); + blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); + LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel()); + return true; + } catch (StorageException | IOException e) { + LOG.error("JobModel publish failed for version = " + currJMV, e); + return false; + } + } + + /** + * Reads the current job model from the blob. + * @return The current job model published on the blob. Returns null when job model details not found on the blob. + * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred. + * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper. + */ + public JobModel getJobModel() { + LOG.info("Reading the job model from blob."); + JobModelBundle jmBundle = getJobModelBundle(); + if (jmBundle == null) { + LOG.error("Job Model details don't exist on the blob."); + return null; + } + JobModel jm = jmBundle.getCurrJobModel(); + return jm; + } + + /** + * Reads the current job model version from the blob . + * @return Current job model version published on the blob. Returns null when job model details not found on the blob. + * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred. + * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper. 
+ */ + public String getJobModelVersion() { + LOG.info("Reading the job model version from blob."); + JobModelBundle jmBundle = getJobModelBundle(); + if (jmBundle == null) { + LOG.error("Job Model details don't exist on the blob."); + return null; + } + String jmVersion = jmBundle.getCurrJobModelVersion(); + return jmVersion; + } + + /** + * Writes the barrier state to the blob. + * Write is successful only if the lease ID passed is valid and the processor holds the lease. + * Called only by the leader. + * @param state Barrier state to be published to the blob. + * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease. + * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. + */ + public boolean publishBarrierState(String state, String leaseId) { + try { + if (leaseId == null) { + return false; + } + byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state); + byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE); + InputStream is = new ByteArrayInputStream(pageData); + + //uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. It fails otherwise. + blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); + LOG.info("Uploaded barrier state {} to blob", state); + return true; + } catch (StorageException | IOException e) { + LOG.error("Barrier state " + state + " publish failed", e); + return false; + } + } + + /** + * Reads the current barrier state from the blob. + * @return Barrier state published on the blob. + * @throws AzureException If an Azure storage service error occurred. + * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper. 
+ */ + public String getBarrierState() { + LOG.info("Reading the barrier state from blob."); + byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE]; + try { + blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, data, 0); + } catch (StorageException e) { + LOG.error("Failed to read barrier state from blob.", e); + throw new AzureException(e); + } + String state; + try { + state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class); + } catch (IOException e) { + LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e); + throw new SamzaException(e); + } + return state; + } + + /** + * Writes the list of live processors in the system to the blob. + * Write is successful only if the lease ID passed is valid and the processor holds the lease. + * Called only by the leader. + * @param processors List of live processors to be published on the blob. + * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease. + * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred. 
+ */ + public boolean publishLiveProcessorList(List<String> processors, String leaseId) { + try { + if (leaseId == null) { + return false; + } + byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors); + byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE); + InputStream is = new ByteArrayInputStream(pageData); + blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null); + LOG.info("Uploaded list of live processors to blob."); + return true; + } catch (StorageException | IOException e) { + LOG.error("Processor list: " + processors + "publish failed", e); + return false; + } + } + + /** + * Reads the list of live processors published on the blob. + * @return String list of live processors. + * @throws AzureException If an Azure storage service error occurred. + * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper. 
+ */ + public List<String> getLiveProcessorList() { + LOG.info("Read the the list of live processors from blob."); + byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE]; + try { + blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0); + } catch (StorageException e) { + LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e)); + throw new AzureException(e); + } + List<String> list; + try { + list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class); + } catch (IOException e) { + LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e)); + throw new SamzaException(e); + } + return list; + } + + public CloudBlobClient getBlobClient() { + return this.blobClient; + } + + public CloudBlobContainer getBlobContainer() { + return this.container; + } + + public CloudPageBlob getBlob() { + return this.blob; + } + + private JobModelBundle getJobModelBundle() { + byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE]; + try { + blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0); + } catch (StorageException e) { + LOG.error("Failed to read JobModel details from the blob.", e); + throw new AzureException(e); + } + try { + JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class); + return jmBundle; + } catch (IOException e) { + LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e); + throw new SamzaException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java new 
file mode 100644 index 0000000..dffb7ae --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import com.microsoft.azure.storage.AccessCondition; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudPageBlob; +import org.apache.samza.AzureException; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper class for lease blob operations. + */ +public class LeaseBlobManager { + + private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class); + private final CloudPageBlob leaseBlob; + + public LeaseBlobManager(CloudPageBlob leaseBlob) { + this.leaseBlob = leaseBlob; + } + + /** + * Acquires a lease on a blob. The lease ID is NULL initially. + * @param leaseTimeInSec The time in seconds you want to acquire the lease for. + * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed. + * @return String that represents lease ID. Null if acquireLease is unsuccessful because the blob is leased already. 
+ * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist. + */ + public String acquireLease(int leaseTimeInSec, String leaseId) { + try { + String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId); + LOG.info("Acquired lease with lease id = " + id); + return id; + } catch (StorageException storageException) { + int httpStatusCode = storageException.getHttpStatusCode(); + if (httpStatusCode == HttpStatus.CONFLICT_409) { + LOG.info("The blob you're trying to acquire is leased already.", storageException.getMessage()); + } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) { + LOG.error("The blob you're trying to lease does not exist.", storageException); + throw new AzureException(storageException); + } else { + LOG.error("Error acquiring lease!", storageException); + throw new AzureException(storageException); + } + } + return null; + } + + /** + * Renews the lease on the blob. + * @param leaseId ID of the lease to be renewed. + * @return True if lease was renewed successfully, false otherwise. + */ + public boolean renewLease(String leaseId) { + try { + leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId)); + return true; + } catch (StorageException storageException) { + LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException); + return false; + } + } + + /** + * Releases the lease on the blob. + * @param leaseId ID of the lease to be released. + * @return True if released successfully, false otherwise. 
+ */ + public boolean releaseLease(String leaseId) { + try { + leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId)); + return true; + } catch (StorageException storageException) { + LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException); + return false; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java new file mode 100644 index 0000000..f49ce27 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.util; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.table.CloudTable; +import com.microsoft.azure.storage.table.CloudTableClient; +import com.microsoft.azure.storage.table.TableOperation; +import com.microsoft.azure.storage.table.TableQuery; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.samza.AzureClient; +import org.apache.samza.AzureException; +import org.apache.samza.coordinator.data.ProcessorEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Client side class that has a reference to Azure Table Storage. + * Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table. + * Every row in a table is uniquely identified by a combination of the PARTIITON KEY and ROW KEY. + * PARTITION KEY = Group ID = Job Model Version (for this case). + * ROW KEY = Unique entity ID for a group = Processor ID (for this case). 
+ */ +public class TableUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class); + private static final String PARTITION_KEY = "PartitionKey"; + private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30; + private final String initialState; + private final CloudTableClient tableClient; + private final CloudTable table; + + public TableUtils(AzureClient client, String tableName, String initialState) { + this.initialState = initialState; + tableClient = client.getTableClient(); + try { + table = tableClient.getTableReference(tableName); + table.createIfNotExists(); + } catch (URISyntaxException e) { + LOG.error("\nConnection string specifies an invalid URI.", e); + throw new AzureException(e); + } catch (StorageException e) { + LOG.error("Azure storage exception.", e); + throw new AzureException(e); + } + } + + /** + * Add a row which denotes an active processor to the processor table. + * @param jmVersion Job model version that the processor is operating on. + * @param pid Unique processor ID. + * @param isLeader Denotes whether the processor is a leader or not. + * @throws AzureException If an Azure storage service error occurred. + */ + public void addProcessorEntity(String jmVersion, String pid, boolean isLeader) { + ProcessorEntity entity = new ProcessorEntity(jmVersion, pid); + entity.setIsLeader(isLeader); + entity.updateLiveness(); + TableOperation add = TableOperation.insert(entity); + try { + table.execute(add); + } catch (StorageException e) { + LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Retrieve a particular row in the processor table, given the partition key and the row key. + * @param jmVersion Job model version of the processor row to be retrieved. + * @param pid Unique processor ID of the processor row to be retrieved. + * @return An instance of required processor entity. 
Null if does not exist. + * @throws AzureException If an Azure storage service error occurred. + */ + public ProcessorEntity getEntity(String jmVersion, String pid) { + try { + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + return entity; + } catch (StorageException e) { + LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row. + * @param jmVersion Job model version of the processor row to be updated. + * @param pid Unique processor ID of the processor row to be updated. + */ + public void updateHeartbeat(String jmVersion, String pid) { + try { + Random rand = new Random(); + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + entity.updateLiveness(); + TableOperation update = TableOperation.replace(entity); + table.execute(update); + } catch (StorageException e) { + LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e); + } + } + + /** + * Updates the isLeader value when the processor starts or stops being a leader. + * @param jmVersion Job model version of the processor row to be updated. + * @param pid Unique processor ID of the processor row to be updated. + * @param isLeader Denotes whether the processor is a leader or not. + * @throws AzureException If an Azure storage service error occurred. 
+ */ + public void updateIsLeader(String jmVersion, String pid, boolean isLeader) { + try { + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + entity.setIsLeader(isLeader); + TableOperation update = TableOperation.replace(entity); + table.execute(update); + } catch (StorageException e) { + LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Deletes a specified row in the processor table. + * @param jmVersion Job model version of the processor row to be deleted. + * @param pid Unique processor ID of the processor row to be deleted. + * @throws AzureException If an Azure storage service error occurred. + */ + public void deleteProcessorEntity(String jmVersion, String pid) { + try { + TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class); + ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); + TableOperation remove = TableOperation.delete(entity); + table.execute(remove); + } catch (StorageException e) { + LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e); + throw new AzureException(e); + } + } + + /** + * Retrieve all rows in a table with the given partition key. + * @param partitionKey Job model version of the processors to be retrieved. + * @return Iterable list of processor entities. 
+ */ + public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) { + String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey); + TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter); + return table.execute(partitionQuery); + } + + /** + * Gets the list of all active processors that are heartbeating to the processor table. + * @param currentJMVersion Current job model version that the processors in the application are operating on. + * @return List of ids of currently active processors in the application, retrieved from the processor table. + */ + public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) { + Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get()); + Set<String> activeProcessorsList = new HashSet<>(); + for (ProcessorEntity entity: tableList) { + if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) { + activeProcessorsList.add(entity.getRowKey()); + } + } + + Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(initialState); + for (ProcessorEntity entity: unassignedList) { + long temp = System.currentTimeMillis() - entity.getTimestamp().getTime(); + LOG.info("Time elapsed since last heartbeat: {}", temp); + if (temp <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) { + activeProcessorsList.add(entity.getRowKey()); + } + } + LOG.info("Active processors list: {}", activeProcessorsList); + return activeProcessorsList; + } + + public CloudTable getTable() { + return table; + } + +} \ No newline at end of file
