This is an automated email from the ASF dual-hosted git repository. ncole pushed a commit to branch branch-feature-AMBARI-21674 in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 8b3568d95cee2e7b6f70f1577596ecff3370f0f7 Author: kkasa <[email protected]> AuthorDate: Tue Jan 16 16:13:24 2018 +0100 AMBARI-22799 - define scheduling of archiving Infra Solr Documents --- .../org/apache/ambari/infra/HttpResponse.java} | 20 +++- .../java/org/apache/ambari/infra/InfraClient.java | 32 ++++-- .../org/apache/ambari/infra/JobExecutionInfo.java} | 28 ++++- .../ambari/infra/steps/AbstractInfraSteps.java | 12 ++- .../apache/ambari/infra/steps/ExportJobsSteps.java | 66 +++++++++--- .../test/resources/stories/infra_api_tests.story | 26 +++-- .../ambari-infra-manager/docs/api/swagger.yaml | 2 +- .../InfraManagerSchedulingConfig.java} | 15 ++- .../infra/conf/batch/InfraManagerBatchConfig.java | 25 +++-- .../infra/job/AbstractJobsConfiguration.java | 78 ++++++++++++++ ...ertyMap.java => JobConfigurationException.java} | 8 +- ...{PropertyMap.java => JobContextRepository.java} | 7 +- .../ambari/infra/job/JobContextRepositoryImpl.java | 52 ++++++++++ .../org/apache/ambari/infra/job/JobProperties.java | 35 +++++++ .../org/apache/ambari/infra/job/JobScheduler.java | 89 ++++++++++++++++ .../{JobPropertyMap.java => JobsPropertyMap.java} | 12 ++- ...{PropertyMap.java => SchedulingProperties.java} | 30 +++++- .../archive/DocumentArchivingConfiguration.java | 59 +++++------ .../job/archive/DocumentArchivingProperties.java | 1 + .../job/archive/DocumentArchivingPropertyMap.java | 8 +- .../ambari/infra/job/archive/DocumentExporter.java | 22 +++- .../infra/job/archive/FileNameSuffixFormatter.java | 2 +- .../deleting/DocumentDeletingConfiguration.java | 53 ++++------ .../job/deleting/DocumentDeletingPropertyMap.java | 8 +- .../apache/ambari/infra/manager/JobManager.java | 64 +++++++----- .../java/org/apache/ambari/infra/manager/Jobs.java | 42 ++++++++ .../org/apache/ambari/infra/rest/JobResource.java | 28 +++-- .../src/main/resources/infra-manager.properties | 8 +- .../src/main/resources/log4j2.xml | 3 + .../apache/ambari/infra/job/JobSchedulerTest.java | 114 +++++++++++++++++++++ .../infra/job/archive/DocumentExporterTest.java | 110 ++++++++++++++++---- .../vagrant-infra-manager.properties.sample | 30 +++--- 32 files changed, 863 insertions(+), 226 deletions(-) diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java similarity index 72% copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java copy to ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java index e7b9e77..3d8711b 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/HttpResponse.java @@ -16,10 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra; -import java.util.Map; +public class HttpResponse { + private final int code; + private final String body; -public interface PropertyMap<T extends JobProperties<T>> { - Map<String, T> getPropertyMap(); + public HttpResponse(int code, String body) { + this.code = code; + this.body = body; + } + + public int getCode() { + return code; + } + + public String getBody() { + return body; + } } diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java index b798ce1..0118c76 100644 --- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; @@ -71,11 +72,14 @@ public class InfraClient implements AutoCloseable { execute(new HttpGet(baseUrl)); } - private String execute(HttpRequestBase post) { + private HttpResponse execute(HttpRequestBase post) { try (CloseableHttpResponse response = httpClient.execute(post)) { String responseBodyText = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset()); - LOG.info("Response code {} body {} ", response.getStatusLine().getStatusCode(), responseBodyText); - return responseBodyText; + int statusCode = response.getStatusLine().getStatusCode(); + LOG.info("Response code {} body {} ", statusCode, responseBodyText); + if (!(200 <= statusCode && statusCode <= 299)) + throw new RuntimeException("Error while executing http request: " + responseBodyText); + return new HttpResponse(statusCode, responseBodyText); } catch (ClientProtocolException e) { throw new RuntimeException(e); } catch (IOException e) { @@ -83,16 +87,16 @@ public class InfraClient implements AutoCloseable { } } - public String startJob(String jobName, String parameters) { + public JobExecutionInfo startJob(String jobName, String parameters) { URIBuilder uriBuilder = new URIBuilder(baseUrl); uriBuilder.setScheme("http"); uriBuilder.setPath(uriBuilder.getPath() + "/" + jobName); if (!isBlank(parameters)) uriBuilder.addParameter("params", parameters); try { - String responseText = execute(new HttpPost(uriBuilder.build())); + String responseText = execute(new HttpPost(uriBuilder.build())).getBody(); Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {}); - return responseContent.get("jobId").toString(); + return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString()); } catch (URISyntaxException | JsonParseException | JsonMappingException e) { throw new RuntimeException(e); } catch (IOException e) { @@ -106,7 +110,21 @@ public class InfraClient implements AutoCloseable { uriBuilder.setPath(String.format("%s/%s/%s/executions", uriBuilder.getPath(), jobName, jobId)); uriBuilder.addParameter("operation", "RESTART"); try { - execute(new HttpPost(uriBuilder.build())); + HttpResponse httpResponse = execute(new HttpPost(uriBuilder.build())); + if (httpResponse.getCode() != 200) + throw new RuntimeException(httpResponse.getBody()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + public void stopJob(String jobExecutionId) { + URIBuilder uriBuilder = new URIBuilder(baseUrl); + uriBuilder.setScheme("http"); + uriBuilder.setPath(String.format("%s/executions/%s", uriBuilder.getPath(), jobExecutionId)); + uriBuilder.addParameter("operation", "STOP"); + try { + execute(new HttpDelete(uriBuilder.build())); } catch (URISyntaxException e) { throw new RuntimeException(e); } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java similarity index 59% copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java copy to ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java index e7b9e77..92b7834 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/JobExecutionInfo.java @@ -16,10 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra; -import java.util.Map; +public class JobExecutionInfo { + private final String jobId; + private final String executionId; -public interface PropertyMap<T extends JobProperties<T>> { - Map<String, T> getPropertyMap(); + public JobExecutionInfo(String jobId, String executionId) { + this.jobId = jobId; + this.executionId = executionId; + } + + public String getJobId() { + return jobId; + } + + public String getExecutionId() { + return executionId; + } + + @Override + public String toString() { + return "JobExecutionInfo{" + + "jobId='" + jobId + '\'' + + ", executionId='" + executionId + '\'' + + '}'; + } } diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java index ece1c59..fb8a100 100644 --- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java @@ -64,6 +64,7 @@ public abstract class AbstractInfraSteps { private static final int FAKE_S3_PORT = 4569; private static final int HDFS_PORT = 9000; private static final String AUDIT_LOGS_COLLECTION = "audit_logs"; + private static final String HADOOP_LOGS_COLLECTION = "hadoop_logs"; protected static final String S3_BUCKET_NAME = "testbucket"; private String ambariFolder; private String shellScriptLocation; @@ -111,8 +112,8 @@ public abstract class AbstractInfraSteps { SOLR_PORT, AUDIT_LOGS_COLLECTION)).build(); - LOG.info("Creating collection"); - runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", AUDIT_LOGS_COLLECTION, "-d", "configsets/"+ AUDIT_LOGS_COLLECTION +"/conf", "-n", AUDIT_LOGS_COLLECTION + "_conf"}); + createSolrCollection(AUDIT_LOGS_COLLECTION); + createSolrCollection(HADOOP_LOGS_COLLECTION); LOG.info("Initializing s3 client"); s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential")); @@ -122,6 +123,11 @@ public abstract class AbstractInfraSteps { checkInfraManagerReachable(); } + private void createSolrCollection(String collectionName) { + LOG.info("Creating collection"); + runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", collectionName, "-d", "configsets/"+ collectionName +"/conf", "-n", collectionName + "_conf"}); + } + private void runCommand(String[] command) { try { LOG.info("Exec command: {}", StringUtils.join(command, " ")); @@ -146,7 +152,7 @@ public abstract class AbstractInfraSteps { }); } - private void doWithin(int sec, String actionName, Runnable runnable) { + protected void doWithin(int sec, String actionName, Runnable runnable) { long start = currentTimeMillis(); Exception exception; while (true) { diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java index 7e54a31..d4224c6 100644 --- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java @@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; import org.apache.ambari.infra.InfraClient; +import org.apache.ambari.infra.JobExecutionInfo; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -56,7 +57,7 @@ import static org.junit.Assert.assertThat; public class ExportJobsSteps extends AbstractInfraSteps { private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class); - private Map<String, String> launchedJobs = new HashMap<>(); + private Map<String, JobExecutionInfo> launchedJobs = new HashMap<>(); @Given("$count documents in solr") public void addDocuments(int count) throws Exception { @@ -85,22 +86,39 @@ public class ExportJobsSteps extends AbstractInfraSteps { @When("start $jobName job") public void startJob(String jobName) throws Exception { - startJob(jobName, null); + startJob(jobName, null, 0); } - @When("start $jobName job with parameters $parameters") - public void startJob(String jobName, String parameters) throws Exception { + @When("start $jobName job with parameters $parameters after $waitSec seconds") + public void startJob(String jobName, String parameters, int waitSec) throws Exception { + Thread.sleep(waitSec * 1000); try (InfraClient httpClient = getInfraClient()) { - String jobId = httpClient.startJob(jobName, parameters); - LOG.info("Job {} started jobId: {}", jobName, jobId); - launchedJobs.put(jobName, jobId); + JobExecutionInfo jobExecutionInfo = httpClient.startJob(jobName, parameters); + LOG.info("Job {} started: {}", jobName, jobExecutionInfo); + launchedJobs.put(jobName, jobExecutionInfo); } } - @When("restart $jobName job") - public void restartJob(String jobName) throws Exception { + @When("restart $jobName job within $waitSec seconds") + public void restartJob(String jobName, int waitSec) { + doWithin(waitSec, "Restarting job " + jobName, () -> { + try (InfraClient httpClient = getInfraClient()) { + httpClient.restartJob(jobName, launchedJobs.get(jobName).getJobId()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds") + public void stopJob(String jobName, int count, String text, int waitSec) throws Exception { + AmazonS3Client s3Client = getS3client(); + ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) + && fileCountOnS3(text, s3Client, listObjectsRequest) > count); + try (InfraClient httpClient = getInfraClient()) { - httpClient.restartJob(jobName, launchedJobs.get(jobName)); + httpClient.stopJob(launchedJobs.get(jobName).getExecutionId()); } } @@ -125,9 +143,25 @@ public class ExportJobsSteps extends AbstractInfraSteps { AmazonS3Client s3Client = getS3client(); ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() - .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)) - .count() == count); + && fileCountOnS3(text, s3Client, listObjectsRequest) == count); + } + + private long fileCountOnS3(String text, AmazonS3Client s3Client, ListObjectsRequest listObjectsRequest) { + return s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() + .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)) + .count(); + } + + @Then("Less than $count files exists on s3 server with filenames containing the text $text after $waitSec seconds") + public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) { + AmazonS3Client s3Client = getS3client(); + ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) && between( + fileCountOnS3(text, s3Client, listObjectsRequest), 1L, count - 1L)); + } + + private boolean between(long count, long from, long to) { + return from <= count && count <= to; } @Then("No file exists on s3 server with filenames containing the text $text") @@ -184,9 +218,9 @@ public class ExportJobsSteps extends AbstractInfraSteps { } } - @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path") - public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path) { - File destinationDirectory = new File(getLocalDataFolder(), path); + @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path for job $jobName") + public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path, String jobName) { + File destinationDirectory = new File(getLocalDataFolder(), path.replace("${jobId}", launchedJobs.get(jobName).getJobId())); LOG.info("Destination directory path: {}", destinationDirectory.getAbsolutePath()); doWithin(5, "Destination directory exists", destinationDirectory::exists); diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story index d1eb4a4..122a634 100644 --- a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story +++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story @@ -8,7 +8,7 @@ Then Check filenames contains the text audit_logs on s3 server after 20 seconds Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z -When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z +When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z after 2 seconds Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds @@ -16,7 +16,7 @@ And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10 Scenario: Running archiving job with a bigger start value than end value exports and deletes 0 documents Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z -When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z +When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z after 2 seconds Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0 And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z @@ -25,11 +25,11 @@ Scenario: Archiving job fails when part of the data is exported. After resolving Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz -When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z +When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z after 2 seconds Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3 -And restart archive_audit_logs job +And restart archive_audit_logs job within 2 seconds Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds @@ -37,14 +37,14 @@ And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10 Scenario: After Deleting job deletes documents from solr no document found in the specified interval Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z -When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z +When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z after 2 seconds Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds Scenario: Archiving documents to hdfs Given 1000 documents in solr with logtime from 2014-01-04T05:00:00.000Z to 2014-01-06T20:00:00.000Z -When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS +When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS after 2 seconds Then Check 7 files exists on hdfs with filenames containing the text audit_logs_-_2014-01-0 in the folder /test_audit_logs after 10 seconds And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01-06T20:00:00.000Z after 10 seconds @@ -52,6 +52,16 @@ And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01 Scenario: Archiving documents to local filesystem Given 200 documents in solr with logtime from 2014-02-04T05:00:00.000Z to 2014-02-06T20:00:00.000Z -When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive -Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_8_2014-02-06T20-00-00.000Z +When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive after 2 seconds +Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_${jobId}_2014-02-06T20-00-00.000Z for job archive_audit_logs And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02-06T20:00:00.000Z after 10 seconds + + +Scenario: Launch Archiving job. Initiate stop and check that part of the data is archived. After restart all data must be extracted. + +Given 200 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z +When start archive_audit_logs job with parameters writeBlockSize=20,start=2014-03-09T05:00:00.000Z,end=2014-03-09T20:00:00.000Z after 2 seconds +And stop job archive_audit_logs after at least 1 file exists in s3 with filename containing text solr_archive_audit_logs_-_2014-03-09 within 10 seconds +Then Less than 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds +When restart archive_audit_logs job within 10 seconds +Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds diff --git a/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml b/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml index 824629f..6fad22d 100644 --- a/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml +++ b/ambari-infra/ambari-infra-manager/docs/api/swagger.yaml @@ -65,7 +65,7 @@ paths: - "jobs" summary: "Get job and step details for job execution instance." description: "" - operationId: "getExectionInfo" + operationId: "getExecutionInfo" produces: - "application/json" parameters: diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java similarity index 65% copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java index e7b9e77..bb495a2 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/InfraManagerSchedulingConfig.java @@ -16,10 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra.conf; -import java.util.Map; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -public interface PropertyMap<T extends JobProperties<T>> { - Map<String, T> getPropertyMap(); +@Configuration +public class InfraManagerSchedulingConfig { + @Bean + public TaskScheduler taskScheduler() { + return new ThreadPoolTaskScheduler(); + } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java index b1169b4..706ed8b 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java @@ -30,6 +30,7 @@ import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.launch.support.SimpleJobLauncher; @@ -56,7 +57,6 @@ import org.springframework.transaction.PlatformTransactionManager; import javax.inject.Inject; import javax.sql.DataSource; -import java.net.MalformedURLException; @Configuration @EnableBatchProcessing @@ -85,9 +85,6 @@ public class InfraManagerBatchConfig { @Inject private JobRegistry jobRegistry; - @Inject - private JobExplorer jobExplorer; - @Bean public DataSource dataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); @@ -99,8 +96,7 @@ public class InfraManagerBatchConfig { } @Bean - public DataSourceInitializer dataSourceInitializer(DataSource dataSource) - throws MalformedURLException { + public DataSourceInitializer dataSourceInitializer() { ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator(); if (dropDatabaseOnStartup) { databasePopulator.addScript(dropRepositoryTables); @@ -110,7 +106,7 @@ public class InfraManagerBatchConfig { databasePopulator.setContinueOnError(true); DataSourceInitializer initializer = new DataSourceInitializer(); - initializer.setDataSource(dataSource); + initializer.setDataSource(dataSource()); initializer.setDatabasePopulator(databasePopulator); return initializer; @@ -125,14 +121,14 @@ public class InfraManagerBatchConfig { public JobRepository jobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource()); - factory.setTransactionManager(getTransactionManager()); + factory.setTransactionManager(transactionManager()); factory.setSerializer(executionContextSerializer()); factory.afterPropertiesSet(); return factory.getObject(); } @Bean - public PlatformTransactionManager getTransactionManager() { + public PlatformTransactionManager transactionManager() { return new ResourcelessTransactionManager(); } @@ -148,7 +144,7 @@ public class InfraManagerBatchConfig { @Bean public JobOperator jobOperator() throws Exception { SimpleJobOperator jobOperator = new SimpleJobOperator(); - jobOperator.setJobExplorer(jobExplorer); + jobOperator.setJobExplorer(jobExplorer()); jobOperator.setJobLauncher(jobLauncher()); jobOperator.setJobRegistry(jobRegistry); jobOperator.setJobRepository(jobRepository()); @@ -156,6 +152,15 @@ public class InfraManagerBatchConfig { } @Bean + public JobExplorer jobExplorer() throws Exception { + JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean(); + factoryBean.setSerializer(executionContextSerializer()); + factoryBean.setDataSource(dataSource()); + factoryBean.afterPropertiesSet(); + return factoryBean.getObject(); + } + + @Bean public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() { JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor(); jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry); diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java new file mode 100644 index 0000000..a57d0e0 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; + +import javax.annotation.PostConstruct; +import java.util.Map; + +public abstract class AbstractJobsConfiguration<T extends JobProperties<T>> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJobsConfiguration.class); + + private final Map<String, T> propertyMap; + private final JobScheduler scheduler; + private final JobBuilderFactory jobs; + private final JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor; + + protected AbstractJobsConfiguration(Map<String, T> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) { + this.propertyMap = propertyMap; + this.scheduler = scheduler; + this.jobs = jobs; + this.jobRegistryBeanPostProcessor = jobRegistryBeanPostProcessor; + } + + @PostConstruct + public void registerJobs() { + if (propertyMap == null) + return; + + for (String jobName : propertyMap.keySet()) + propertyMap.get(jobName).validate(jobName); + + propertyMap.keySet().stream() + .filter(key -> propertyMap.get(key).isEnabled()) + .forEach(jobName -> { + LOG.info("Registering job {}", jobName); + JobBuilder jobBuilder = jobs.get(jobName).listener(new JobsPropertyMap<>(propertyMap)); + Job job = buildJob(jobBuilder); + jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName); + }); + } + + @EventListener(ApplicationReadyEvent.class) + public void scheduleJobs() { + if (propertyMap == null) + return; + + propertyMap.keySet().stream() + .filter(key -> propertyMap.get(key).isEnabled()) + .forEach(jobName -> propertyMap.get(jobName).scheduling().ifPresent( + schedulingProperties -> scheduler.schedule(jobName, schedulingProperties))); + } + + protected abstract Job buildJob(JobBuilder jobBuilder); +} diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java similarity index 84% copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java index e7b9e77..8c16daa 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobConfigurationException.java @@ -18,8 +18,8 @@ */ package org.apache.ambari.infra.job; -import java.util.Map; - -public interface PropertyMap<T extends JobProperties<T>> { - Map<String, T> getPropertyMap(); +public class JobConfigurationException extends RuntimeException { + public JobConfigurationException(String message, Exception ex) { + super(message, ex); + } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java similarity index 79% copy from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java copy to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java index e7b9e77..eb7f717 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepository.java @@ -18,8 +18,9 @@ */ package org.apache.ambari.infra.job; -import java.util.Map; +import org.springframework.batch.core.StepExecution; -public interface PropertyMap<T extends JobProperties<T>> { - Map<String, T> getPropertyMap(); +public interface JobContextRepository { + StepExecution getStepExecution(Long jobExecutionId, Long id); + void updateExecutionContext(StepExecution stepExecution); } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java new file mode 100644 index 0000000..fbb256f --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobContextRepositoryImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job; + +import org.springframework.batch.admin.service.JobService; +import org.springframework.batch.admin.service.NoSuchStepExecutionException; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; +import org.springframework.batch.core.repository.JobRepository; + +import javax.inject.Inject; +import javax.inject.Named; + +@Named +public class JobContextRepositoryImpl implements JobContextRepository { + + @Inject + private JobRepository jobRepository; + @Inject + private JobService jobService; + + + @Override + public StepExecution getStepExecution(Long jobExecutionId, Long id) { + try { + return jobService.getStepExecution(jobExecutionId, id); + } catch (NoSuchStepExecutionException | NoSuchJobExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void updateExecutionContext(StepExecution stepExecution) { + jobRepository.updateExecutionContext(stepExecution); + } +} diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java index 292e15e..53909ae 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java @@ -23,14 +23,32 @@ import org.springframework.batch.core.JobParameters; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Optional; public abstract class JobProperties<T extends JobProperties<T>> { + + private SchedulingProperties scheduling; private final Class<T> clazz; + private boolean enabled; protected JobProperties(Class<T> clazz) { this.clazz = clazz; } + public SchedulingProperties getScheduling() { + return scheduling; + } + + public Optional<SchedulingProperties> scheduling() { + if (scheduling != null && scheduling.isEnabled()) + return Optional.of(scheduling); + return Optional.empty(); + } + + public void setScheduling(SchedulingProperties scheduling) { + this.scheduling = scheduling; + } + public T deepCopy() { try { ObjectMapper objectMapper = new ObjectMapper(); @@ -44,4 +62,21 @@ public abstract class JobProperties<T extends JobProperties<T>> { public abstract void apply(JobParameters jobParameters); public abstract void validate(); + + public void validate(String jobName) { + try { + validate(); + } + catch (Exception ex) { + throw new JobConfigurationException(String.format("Configuration of job %s is invalid!", jobName), ex); + } + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java new file mode 100644 index 0000000..324c0b3 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job; + +import org.apache.ambari.infra.manager.Jobs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.launch.NoSuchJobException; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; +import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; +import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.support.CronTrigger; + +import javax.inject.Inject; +import javax.inject.Named; +import java.time.Duration; +import java.time.OffsetDateTime; + +import static org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER; +import static org.apache.commons.lang.StringUtils.isBlank; + +@Named +public class JobScheduler { + private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class); + + private final TaskScheduler scheduler; + private final Jobs jobs; + + @Inject + public JobScheduler(TaskScheduler scheduler, Jobs jobs) { + this.scheduler = scheduler; + this.jobs = jobs; + } + + public void schedule(String jobName, SchedulingProperties schedulingProperties) { + try { + jobs.lastRun(jobName).ifPresent(this::restartIfFailed); + } catch (NoSuchJobException | NoSuchJobExecutionException e) { + throw new RuntimeException(e); + } + + scheduler.schedule(() -> launchJob(jobName, schedulingProperties.getIntervalEndDelta()), new CronTrigger(schedulingProperties.getCron())); + LOG.info("Job {} scheduled for running. Cron: {}", jobName, schedulingProperties.getCron()); + } + + private void restartIfFailed(JobExecution jobExecution) { + if (jobExecution.getExitStatus() == ExitStatus.FAILED) { + try { + jobs.restart(jobExecution.getId()); + } catch (JobInstanceAlreadyCompleteException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | NoSuchJobExecutionException e) { + throw new RuntimeException(e); + } + } + } + + private void launchJob(String jobName, String endDelta) { + try { + JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); + if (!isBlank(endDelta)) + jobParametersBuilder.addString("end", SOLR_DATETIME_FORMATTER.format(OffsetDateTime.now().minus(Duration.parse(endDelta)))); + + jobs.launchJob(jobName, jobParametersBuilder.toJobParameters()); + } catch (JobParametersInvalidException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) { + throw new RuntimeException(e); + } + } +} diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java similarity index 85% rename from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java rename to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java index b5061f8..094e797 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java @@ -22,11 +22,13 @@ import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; -public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionListener { +import java.util.Map; - private final PropertyMap<T> propertyMap; +public class JobsPropertyMap<T extends JobProperties<T>> implements JobExecutionListener { - public JobPropertyMap(PropertyMap<T> propertyMap) { + private final Map<String, T> propertyMap; + + public JobsPropertyMap(Map<String, T> propertyMap) { this.propertyMap = propertyMap; } @@ -34,13 +36,13 @@ public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionL public void beforeJob(JobExecution jobExecution) { try { String jobName = jobExecution.getJobInstance().getJobName(); - T defaultProperties = propertyMap.getPropertyMap().get(jobName); + T defaultProperties = propertyMap.get(jobName); if (defaultProperties == null) throw new UnsupportedOperationException("Properties not found for job " + jobName); T properties = defaultProperties.deepCopy(); properties.apply(jobExecution.getJobParameters()); - properties.validate(); + properties.validate(jobName); jobExecution.getExecutionContext().put("jobProperties", properties); } catch (UnsupportedOperationException | IllegalArgumentException ex) { diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java similarity index 60% rename from ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java rename to ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java index e7b9e77..af81b4f 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/PropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java @@ -18,8 +18,32 @@ */ package org.apache.ambari.infra.job; -import java.util.Map; +public class SchedulingProperties { + private boolean enabled = false; + private String cron; + private String intervalEndDelta; -public interface PropertyMap<T extends JobProperties<T>> { - Map<String, T> getPropertyMap(); + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getCron() { + return cron; + } + + public void setCron(String cron) { + this.cron = cron; + } + + public String getIntervalEndDelta() { + return intervalEndDelta; + } + + public void setIntervalEndDelta(String intervalEndDelta) { + this.intervalEndDelta = intervalEndDelta; + } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java index 837b9c4..89f94bd 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java @@ -19,7 +19,9 @@ package org.apache.ambari.infra.job.archive; import org.apache.ambari.infra.conf.InfraManagerDataConfig; -import org.apache.ambari.infra.job.JobPropertyMap; +import org.apache.ambari.infra.job.AbstractJobsConfiguration; +import org.apache.ambari.infra.job.JobContextRepository; +import org.apache.ambari.infra.job.JobScheduler; import org.apache.ambari.infra.job.ObjectSource; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -31,55 +33,41 @@ import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.annotation.PostConstruct; import javax.inject.Inject; import java.io.File; import static org.apache.commons.lang.StringUtils.isBlank; @Configuration -public class DocumentArchivingConfiguration { +public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties> { private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class); private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { }; - @Inject - private DocumentArchivingPropertyMap propertyMap; - - @Inject - private StepBuilderFactory steps; + private final StepBuilderFactory steps; + private final Step exportStep; @Inject - private JobBuilderFactory jobs; - - @Inject - @Qualifier("exportStep") - private Step exportStep; - - @Inject - private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor; - - - @PostConstruct - public void createJobs() { - if (propertyMap == null || propertyMap.getSolrDataArchiving() == null) - return; - - propertyMap.getSolrDataArchiving().values().forEach(DocumentArchivingProperties::validate); - - propertyMap.getSolrDataArchiving().keySet().forEach(jobName -> { - LOG.info("Registering data archiving job {}", jobName); - Job job = logExportJob(jobName, exportStep); - jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName); - }); + public DocumentArchivingConfiguration( + DocumentArchivingPropertyMap jobsPropertyMap, + JobScheduler scheduler, + StepBuilderFactory steps, + JobBuilderFactory jobs, + @Qualifier("exportStep") Step exportStep, + JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) { + super(jobsPropertyMap.getSolrDataArchiving(), scheduler, jobs, jobRegistryBeanPostProcessor); + this.exportStep = exportStep; + this.steps = steps; } - private Job logExportJob(String jobName, Step logExportStep) { - return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build(); + @Override + protected Job buildJob(JobBuilder jobBuilder) { + return jobBuilder.start(exportStep).build(); } @Bean @@ -93,11 +81,12 @@ public class DocumentArchivingConfiguration { @Bean @StepScope public DocumentExporter documentExporter(DocumentItemReader documentItemReader, - @Value("#{stepExecution.jobExecution.id}") String jobId, + @Value("#{stepExecution.jobExecution.jobId}") String jobId, @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties, InfraManagerDataConfig infraManagerDataConfig, @Value("#{jobParameters[end]}") String intervalEnd, - DocumentWiper documentWiper) { + DocumentWiper documentWiper, + JobContextRepository jobContextRepository) { File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting"); CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor()); @@ -134,7 +123,7 @@ public class DocumentArchivingConfiguration { documentItemReader, firstDocument -> new LocalDocumentItemWriter( outFile(properties.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener), - properties.getWriteBlockSize()); + properties.getWriteBlockSize(), jobContextRepository); } @Bean diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java index b90402a..f8fa33b 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java @@ -259,6 +259,7 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name())); } + requireNonNull(solr, "No solr query was specified for archiving job!"); solr.validate(); } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java index 7d1a738..a009031 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingPropertyMap.java @@ -18,7 +18,6 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.ambari.infra.job.PropertyMap; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -26,7 +25,7 @@ import java.util.Map; @Configuration @ConfigurationProperties(prefix = "infra-manager.jobs") -public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivingProperties> { +public class DocumentArchivingPropertyMap { private Map<String, DocumentArchivingProperties> solrDataArchiving; public Map<String, DocumentArchivingProperties> getSolrDataArchiving() { @@ -36,9 +35,4 @@ public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivi public void setSolrDataArchiving(Map<String, DocumentArchivingProperties> solrDataArchiving) { this.solrDataArchiving = solrDataArchiving; } - - @Override - public Map<String, DocumentArchivingProperties> getPropertyMap() { - return getSolrDataArchiving(); - } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java index 6106c20..d87fdea 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java @@ -18,6 +18,10 @@ */ package org.apache.ambari.infra.job.archive; +import org.apache.ambari.infra.job.JobContextRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; @@ -30,15 +34,19 @@ import org.springframework.batch.repeat.RepeatStatus; public class DocumentExporter implements Tasklet, StepExecutionListener { + private static final Logger LOG = LoggerFactory.getLogger(DocumentExporter.class); + private boolean complete = false; private final ItemStreamReader<Document> documentReader; private final DocumentDestination documentDestination; private final int writeBlockSize; + private final JobContextRepository jobContextRepository; - public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize) { + public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize, JobContextRepository jobContextRepository) { this.documentReader = documentReader; this.documentDestination = documentDestination; this.writeBlockSize = writeBlockSize; + this.jobContextRepository = jobContextRepository; } @Override @@ -58,7 +66,8 @@ public class DocumentExporter implements Tasklet, StepExecutionListener { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { - ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext(); + StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); + ExecutionContext executionContext = stepExecution.getExecutionContext(); documentReader.open(executionContext); DocumentItemWriter writer = null; @@ -67,10 +76,19 @@ public class DocumentExporter implements Tasklet, StepExecutionListener { Document document; while ((document = documentReader.read()) != null) { if (writer != null && writtenCount >= writeBlockSize) { + stepExecution = jobContextRepository.getStepExecution(stepExecution.getJobExecutionId(), stepExecution.getId()); + if (stepExecution.getJobExecution().getStatus() == BatchStatus.STOPPING) { + LOG.info("Received stop signal."); + writer.revert(); + writer = null; + return RepeatStatus.CONTINUABLE; + } + writer.close(); writer = null; writtenCount = 0; documentReader.update(executionContext); + jobContextRepository.updateExecutionContext(stepExecution); } if (writer == null) diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java index 85ec00b..f9016e6 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java @@ -26,7 +26,7 @@ import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE import static org.apache.commons.lang.StringUtils.isBlank; public class FileNameSuffixFormatter { - private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT); + public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT); public static FileNameSuffixFormatter from(DocumentArchivingProperties properties) { return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat()); diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java index ce8970d..4a68c49 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java @@ -18,9 +18,8 @@ */ package org.apache.ambari.infra.job.deleting; -import org.apache.ambari.infra.job.JobPropertyMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.ambari.infra.job.AbstractJobsConfiguration; +import org.apache.ambari.infra.job.JobScheduler; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; @@ -28,50 +27,36 @@ import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.annotation.PostConstruct; import javax.inject.Inject; @Configuration -public class DocumentDeletingConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(DocumentDeletingConfiguration.class); +public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties> { - @Inject - private DocumentDeletingPropertyMap propertyMap; - - @Inject - private StepBuilderFactory steps; - - @Inject - private JobBuilderFactory jobs; + private final StepBuilderFactory steps; + private final Step deleteStep; @Inject - private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor; - - @Inject - @Qualifier("deleteStep") - private Step deleteStep; - - @PostConstruct - public void createJobs() { - if (propertyMap == null || propertyMap.getSolrDataDeleting() == null) - return; - - propertyMap.getSolrDataDeleting().values().forEach(DocumentDeletingProperties::validate); - - propertyMap.getSolrDataDeleting().keySet().forEach(jobName -> { - LOG.info("Registering data deleting job {}", jobName); - Job job = logDeleteJob(jobName, deleteStep); - jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName); - }); + public DocumentDeletingConfiguration( + DocumentDeletingPropertyMap documentDeletingPropertyMap, + JobScheduler scheduler, + StepBuilderFactory steps, + JobBuilderFactory jobs, + JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor, + @Qualifier("deleteStep") Step deleteStep) { + super(documentDeletingPropertyMap.getSolrDataDeleting(), scheduler, jobs, jobRegistryBeanPostProcessor); + this.steps = steps; + this.deleteStep = deleteStep; } - private Job logDeleteJob(String jobName, Step logExportStep) { - return jobs.get(jobName).listener(new JobPropertyMap<>(propertyMap)).start(logExportStep).build(); + @Override + protected Job buildJob(JobBuilder jobBuilder) { + return jobBuilder.start(deleteStep).build(); } @Bean diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java index fccfd59..1dc0caf 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingPropertyMap.java @@ -18,7 +18,6 @@ */ package org.apache.ambari.infra.job.deleting; -import org.apache.ambari.infra.job.PropertyMap; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -26,7 +25,7 @@ import java.util.Map; @Configuration @ConfigurationProperties(prefix = "infra-manager.jobs") -public class DocumentDeletingPropertyMap implements PropertyMap<DocumentDeletingProperties> { +public class DocumentDeletingPropertyMap { private Map<String, DocumentDeletingProperties> solrDataDeleting; public Map<String, DocumentDeletingProperties> getSolrDataDeleting() { @@ -36,9 +35,4 @@ public class DocumentDeletingPropertyMap implements PropertyMap<DocumentDeleting public void setSolrDataDeleting(Map<String, DocumentDeletingProperties> solrDataDeleting) { this.solrDataDeleting = solrDataDeleting; } - - @Override - public Map<String, DocumentDeletingProperties> getPropertyMap() { - return getSolrDataDeleting(); - } } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java index 862119a..f35387d 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java @@ -18,7 +18,6 @@ */ package org.apache.ambari.infra.manager; -import com.google.common.base.Splitter; import com.google.common.collect.Lists; import org.apache.ambari.infra.model.ExecutionContextResponse; import org.apache.ambari.infra.model.JobDetailsResponse; @@ -36,9 +35,13 @@ import org.springframework.batch.admin.service.JobService; import org.springframework.batch.admin.service.NoSuchStepExecutionException; import org.springframework.batch.admin.web.JobInfo; import org.springframework.batch.admin.web.StepExecutionProgress; -import org.springframework.batch.core.*; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.JobExecutionNotRunningException; -import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.launch.NoSuchJobExecutionException; @@ -51,16 +54,16 @@ import javax.inject.Inject; import javax.inject.Named; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TimeZone; @Named -public class JobManager { +public class JobManager implements Jobs { private static final Logger LOG = LoggerFactory.getLogger(JobManager.class); @@ -70,6 +73,9 @@ public class JobManager { @Inject private JobOperator jobOperator; + @Inject + private JobExplorer jobExplorer; + private TimeZone timeZone = TimeZone.getDefault(); public Set<String> getAllJobNames() { @@ -80,18 +86,28 @@ public class JobManager { * Launch a new job instance (based on job name) and applies customized parameters to it. * Also add a new date parameter to make sure the job instance will be unique */ - public JobExecutionInfoResponse launchJob(String jobName, String params) - throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException, + @Override + public JobExecutionInfoResponse launchJob(String jobName, JobParameters jobParameters) + throws JobParametersInvalidException, NoSuchJobException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { - JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); - if (params != null) { - LOG.info("Parsing parameters of job {} '{}'", jobName, params); - Splitter.on(',') - .trimResults() - .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults()) - .split(params).entrySet().forEach(entry -> jobParametersBuilder.addString(entry.getKey(), entry.getValue())); - } - return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone); + + Set<JobExecution> running = jobExplorer.findRunningJobExecutions(jobName); + if (!running.isEmpty()) + throw new JobExecutionAlreadyRunningException("An instance of this job is already active: "+jobName); + + return new JobExecutionInfoResponse(jobService.launch(jobName, jobParameters), timeZone); + } + + @Override + public void restart(Long jobExecutionId) + throws JobInstanceAlreadyCompleteException, NoSuchJobException, JobExecutionAlreadyRunningException, + JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException { + jobService.restart(jobExecutionId); + } + + @Override + public Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException { + return jobService.listJobExecutionsForJob(jobName, 0, 1).stream().findFirst(); } /** @@ -111,19 +127,14 @@ public class JobManager { /** * Gather job execution details by job execution id. */ - public JobExecutionDetailsResponse getExectionInfo(Long jobExecutionId) throws NoSuchJobExecutionException { + public JobExecutionDetailsResponse getExecutionInfo(Long jobExecutionId) throws NoSuchJobExecutionException { JobExecution jobExecution = jobService.getJobExecution(jobExecutionId); - List<StepExecutionInfoResponse> stepExecutionInfos = new ArrayList<StepExecutionInfoResponse>(); + List<StepExecutionInfoResponse> stepExecutionInfoList = new ArrayList<>(); for (StepExecution stepExecution : jobExecution.getStepExecutions()) { - stepExecutionInfos.add(new StepExecutionInfoResponse(stepExecution, timeZone)); + stepExecutionInfoList.add(new StepExecutionInfoResponse(stepExecution, timeZone)); } - Collections.sort(stepExecutionInfos, new Comparator<StepExecutionInfoResponse>() { - @Override - public int compare(StepExecutionInfoResponse o1, StepExecutionInfoResponse o2) { - return o1.getId().compareTo(o2.getId()); - } - }); - return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfos); + stepExecutionInfoList.sort(Comparator.comparing(StepExecutionInfoResponse::getId)); + return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfoList); } /** @@ -139,6 +150,7 @@ public class JobManager { } else { throw new UnsupportedOperationException("Unsupported operaration"); } + LOG.info("Job {} was marked {}", jobExecution.getJobInstance().getJobName(), operation.name()); return new JobExecutionInfoResponse(jobExecution, timeZone); } diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java new file mode 100644 index 0000000..b2ca605 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/Jobs.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.manager; + +import org.apache.ambari.infra.model.JobExecutionInfoResponse; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.launch.NoSuchJobException; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; +import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; +import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRestartException; + +import java.util.Optional; + +public interface Jobs { + JobExecutionInfoResponse launchJob(String jobName, JobParameters params) + throws JobParametersInvalidException, NoSuchJobException, + JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException; + void restart(Long jobExecutionId) + throws JobInstanceAlreadyCompleteException, NoSuchJobException, JobExecutionAlreadyRunningException, + JobParametersInvalidException, JobRestartException, NoSuchJobExecutionException; + + Optional<JobExecution> lastRun(String jobName) throws NoSuchJobException, NoSuchJobExecutionException; +} diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java index 0e20b54..502057e 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/rest/JobResource.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.infra.rest; +import com.google.common.base.Splitter; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.ambari.infra.manager.JobManager; @@ -35,11 +36,13 @@ import org.apache.ambari.infra.model.StepExecutionContextResponse; import org.apache.ambari.infra.model.StepExecutionInfoResponse; import org.apache.ambari.infra.model.StepExecutionProgressResponse; import org.apache.ambari.infra.model.StepExecutionRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.admin.service.NoSuchStepExecutionException; import org.springframework.batch.admin.web.JobInfo; +import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobExecutionNotRunningException; -import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.batch.core.launch.NoSuchJobInstanceException; @@ -67,6 +70,7 @@ import java.util.Set; @Named @Scope("request") public class JobResource { + private static final Logger LOG = LoggerFactory.getLogger(JobResource.class); @Inject private JobManager jobManager; @@ -83,9 +87,21 @@ public class JobResource { @Path("{jobName}") @ApiOperation("Start a new job instance by job name.") public JobExecutionInfoResponse startJob(@BeanParam @Valid JobInstanceStartRequest request) - throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException, JobExecutionAlreadyRunningException, + throws JobParametersInvalidException, NoSuchJobException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { - return jobManager.launchJob(request.getJobName(), request.getParams()); + + String jobName = request.getJobName(); + String params = request.getParams(); + JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); + if (params != null) { + LOG.info("Parsing parameters of job {} '{}'", jobName, params); + Splitter.on(',') + .trimResults() + .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults()) + .split(params).forEach(jobParametersBuilder::addString); + } + + return jobManager.launchJob(jobName, jobParametersBuilder.toJobParameters()); } @GET @@ -117,7 +133,7 @@ public class JobResource { @Path("/executions/{jobExecutionId}") @ApiOperation("Get job and step details for job execution instance.") public JobExecutionDetailsResponse getExectionInfo(@PathParam("jobExecutionId") @Valid Long jobExecutionId) throws NoSuchJobExecutionException { - return jobManager.getExectionInfo(jobExecutionId); + return jobManager.getExecutionInfo(jobExecutionId); } @GET @@ -150,8 +166,8 @@ public class JobResource { @Produces({"application/json"}) @Path("/{jobName}/{jobInstanceId}/executions") @ApiOperation("Get execution for job instance.") - public List<JobExecutionInfoResponse> getExecutionsForInstance(@BeanParam @Valid JobExecutionRequest request) throws JobInstanceAlreadyCompleteException, - NoSuchJobExecutionException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, NoSuchJobException, NoSuchJobInstanceException { + public List<JobExecutionInfoResponse> getExecutionsForInstance(@BeanParam @Valid JobExecutionRequest request) throws + NoSuchJobException, NoSuchJobInstanceException { return jobManager.getExecutionsForJobInstance(request.getJobName(), request.getJobInstanceId()); } diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties index aea2b88..a0712ba 100644 --- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -14,13 +14,14 @@ # limitations under the License. infra-manager.batch.db.file=job-repository.db -infra-manager.batch.db.init=true +infra-manager.batch.db.init=false infra-manager.batch.db.username=admin infra-manager.batch.db.password=admin management.security.enabled=false management.health.solr.enabled=false infra-manager.server.data.folder=/tmp/ambariInfraManager +infra-manager.jobs.solr_data_archiving.archive_service_logs.enabled=true infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}] @@ -33,6 +34,10 @@ infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=LOCAL infra-manager.jobs.solr_data_archiving.archive_service_logs.local_destination_directory=/tmp/ambariInfraManager infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX +infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=true +infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.cron=0 * * * * ? +infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.intervalEndDelta=PT24H +infra-manager.jobs.solr_data_archiving.archive_audit_logs.enabled=true infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.collection=audit_logs infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.query_text=logtime:[${start} TO ${end}] @@ -63,6 +68,7 @@ infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_endpoint=http://fak #infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"] #infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[0]=logtime #infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[1]=id +infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml index 9737554..d3db3d7 100644 --- a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml +++ b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml @@ -37,5 +37,8 @@ <AppenderRef ref="File" /> <AppenderRef ref="Console" /> </Root> + <!--<Logger name="org.springframework.jdbc.core.JdbcTemplate" level="debug">--> + <!--<AppenderRef ref="Console"/>--> + <!--</Logger>--> </Loggers> </Configuration> diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java new file mode 100644 index 0000000..ba1150f --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobSchedulerTest.java @@ -0,0 +1,114 @@ +package org.apache.ambari.infra.job; + +import org.apache.ambari.infra.manager.Jobs; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.support.CronTrigger; + +import javax.batch.operations.NoSuchJobException; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; + +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.isA; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +@RunWith(EasyMockRunner.class) +public class JobSchedulerTest extends EasyMockSupport { + + @Mock + private TaskScheduler taskScheduler; + @Mock + private Jobs jobs; + @Mock + private ScheduledFuture scheduledFuture; + private JobScheduler jobScheduler; + + @Before + public void setUp() throws Exception { + jobScheduler = new JobScheduler(taskScheduler, jobs); + } + + @After + public void tearDown() throws Exception { + verifyAll(); + } + + @Test(expected = NoSuchJobException.class) + public void testScheduleWhenJobNotExistsThrowsException() throws Exception { + String jobName = "notFoundJob"; + expect(jobs.lastRun(jobName)).andThrow(new NoSuchJobException()); + replayAll(); + + jobScheduler.schedule(jobName, null); + } + + @Test + public void testScheduleWhenNoPreviousExecutionExistsJobIsScheduled() throws Exception { + String jobName = "job0"; + SchedulingProperties schedulingProperties = new SchedulingProperties(); + schedulingProperties.setCron("* * * * * ?"); + expect(jobs.lastRun(jobName)).andReturn(Optional.empty()); + expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture); + replayAll(); + + jobScheduler.schedule(jobName, schedulingProperties); + } + + @Test + public void testScheduleWhenPreviousExecutionWasSuccessfulJobIsScheduled() throws Exception { + String jobName = "job0"; + SchedulingProperties schedulingProperties = new SchedulingProperties(); + schedulingProperties.setCron("* * * * * ?"); + JobExecution jobExecution = new JobExecution(1L, new JobParameters()); + jobExecution.setExitStatus(ExitStatus.COMPLETED); + expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution)); + expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture); + replayAll(); + + jobScheduler.schedule(jobName, schedulingProperties); + } + + @Test + public void testScheduleWhenPreviousExecutionFailedJobIsRestartedAndScheduled() throws Exception { + String jobName = "job0"; + SchedulingProperties schedulingProperties = new SchedulingProperties(); + schedulingProperties.setCron("* * * * * ?"); + JobExecution jobExecution = new JobExecution(1L, new JobParameters()); + jobExecution.setExitStatus(ExitStatus.FAILED); + expect(jobs.lastRun(jobName)).andReturn(Optional.of(jobExecution)); + jobs.restart(1L); expectLastCall(); + expect(taskScheduler.schedule(isA(Runnable.class), eq(new CronTrigger(schedulingProperties.getCron())))).andReturn(scheduledFuture); + replayAll(); + + jobScheduler.schedule(jobName, schedulingProperties); + } +} \ No newline at end of file diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java index 88fbff0..b31110c 100644 --- a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java +++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java @@ -19,6 +19,7 @@ package org.apache.ambari.infra.job.archive; +import org.apache.ambari.infra.job.JobContextRepository; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; import org.easymock.Mock; @@ -26,12 +27,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.scope.context.StepContext; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStreamReader; +import org.springframework.batch.repeat.RepeatStatus; import java.io.IOException; import java.io.UncheckedIOException; @@ -39,10 +42,20 @@ import java.util.HashMap; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; @RunWith(EasyMockRunner.class) public class DocumentExporterTest extends EasyMockSupport { + private static final long JOB_EXECUTION_ID = 1L; + private static final long STEP_EXECUTION_ID = 1L; + private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{ + put("id", "2"); + }}); + private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{ + put("id", "3"); + }}); private DocumentExporter documentExporter; @Mock private ItemStreamReader<Document> reader; @@ -52,17 +65,30 @@ public class DocumentExporterTest extends EasyMockSupport { private DocumentItemWriter documentItemWriter; @Mock private DocumentItemWriter documentItemWriter2; + @Mock + private DocumentItemWriter documentItemWriter3; + @Mock + private JobContextRepository jobContextRepository; - private ExecutionContext executionContext; +// private ExecutionContext executionContext; private ChunkContext chunkContext; private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }}); @Before public void setUp() throws Exception { - StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(1L)); - chunkContext = new ChunkContext(new StepContext(stepExecution)); - executionContext = stepExecution.getExecutionContext(); - documentExporter = new DocumentExporter(reader, documentDestination, 2); + chunkContext = chunkContext(BatchStatus.STARTED); + documentExporter = documentExporter(2); + } + + private DocumentExporter documentExporter(int writeBlockSize) { + return new DocumentExporter(reader, documentDestination, writeBlockSize, jobContextRepository); + } + + private ChunkContext chunkContext(BatchStatus batchStatus) { + StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(JOB_EXECUTION_ID)); + stepExecution.setId(STEP_EXECUTION_ID); + stepExecution.getJobExecution().setStatus(batchStatus); + return new ChunkContext(new StepContext(stepExecution)); } @After @@ -72,7 +98,7 @@ public class DocumentExporterTest extends EasyMockSupport { @Test public void testNothingToRead() throws Exception { - reader.open(executionContext); expectLastCall(); + reader.open(executionContext(chunkContext)); expectLastCall(); expect(reader.read()).andReturn(null); reader.close(); expectLastCall(); replayAll(); @@ -80,9 +106,13 @@ public class DocumentExporterTest extends EasyMockSupport { documentExporter.execute(null, chunkContext); } + private ExecutionContext executionContext(ChunkContext chunkContext) { + return chunkContext.getStepContext().getStepExecution().getExecutionContext(); + } + @Test public void testWriteLessDocumentsThanWriteBlockSize() throws Exception { - reader.open(executionContext); expectLastCall(); + reader.open(executionContext(chunkContext)); expectLastCall(); expect(reader.read()).andReturn(DOCUMENT); expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); documentItemWriter.write(DOCUMENT); expectLastCall(); @@ -91,36 +121,35 @@ public class DocumentExporterTest extends EasyMockSupport { documentItemWriter.close(); expectLastCall(); replayAll(); - documentExporter.execute(null, chunkContext); + assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED)); } @Test public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception { - Document document2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }}); - Document document3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }}); - - reader.open(executionContext); expectLastCall(); + reader.open(executionContext(chunkContext)); expectLastCall(); expect(reader.read()).andReturn(DOCUMENT); expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); documentItemWriter.write(DOCUMENT); expectLastCall(); - expect(reader.read()).andReturn(document2); - documentItemWriter.write(document2); expectLastCall(); - expect(reader.read()).andReturn(document3); + expect(reader.read()).andReturn(DOCUMENT_2); + documentItemWriter.write(DOCUMENT_2); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT_3); documentItemWriter.close(); expectLastCall(); - expect(documentDestination.open(document3)).andReturn(documentItemWriter2); - documentItemWriter2.write(document3); expectLastCall(); + jobContextRepository.updateExecutionContext(chunkContext.getStepContext().getStepExecution()); + expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution()); + expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter2); + documentItemWriter2.write(DOCUMENT_3); expectLastCall(); expect(reader.read()).andReturn(null); - reader.update(executionContext); + reader.update(executionContext(chunkContext)); reader.close(); expectLastCall(); documentItemWriter2.close(); expectLastCall(); replayAll(); - documentExporter.execute(null, chunkContext); + assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED)); } @Test(expected = IOException.class) public void testReadError() throws Exception { - reader.open(executionContext); expectLastCall(); + reader.open(executionContext(chunkContext)); expectLastCall(); expect(reader.read()).andReturn(DOCUMENT); expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); documentItemWriter.write(DOCUMENT); expectLastCall(); @@ -134,7 +163,7 @@ public class DocumentExporterTest extends EasyMockSupport { @Test(expected = UncheckedIOException.class) public void testWriteError() throws Exception { - reader.open(executionContext); expectLastCall(); + reader.open(executionContext(chunkContext)); expectLastCall(); expect(reader.read()).andReturn(DOCUMENT); expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST"))); @@ -144,4 +173,43 @@ public class DocumentExporterTest extends EasyMockSupport { documentExporter.execute(null, chunkContext); } + + @Test + public void testStopAndRestartExportsAllDocuments() throws Exception { + ChunkContext stoppingChunkContext = chunkContext(BatchStatus.STOPPING); + DocumentExporter documentExporter = documentExporter(1); + + reader.open(executionContext(chunkContext)); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT); + + expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); + documentItemWriter.write(DOCUMENT); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT_2); + expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution()); + documentItemWriter.close(); expectLastCall(); + reader.update(executionContext(this.chunkContext)); + jobContextRepository.updateExecutionContext(this.chunkContext.getStepContext().getStepExecution()); + + expect(documentDestination.open(DOCUMENT_2)).andReturn(documentItemWriter2); + documentItemWriter2.write(DOCUMENT_2); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT_3); + expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(stoppingChunkContext.getStepContext().getStepExecution()); + documentItemWriter2.revert(); expectLastCall(); + reader.close(); expectLastCall(); + + reader.open(executionContext(chunkContext)); + expect(reader.read()).andReturn(DOCUMENT_3); + expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter3); + documentItemWriter3.write(DOCUMENT_3); expectLastCall(); + documentItemWriter3.close(); expectLastCall(); + + expect(reader.read()).andReturn(null); + reader.close(); expectLastCall(); + replayAll(); + + RepeatStatus repeatStatus = documentExporter.execute(null, this.chunkContext); + assertThat(repeatStatus, is(RepeatStatus.CONTINUABLE)); + repeatStatus = documentExporter.execute(null, this.chunkContext); + assertThat(repeatStatus, is(RepeatStatus.FINISHED)); + } } \ No newline at end of file diff --git a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample index f008a53..d722f0e 100644 --- a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample +++ b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + infra-manager.batch.db.file=job-repository.db infra-manager.batch.db.init=true infra-manager.batch.db.username=admin @@ -20,18 +21,19 @@ management.security.enabled=false management.health.solr.enabled=false infra-manager.server.data.folder=/tmp/ambariInfraManager -infra-manager.jobs.solr_data_export.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr -infra-manager.jobs.solr_data_export.archive_service_logs.solr.collection=hadoop_logs -infra-manager.jobs.solr_data_export.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}] -infra-manager.jobs.solr_data_export.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}] -infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[0]=logtime -infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[1]=id -infra-manager.jobs.solr_data_export.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}]) -infra-manager.jobs.solr_data_export.archive_service_logs.read_block_size=2000 -infra-manager.jobs.solr_data_export.archive_service_logs.write_block_size=1000 -infra-manager.jobs.solr_data_export.archive_service_logs.destination=HDFS -infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_column=logtime -infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX -infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020 -infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_destination_directory=/archived_service_logs +infra-manager.jobs.solr_data_archiving.archive_service_logs.enabled=true +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}] +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}] +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[0]=logtime +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[1]=id +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}]) +infra-manager.jobs.solr_data_archiving.archive_service_logs.read_block_size=2000 +infra-manager.jobs.solr_data_archiving.archive_service_logs.write_block_size=1000 +infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=HDFS +infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime +infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX +infra-manager.jobs.solr_data_archiving.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020 +infra-manager.jobs.solr_data_archiving.archive_service_logs.hdfs_destination_directory=/archived_service_logs # Note: set hdfs user using the HADOOP_USER_NAME environmental variable. Value: hdfs \ No newline at end of file -- To stop receiving notification emails like this one, please contact [email protected].
