This is an automated email from the ASF dual-hosted git repository. krisztiankasa pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ambari-infra.git
The following commit(s) were added to refs/heads/master by this push: new 541db08 AMBARI-24807 - Infra Manager: unable to restart job (#9) 541db08 is described below commit 541db0851d40f8e87dc9472c0c30b9e5bf4ca740 Author: kasakrisz <33458261+kasakr...@users.noreply.github.com> AuthorDate: Fri Oct 19 19:35:50 2018 +0200 AMBARI-24807 - Infra Manager: unable to restart job (#9) --- .../org/apache/ambari/infra/job/JobScheduler.java | 10 ++++--- .../apache/ambari/infra/manager/JobManager.java | 31 ++++++++++++-------- .../java/org/apache/ambari/infra/manager/Jobs.java | 6 ++-- .../src/main/resources/infra-manager.properties | 2 +- .../apache/ambari/infra/job/JobSchedulerTest.java | 34 ++++++++++++++++------ 5 files changed, 55 insertions(+), 28 deletions(-) diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java index 8729c4e..f63ea58 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java @@ -63,12 +63,14 @@ public class JobScheduler { } private void restartIfFailed(JobExecution jobExecution) { - if (jobExecution.getExitStatus() == ExitStatus.FAILED) { - try { + try { + if (ExitStatus.FAILED.compareTo(jobExecution.getExitStatus()) == 0) { jobs.restart(jobExecution.getId()); - } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) { - throw new RuntimeException(e); + } else if (ExitStatus.UNKNOWN.compareTo(jobExecution.getExitStatus()) == 0) { + jobs.abandon(jobExecution.getId()); } + } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) { + throw new RuntimeException(e); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java index f35387d..ac8cd72 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java @@ -18,7 +18,19 @@ */ package org.apache.ambari.infra.manager; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; + +import javax.inject.Inject; +import javax.inject.Named; + import org.apache.ambari.infra.model.ExecutionContextResponse; import org.apache.ambari.infra.model.JobDetailsResponse; import org.apache.ambari.infra.model.JobExecutionDetailsResponse; @@ -50,17 +62,7 @@ import org.springframework.batch.core.repository.JobExecutionAlreadyRunningExcep import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; -import javax.inject.Inject; -import javax.inject.Named; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TimeZone; +import com.google.common.collect.Lists; @Named public class JobManager implements Jobs { @@ -110,6 +112,11 @@ public class JobManager implements Jobs { return jobService.listJobExecutionsForJob(jobName, 0, 1).stream().findFirst(); } + @Override + public void abandon(Long jobExecution) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException { + jobService.abandon(jobExecution); + } + /** * Get all executions ids that mapped to specific job name, */ diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java index b2ca605..5e435e8 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java @@ -18,6 +18,8 @@ */ package org.apache.ambari.infra.manager; +import java.util.Optional; + import org.apache.ambari.infra.model.JobExecutionInfoResponse; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; @@ -28,8 +30,6 @@ import org.springframework.batch.core.repository.JobExecutionAlreadyRunningExcep import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; -import java.util.Optional; - public interface Jobs { JobExecutionInfoResponse launchJob(String jobName, JobParameters params) throws JobParametersInvalidException, NoSuchJobException, @@ -39,4 +39,6 @@ public interface Jobs { JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException; Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException, NoSuchJobExecutionException; + + void abandon(Long jobExecution) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException; } diff --git a/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra-manager/src/main/resources/infra-manager.properties index d7bdc29..50c43e3 100644 --- a/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -83,6 +83,6 @@ infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=evtTime -infra-manager.jobs.clean-up.ttl=PT24H +infra-manager.jobs.clean-up.ttl=PT240H infra-manager.jobs.clean-up.scheduling.enabled=true infra-manager.jobs.clean-up.scheduling.cron=0 * * * * ? diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java index ba1150f..6880740 100644 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java @@ -1,5 +1,15 @@ package org.apache.ambari.infra.job; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.isA; + +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; + +import javax.batch.operations.NoSuchJobException; + import org.apache.ambari.infra.manager.Jobs; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -14,15 +24,6 @@ import org.springframework.batch.core.JobParameters; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.support.CronTrigger; -import javax.batch.operations.NoSuchJobException; -import java.util.Optional; -import java.util.concurrent.ScheduledFuture; - -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.isA; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -111,4 +112,19 @@ public class JobSchedulerTest extends EasyMockSupport { jobScheduler.schedule(jobName, schedulingProperties); } + + @Test + public void testScheduleWhenPreviousExecutionIsUnknownJobIsAbandonedAndScheduled() throws Exception { + String jobName = "job0"; + SchedulingProperties schedulingProperties = new SchedulingProperties(); + schedulingProperties.setCron("* * * * * ?"); + JobExecution jobExecution = new JobExecution(1L, new JobParameters()); + jobExecution.setExitStatus(ExitStatus.UNKNOWN); + expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution)); + jobs.abandon(1L); expectLastCall(); + expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture); + replayAll(); + + jobScheduler.schedule(jobName, schedulingProperties); + } } \ No newline at end of file