This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-infra.git
The following commit(s) were added to refs/heads/master by this push:
new 541db08 AMBARI-24807 - Infra Manager: unable to restart job (#9)
541db08 is described below
commit 541db0851d40f8e87dc9472c0c30b9e5bf4ca740
Author: kasakrisz <[email protected]>
AuthorDate: Fri Oct 19 19:35:50 2018 +0200
AMBARI-24807 - Infra Manager: unable to restart job (#9)
---
.../org/apache/ambari/infra/job/JobScheduler.java | 10 ++++---
.../apache/ambari/infra/manager/JobManager.java | 31 ++++++++++++--------
.../java/org/apache/ambari/infra/manager/Jobs.java | 6 ++--
.../src/main/resources/infra-manager.properties | 2 +-
.../apache/ambari/infra/job/JobSchedulerTest.java | 34 ++++++++++++++++------
5 files changed, 55 insertions(+), 28 deletions(-)
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
index 8729c4e..f63ea58 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java
@@ -63,12 +63,14 @@ public class JobScheduler {
}
private void restartIfFailed(JobExecution jobExecution) {
- if (jobExecution.getExitStatus() == ExitStatus.FAILED) {
- try {
+ try {
+ if (ExitStatus.FAILED.compareTo(jobExecution.getExitStatus()) == 0) {
jobs.restart(jobExecution.getId());
- } catch (JobInstanceAlreadyCompleteException | NoSuchJobException |
JobExecutionAlreadyRunningException | JobRestartException |
JobParametersInvalidException | NoSuchJobExecutionException e) {
- throw new RuntimeException(e);
+ } else if (ExitStatus.UNKNOWN.compareTo(jobExecution.getExitStatus()) ==
0) {
+ jobs.abandon(jobExecution.getId());
}
+ } catch (JobInstanceAlreadyCompleteException | NoSuchJobException |
JobExecutionAlreadyRunningException | JobRestartException |
JobParametersInvalidException | NoSuchJobExecutionException e) {
+ throw new RuntimeException(e);
}
}
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
index f35387d..ac8cd72 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java
@@ -18,7 +18,19 @@
*/
package org.apache.ambari.infra.manager;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
import org.apache.ambari.infra.model.ExecutionContextResponse;
import org.apache.ambari.infra.model.JobDetailsResponse;
import org.apache.ambari.infra.model.JobExecutionDetailsResponse;
@@ -50,17 +62,7 @@ import
org.springframework.batch.core.repository.JobExecutionAlreadyRunningExcep
import
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
-import javax.inject.Inject;
-import javax.inject.Named;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TimeZone;
+import com.google.common.collect.Lists;
@Named
public class JobManager implements Jobs {
@@ -110,6 +112,11 @@ public class JobManager implements Jobs {
return jobService.listJobExecutionsForJob(jobName, 0,
1).stream().findFirst();
}
+ @Override
+ public void abandon(Long jobExecution) throws NoSuchJobExecutionException,
JobExecutionAlreadyRunningException {
+ jobService.abandon(jobExecution);
+ }
+
/**
* Get all executions ids that mapped to specific job name,
*/
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
index b2ca605..5e435e8 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java
@@ -18,6 +18,8 @@
*/
package org.apache.ambari.infra.manager;
+import java.util.Optional;
+
import org.apache.ambari.infra.model.JobExecutionInfoResponse;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
@@ -28,8 +30,6 @@ import
org.springframework.batch.core.repository.JobExecutionAlreadyRunningExcep
import
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
-import java.util.Optional;
-
public interface Jobs {
JobExecutionInfoResponse launchJob(String jobName, JobParameters params)
throws JobParametersInvalidException, NoSuchJobException,
@@ -39,4 +39,6 @@ public interface Jobs {
JobParametersInvalidException, JobRestartException,
NoSuchJobExecutionException;
Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException,
NoSuchJobExecutionException;
+
+ void abandon(Long jobExecution) throws NoSuchJobExecutionException,
JobExecutionAlreadyRunningException;
}
diff --git a/ambari-infra-manager/src/main/resources/infra-manager.properties
b/ambari-infra-manager/src/main/resources/infra-manager.properties
index d7bdc29..50c43e3 100644
--- a/ambari-infra-manager/src/main/resources/infra-manager.properties
+++ b/ambari-infra-manager/src/main/resources/infra-manager.properties
@@ -83,6 +83,6 @@
infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true
infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs
infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=evtTime
-infra-manager.jobs.clean-up.ttl=PT24H
+infra-manager.jobs.clean-up.ttl=PT240H
infra-manager.jobs.clean-up.scheduling.enabled=true
infra-manager.jobs.clean-up.scheduling.cron=0 * * * * ?
diff --git
a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
index ba1150f..6880740 100644
---
a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
+++
b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java
@@ -1,5 +1,15 @@
package org.apache.ambari.infra.job;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+
+import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+
+import javax.batch.operations.NoSuchJobException;
+
import org.apache.ambari.infra.manager.Jobs;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@@ -14,15 +24,6 @@ import org.springframework.batch.core.JobParameters;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
-import javax.batch.operations.NoSuchJobException;
-import java.util.Optional;
-import java.util.concurrent.ScheduledFuture;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.isA;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -111,4 +112,19 @@ public class JobSchedulerTest extends EasyMockSupport {
jobScheduler.schedule(jobName, schedulingProperties);
}
+
+ @Test
+ public void
testScheduleWhenPreviousExecutionIsUnknownJobIsAbandonedAndScheduled() throws
Exception {
+ String jobName = "job0";
+ SchedulingProperties schedulingProperties = new SchedulingProperties();
+ schedulingProperties.setCron("* * * * * ?");
+ JobExecution jobExecution = new JobExecution(1L, new JobParameters());
+ jobExecution.setExitStatus(ExitStatus.UNKNOWN);
+ expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution));
+ jobs.abandon(1L); expectLastCall();
+ expect(taskScheduler.schedule(isA(Runnable.class), eq(new
CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture);
+ replayAll();
+
+ jobScheduler.schedule(jobName, schedulingProperties);
+ }
}
\ No newline at end of file