AMBARI-20989. Ambari infra manager: add batch support (oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4e6babdf Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4e6babdf Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4e6babdf Branch: refs/heads/branch-feature-AMBARI-20859 Commit: 4e6babdf1569f83c188b817c0aff5c4535b4d015 Parents: fffd07c Author: oleewere <[email protected]> Authored: Thu May 11 15:03:31 2017 +0200 Committer: oleewere <[email protected]> Committed: Fri May 12 11:57:49 2017 +0200 ---------------------------------------------------------------------- ambari-infra/.gitignore | 6 + ambari-infra/ambari-infra-manager/pom.xml | 17 +- .../conf/batch/InfraManagerBatchConfig.java | 227 +++++++++++++++++++ .../infra/job/dummy/DummyItemProcessor.java | 36 +++ .../ambari/infra/job/dummy/DummyItemWriter.java | 36 +++ .../ambari/infra/job/dummy/DummyObject.java | 40 ++++ .../apache/ambari/infra/rest/JobResource.java | 43 +++- .../src/main/resources/dummy/dummy.txt | 3 + .../src/main/resources/infra-manager.properties | 6 +- 9 files changed, 409 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/.gitignore ---------------------------------------------------------------------- diff --git a/ambari-infra/.gitignore b/ambari-infra/.gitignore new file mode 100644 index 0000000..a7d91c4 --- /dev/null +++ b/ambari-infra/.gitignore @@ -0,0 +1,6 @@ +target +.settings +.classpath +.project +/bin/ +job-repository.db \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/pom.xml b/ambari-infra/ambari-infra-manager/pom.xml index c3a09ab..bd5bf03 100644 --- a/ambari-infra/ambari-infra-manager/pom.xml +++ 
b/ambari-infra/ambari-infra-manager/pom.xml @@ -39,6 +39,7 @@ <jjwt.version>0.6.0</jjwt.version> <spring-batch.version>3.0.7.RELEASE</spring-batch.version> <jdk.version>1.7</jdk.version> + <sqlite.version>3.8.11.2</sqlite.version> </properties> <build> @@ -360,6 +361,11 @@ <version>${spring-batch.version}</version> </dependency> <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jdbc</artifactId> + <version>${spring.version}</version> + </dependency> + <dependency> <groupId>io.jsonwebtoken</groupId> <artifactId>jjwt</artifactId> <version>${jjwt.version}</version> @@ -369,7 +375,16 @@ <artifactId>jersey-bean-validation</artifactId> <version>2.25</version> </dependency> + <dependency> + <groupId>org.xerial</groupId> + <artifactId>sqlite-jdbc</artifactId> + <version>${sqlite.version}</version> + </dependency> + <dependency> + <groupId>org.springframework.batch</groupId> + <artifactId>spring-batch-admin-manager</artifactId> + <version>1.3.1.RELEASE</version> + </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java new file mode 100644 index 0000000..a587ec2 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/batch/InfraManagerBatchConfig.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.conf.batch; + +import org.apache.ambari.infra.job.dummy.DummyItemProcessor; +import org.apache.ambari.infra.job.dummy.DummyItemWriter; +import org.apache.ambari.infra.job.dummy.DummyObject; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.JobRegistry; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.launch.support.SimpleJobOperator; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import 
org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.LineMapper; +import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; +import org.springframework.batch.item.file.mapping.DefaultLineMapper; +import org.springframework.batch.item.file.mapping.FieldSetMapper; +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.batch.item.file.transform.LineTokenizer; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.jdbc.datasource.init.DataSourceInitializer; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.inject.Inject; +import javax.sql.DataSource; +import java.net.MalformedURLException; + +@Configuration +@EnableBatchProcessing +@EnableScheduling +public class InfraManagerBatchConfig { + + @Value("classpath:org/springframework/batch/core/schema-drop-sqlite.sql") + private Resource dropRepositoryTables; + + @Value("classpath:org/springframework/batch/core/schema-sqlite.sql") + private Resource dataRepositorySchema; + + @Value("${infra-manager.batch.db.init:false}") + private boolean dropDatabaseOnStartup; + + @Value("${infra-manager.batch.db.file:/etc/ambari-inra-manager/conf/repository.db}") + private String sqliteDbFileLocation; + + 
@Value("${infra-manager.batch.db.username}") + private String databaseUsername; + + @Value("${infra-manager.batch.db.password}") + private String databasePassword; + + @Inject + private StepBuilderFactory steps; + + @Inject + private JobBuilderFactory jobs; + + @Inject + private JobRegistry jobRegistry; + + @Inject + private JobExplorer jobExplorer; + + @Bean + public DataSource dataSource() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName("org.sqlite.JDBC"); + dataSource.setUrl("jdbc:sqlite:" + sqliteDbFileLocation); + dataSource.setUsername(databaseUsername); + dataSource.setUsername(databasePassword); + return dataSource; + } + + @Bean + public DataSourceInitializer dataSourceInitializer(DataSource dataSource) + throws MalformedURLException { + ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator(); + if (dropDatabaseOnStartup) { + databasePopulator.addScript(dropRepositoryTables); + databasePopulator.setIgnoreFailedDrops(true); + } + databasePopulator.addScript(dataRepositorySchema); + databasePopulator.setContinueOnError(true); + + DataSourceInitializer initializer = new DataSourceInitializer(); + initializer.setDataSource(dataSource); + initializer.setDatabasePopulator(databasePopulator); + + return initializer; + } + + @Bean + public JobRepository jobRepository() throws Exception { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDataSource(dataSource()); + factory.setTransactionManager(getTransactionManager()); + factory.afterPropertiesSet(); + return factory.getObject(); + } + + @Bean + public PlatformTransactionManager getTransactionManager() { + return new ResourcelessTransactionManager(); + } + + @Bean(name = "jobLauncher") + public JobLauncher jobLauncher() throws Exception { + SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); + jobLauncher.setJobRepository(jobRepository()); + jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); 
+ jobLauncher.afterPropertiesSet(); + return jobLauncher; + } + + @Bean + public JobOperator jobOperator() throws Exception { + SimpleJobOperator jobOperator = new SimpleJobOperator(); + jobOperator.setJobExplorer(jobExplorer); + jobOperator.setJobLauncher(jobLauncher()); + jobOperator.setJobRegistry(jobRegistry); + jobOperator.setJobRepository(jobRepository()); + return jobOperator; + } + + @Bean + public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() { + JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor(); + jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry); + return jobRegistryBeanPostProcessor; + } + + @Bean(name = "dummyStep") + protected Step dummyStep(ItemReader<DummyObject> reader, + ItemProcessor<DummyObject, String> processor, + ItemWriter<String> writer) { + return steps.get("dummyStep").<DummyObject, String> chunk(2) + .reader(reader).processor(processor).writer(writer).build(); + } + + @Bean(name = "dummyJob") + public Job job(@Qualifier("dummyStep") Step dummyStep) { + return jobs.get("dummyJob").start(dummyStep).build(); + } + + @Bean + public ItemReader<DummyObject> dummyItemReader() { + FlatFileItemReader<DummyObject> csvFileReader = new FlatFileItemReader<>(); + csvFileReader.setResource(new ClassPathResource("dummy/dummy.txt")); + csvFileReader.setLinesToSkip(1); + LineMapper<DummyObject> lineMapper = dummyLineMapper(); + csvFileReader.setLineMapper(lineMapper); + return csvFileReader; + } + + @Bean + public ItemProcessor<DummyObject, String> dummyItemProcessor() { + return new DummyItemProcessor(); + } + + @Bean + public ItemWriter<String> dummyItemWriter() { + return new DummyItemWriter(); + } + + private LineMapper<DummyObject> dummyLineMapper() { + DefaultLineMapper<DummyObject> lineMapper = new DefaultLineMapper<>(); + + LineTokenizer dummyTokenizer = dummyTokenizer(); + lineMapper.setLineTokenizer(dummyTokenizer); + + FieldSetMapper<DummyObject> dummyFieldSetMapper = 
dummyFieldSetMapper(); + lineMapper.setFieldSetMapper(dummyFieldSetMapper); + + return lineMapper; + } + + private FieldSetMapper<DummyObject> dummyFieldSetMapper() { + BeanWrapperFieldSetMapper<DummyObject> studentInformationMapper = new BeanWrapperFieldSetMapper<>(); + studentInformationMapper.setTargetType(DummyObject.class); + return studentInformationMapper; + } + + private LineTokenizer dummyTokenizer() { + DelimitedLineTokenizer studentLineTokenizer = new DelimitedLineTokenizer(); + studentLineTokenizer.setDelimiter(","); + studentLineTokenizer.setNames(new String[]{"f1", "f2"}); + return studentLineTokenizer; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemProcessor.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemProcessor.java new file mode 100644 index 0000000..a124e4d --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemProcessor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.dummy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemProcessor; + +public class DummyItemProcessor implements ItemProcessor<DummyObject, String> { + + private static final Logger LOG = LoggerFactory.getLogger(DummyItemProcessor.class); + + @Override + public String process(DummyObject input) throws Exception { + LOG.info("Dummy processing, f1: {}, f2: {}. wait 10 seconds", input.getF1(), input.getF2()); + Thread.sleep(10000); + return String.format("%s, %s", input.getF1(), input.getF2()); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemWriter.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemWriter.java new file mode 100644 index 0000000..f495795 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyItemWriter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.dummy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemWriter; + +import java.util.List; + +public class DummyItemWriter implements ItemWriter<String> { + + private static final Logger LOG = LoggerFactory.getLogger(DummyItemWriter.class); + + @Override + public void write(List<? extends String> values) throws Exception { + LOG.info("DummyItem writer called (values: {})... wait 1 seconds", values.toString()); + Thread.sleep(1000); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyObject.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyObject.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyObject.java new file mode 100644 index 0000000..ce087dd --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/dummy/DummyObject.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Mutable bean with two string properties, {@code f1} and {@code f2}.
 * Both properties are {@code null} until a setter is called.
 */
public class DummyObject {

  private String f1;
  private String f2;

  /** @return the current value of {@code f1}, may be {@code null} */
  public String getF1() {
    return this.f1;
  }

  /** @return the current value of {@code f2}, may be {@code null} */
  public String getF2() {
    return this.f2;
  }

  /** @param value the new value of {@code f1} */
  public void setF1(String value) {
    this.f1 = value;
  }

  /** @param value the new value of {@code f2} */
  public void setF2(String value) {
    this.f2 = value;
  }
}
+import javax.inject.Inject; import javax.inject.Named; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import java.util.Date; +import java.util.Set; @Api(value = "jobs", description = "Job operations") @Path("jobs") @@ -33,11 +45,36 @@ import javax.ws.rs.Produces; @Scope("request") public class JobResource { + @Inject + private JobOperator jobOperator; + + @Inject + private JobExplorer jobExplorer; + @GET @Produces({"application/json"}) - @ApiOperation("Get all jobs") - public String getAuditLogs() { - return "jobs..."; // TODO + @ApiOperation("Get all job names") + public Set<String> getAllJobNames() { + return jobOperator.getJobNames(); + } + + @GET + @Path("executions/{jobName}") + @Produces({"application/json"}) + @ApiOperation("Get the id values of all the running job instances by job name") + public Set<Long> getExecutionIdsByJobName( + @PathParam("jobName") String jobName) throws NoSuchJobException { + return jobOperator.getRunningExecutions(jobName); + } + + @POST + @Produces({"application/json"}) + @Path("start/{jobName}") + public Long startJob(@PathParam("jobName") String jobName, @QueryParam("params") String params) + throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException { + JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); + jobParametersBuilder.addDate("date", new Date()); + return jobOperator.start(jobName, jobParametersBuilder.toJobParameters() + "," + params); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/src/main/resources/dummy/dummy.txt ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/dummy/dummy.txt b/ambari-infra/ambari-infra-manager/src/main/resources/dummy/dummy.txt new file mode 100644 index 0000000..41da725 --- /dev/null +++ 
b/ambari-infra/ambari-infra-manager/src/main/resources/dummy/dummy.txt @@ -0,0 +1,3 @@ +f1,f2 +v1,v2 +v3,v4 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4e6babdf/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties index 13878a1..fbeac78 100644 --- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -11,4 +11,8 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. +infra-manager.batch.db.file=job-repository.db +infra-manager.batch.db.init=true +infra-manager.batch.db.username=admin +infra-manager.batch.db.password=admin \ No newline at end of file
