This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 2c61554 ExpiredJob Workaround for Selective Update Race Conditions (#1470)
2c61554 is described below
commit 2c615543bd86e50d76dd4d7006d1c90e130e9755
Author: Neal Sun <[email protected]>
AuthorDate: Wed Oct 21 15:31:01 2020 -0700
ExpiredJob Workaround for Selective Update Race Conditions (#1470)
This PR implements a workaround for determining expired jobs
that avoids the selective-update race condition in the cache: if a
job's JobConfig doesn't exist in the cache, read it directly from ZK.
---
.../apache/helix/controller/stages/TaskGarbageCollectionStage.java | 4 ++--
helix-core/src/main/java/org/apache/helix/task/TaskUtil.java | 7 ++++++-
helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java | 4 ++--
3 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 82def48..0b0d7d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -81,8 +81,8 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
if (nextPurgeTime <= currentTime) {
nextPurgeTime = currentTime + purgeInterval;
// Find jobs that are ready to be purged
- Set<String> expiredJobs =
- TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig,
workflowContext);
+ Set<String> expiredJobs = TaskUtil
+ .getExpiredJobsFromCache(dataProvider, workflowConfig,
workflowContext, manager);
if (!expiredJobs.isEmpty()) {
expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index fa89cde..890b151 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -789,7 +789,7 @@ public class TaskUtil {
*/
public static Set<String> getExpiredJobsFromCache(
WorkflowControllerDataProvider workflowControllerDataProvider,
WorkflowConfig workflowConfig,
- WorkflowContext workflowContext) {
+ WorkflowContext workflowContext, HelixManager manager) {
Set<String> expiredJobs = new HashSet<>();
Map<String, TaskState> jobStates = workflowContext.getJobStates();
for (String job : workflowConfig.getJobDag().getAllNodes()) {
@@ -797,6 +797,11 @@ public class TaskUtil {
continue;
}
JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+ // TODO: Temporary solution for cache selective update race conditions
+ if (jobConfig == null) {
+ jobConfig = TaskUtil.getJobConfig(manager, job);
+ }
+
JobContext jobContext =
workflowControllerDataProvider.getJobContext(job);
TaskState jobState = jobStates.get(job);
if (isJobExpired(job, jobConfig, jobContext, jobState)) {
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index a85ba67..c930954 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -98,7 +98,7 @@ public class TestTaskUtil extends TaskTestBase {
expectedJobs.add(workflowName + "_Job_3");
Assert.assertEquals(TaskUtil
.getExpiredJobsFromCache(workflowControllerDataProvider,
jobQueue.getWorkflowConfig(),
- workflowContext), expectedJobs);
+ workflowContext, _manager), expectedJobs);
}
@Test
@@ -188,7 +188,7 @@ public class TestTaskUtil extends TaskTestBase {
expectedJobs.add(workflowName + "_Job_8");
Assert.assertEquals(TaskUtil
.getExpiredJobsFromCache(workflowControllerDataProvider,
workflow.getWorkflowConfig(),
- workflowContext), expectedJobs);
+ workflowContext, _manager), expectedJobs);
}
@Test