AMBARI-22514. Initial implementation of Schedulable document deletion & archiving for Infra Solr (Krisztian Kasa via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/393fdb80 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/393fdb80 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/393fdb80 Branch: refs/heads/branch-3.0-perf Commit: 393fdb8048ff579e8a55cd1b477a23d1bf105576 Parents: 2bf3c8e Author: Krisztian Kasa <[email protected]> Authored: Tue Nov 28 15:45:22 2017 +0100 Committer: Oliver Szabo <[email protected]> Committed: Tue Nov 28 15:47:59 2017 +0100 ---------------------------------------------------------------------- ambari-infra/ambari-infra-manager/pom.xml | 11 ++ .../infra/job/archive/CompositeFileAction.java | 46 +++++ .../ambari/infra/job/archive/Document.java | 54 +++++ .../infra/job/archive/DocumentDestination.java | 23 +++ .../archive/DocumentExportConfiguration.java | 118 +++++++++++ .../job/archive/DocumentExportJobListener.java | 35 ++++ .../job/archive/DocumentExportProperties.java | 112 +++++++++++ .../job/archive/DocumentExportStepListener.java | 47 +++++ .../infra/job/archive/DocumentExporter.java | 99 ++++++++++ .../infra/job/archive/DocumentItemReader.java | 135 +++++++++++++ .../infra/job/archive/DocumentItemWriter.java | 25 +++ .../infra/job/archive/DocumentIterator.java | 25 +++ .../infra/job/archive/DocumentSource.java | 24 +++ .../ambari/infra/job/archive/FileAction.java | 25 +++ .../job/archive/LocalDocumentItemWriter.java | 72 +++++++ .../ambari/infra/job/archive/S3Properties.java | 64 ++++++ .../ambari/infra/job/archive/S3Uploader.java | 51 +++++ .../infra/job/archive/SolrDocumentIterator.java | 90 +++++++++ .../infra/job/archive/SolrDocumentSource.java | 68 +++++++ .../infra/job/archive/SolrQueryBuilder.java | 115 +++++++++++ .../infra/job/archive/SolrQueryProperties.java | 69 +++++++ .../infra/job/archive/TarGzCompressor.java | 50 +++++ .../apache/ambari/infra/manager/JobManager.java | 21 +- .../src/main/resources/infra-manager.properties | 12 ++ 
.../src/main/resources/log4j2.xml | 2 +- .../infra/job/archive/DocumentExporterTest.java | 147 ++++++++++++++ .../job/archive/DocumentItemReaderTest.java | 197 +++++++++++++++++++ .../archive/LocalDocumentItemWriterTest.java | 98 +++++++++ .../infra/job/archive/SolrQueryBuilderTest.java | 113 +++++++++++ .../test-config/logfeeder/logfeeder.properties | 2 +- 30 files changed, 1940 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/pom.xml b/ambari-infra/ambari-infra-manager/pom.xml index aa86da8..67bf7d1 100644 --- a/ambari-infra/ambari-infra-manager/pom.xml +++ b/ambari-infra/ambari-infra-manager/pom.xml @@ -141,6 +141,12 @@ <version>3.4</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> <!-- Spring dependencies --> <dependency> <groupId>org.springframework</groupId> @@ -417,6 +423,11 @@ <groupId>com.google.guava</groupId> <version>20.0</version> </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.11.5</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java new file mode 100644 index 0000000..84ce160 --- /dev/null +++ 
b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+public class CompositeFileAction implements FileAction { // Chains FileActions: each one receives the file returned by the previous one.
+  private final List<FileAction> actions; // growable copy, so add() can extend the chain after construction
+
+  public CompositeFileAction(FileAction... actions) { // actions run in the given order
+    this.actions = new ArrayList<>(asList(actions)); // Arrays.asList alone is fixed-size: add() would throw UnsupportedOperationException
+  }
+
+  public void add(FileAction action) { // appends an action to the end of the chain
+    actions.add(action);
+  }
+
+  @Override
+  public File perform(File inputFile) { // returns the last action's output, or inputFile itself when the chain is empty
+    File file = inputFile;
+    for (FileAction action : actions) {
+      file = action.perform(file);
+    }
+    return file;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
new file mode 100644
index 0000000..84f5ece
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+// TODO: create entities for each Solr collection
+public class Document { // A Solr document as a flat field-name -> value map; Jackson (de)serializes the fields as top-level JSON properties.
+  private final Map<String, String> fieldMap;
+
+  private Document() { // used by Jackson: starts with a mutable map that put() fills during deserialization
+    fieldMap = new HashMap<>();
+  }
+
+  public Document(Map<String, String> fieldMap) { // fields are copied, so later changes to the caller's map do not affect this document
+    this.fieldMap = unmodifiableMap(new HashMap<>(fieldMap)); // unmodifiableMap alone is only a read-only view of the caller's (still mutable) map
+  }
+
+  public String get(String key) { // value of the given field, or null if the document has no such field
+    return fieldMap.get(key);
+  }
+
+  @JsonAnyGetter
+  private Map<String, String> getFieldMap() { // exposes every field to Jackson as a top-level JSON property
+    return fieldMap;
+  }
+
+  @JsonAnySetter
+  private void put(String key, String value) { // receives each JSON property during deserialization
+    fieldMap.put(key, value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
new file mode 100644
index 0000000..f647a36
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentDestination.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+public interface DocumentDestination { // Opens the writers that receive exported documents; DocumentExporter asks for one writer per block.
+  DocumentItemWriter open(Document firstDocument); // firstDocument is the first document of the new block; DocumentExporter still passes it to write() afterwards
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
new file mode 100644
index 0000000..69f41d3
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.JobScope;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.nio.file.Paths;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.ambari.infra.job.archive.SolrDocumentSource.SOLR_DATETIME_FORMATTER;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Configuration // Spring Batch wiring of the "solr_data_export" job: a single tasklet step that exports Solr documents into local files.
+public class DocumentExportConfiguration {
+  private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class);
+  private static final DateTimeFormatter FILENAME_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH_mm_ss.SSSX"); // NOTE(review): not referenced in this class - confirm whether it is still needed
+
+  @Inject
+  private DocumentExportProperties properties; // configured defaults; DocumentExportStepListener overlays the job parameters on them before the step
+
+  @Inject
+  private StepBuilderFactory steps;
+
+  @Inject
+  private JobBuilderFactory jobs;
+
+  // The @Bean method names below are also the bean names ("exportStep" is referenced through @Qualifier),
+  // so renaming a method changes the wiring.
+  @Bean
+  public Job logExportJob(@Qualifier("exportStep") Step logExportStep) { // the job is registered as "solr_data_export"
+    return jobs.get("solr_data_export").listener(new DocumentExportJobListener()).start(logExportStep).build();
+  }
+
+  @Bean
+  @JobScope
+  public Step exportStep(DocumentExporter documentExporter) {
+    return steps.get("export")
+        .tasklet(documentExporter)
+        .listener(new DocumentExportStepListener(properties)) // applies the job parameters to properties in beforeStep()
+        .build();
+  }
+
+  @Bean
+  @StepScope
+  public DocumentExporter getDocumentExporter(DocumentItemReader documentItemReader,
+                                              @Value("#{stepExecution.jobExecution.id}") String jobId) { // tasklet writing document blocks into <destinationDirectoryPath>/<collection>_<jobId>
+    File path = Paths.get(
+        properties.getDestinationDirectoryPath(),
+        String.format("%s_%s", properties.getQuery().getCollection(), jobId)).toFile(); // TODO: add end date
+    LOG.info("Destination directory path={}", path);
+    if (!path.exists()) {
+      if (!path.mkdirs()) {
+        LOG.warn("Unable to create directory {}", path); // only logged: the exporter is still created
+      }
+    }
+
+    CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor()); // shared by every writer; how/when it runs is up to LocalDocumentItemWriter (presumably on each finished file - confirm)
+
+    return new DocumentExporter(
+        documentItemReader,
+        firstDocument -> new LocalDocumentItemWriter( // one file per block: <collection>_-_<value of fileNameSuffixColumn in the block's first document>.json
+            new File(path, String.format("%s_-_%s.json",
+                properties.getQuery().getCollection(),
+                firstDocument.get(properties.getFileNameSuffixColumn()))),
+            fileAction),
+        properties.getWriteBlockSize());
+  }
+
+  @Bean
+  @StepScope
+  public DocumentItemReader reader(DocumentSource documentSource) { // reader over documentSource using the configured readBlockSize
+    return new DocumentItemReader(documentSource, properties.getReadBlockSize());
+  }
+
+  @Bean
+  @StepScope
+  public DocumentSource logSource(@Value("#{jobParameters[endDate]}") String endDateText) { // Solr source; endDate comes from the "endDate" job parameter (parsed by OffsetDateTime.parse) or defaults to now in UTC
+    OffsetDateTime endDate = OffsetDateTime.now(ZoneOffset.UTC);
+    if (!isBlank(endDateText))
+      endDate = OffsetDateTime.parse(endDateText);
+
+    return new SolrDocumentSource(
+        properties.getZooKeeperSocket(),
+        properties.getQuery(),
+        SOLR_DATETIME_FORMATTER.format(endDate));
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java
b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java new file mode 100644 index 0000000..f1df46c --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobExecutionListener;
+
+public class DocumentExportJobListener implements JobExecutionListener { // Job-level listener of solr_data_export: sets the job's final exit status to COMPLETED.
+  @Override
+  public void beforeJob(JobExecution jobExecution) {
+    // intentionally empty: nothing to prepare before the job starts
+  }
+
+  @Override
+  public void afterJob(JobExecution jobExecution) {
+    jobExecution.setExitStatus(ExitStatus.COMPLETED); // NOTE(review): applied unconditionally, even after a failed export step - confirm this is intended
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
new file mode 100644
index 0000000..d6301c0
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.hibernate.validator.constraints.NotBlank;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+import javax.validation.constraints.Min;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+@Configuration
+@PropertySource(value = {"classpath:infra-manager.properties"})
+@ConfigurationProperties(prefix = "infra-manager.jobs.solr_data_export")
+public class DocumentExportProperties { // Settings of the solr_data_export job, bound from infra-manager.properties; apply() overlays per-run job parameters.
+  @NotBlank
+  private String zooKeeperSocket;
+  @Min(1)
+  private int readBlockSize;
+  @Min(1)
+  private int writeBlockSize;
+  @NotBlank
+  private String destinationDirectoryPath;
+  @NotBlank
+  private String fileNameSuffixColumn;
+  private SolrQueryProperties query;
+
+  public String getZooKeeperSocket() {
+    return zooKeeperSocket;
+  }
+
+  public void setZooKeeperSocket(String zooKeeperSocket) {
+    this.zooKeeperSocket = zooKeeperSocket;
+  }
+
+  public int getReadBlockSize() {
+    return readBlockSize;
+  }
+
+  public void setReadBlockSize(int readBlockSize) {
+    this.readBlockSize = readBlockSize;
+  }
+
+  public int getWriteBlockSize() {
+    return writeBlockSize;
+  }
+
+  public void setWriteBlockSize(int writeBlockSize) {
+    this.writeBlockSize = writeBlockSize;
+  }
+
+  public String getDestinationDirectoryPath() {
+    return destinationDirectoryPath;
+  }
+
+  public void setDestinationDirectoryPath(String destinationDirectoryPath) {
+    this.destinationDirectoryPath = destinationDirectoryPath;
+  }
+
+  public void apply(JobParameters jobParameters) { // overrides settings with matching job parameters; missing ones (and blank block sizes) keep the current value
+    // TODO: solr query params. NOTE(review): this mutates the shared settings bean, so overrides may persist into later runs - confirm
+    zooKeeperSocket = jobParameters.getString("zooKeeperSocket", zooKeeperSocket);
+    readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize);
+    writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize);
+    destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath);
+    query.setCollection(jobParameters.getString("collection", query.getCollection()));
+    query.setQueryText(jobParameters.getString("queryText", query.getQueryText()));
+    query.setFilterQueryText(jobParameters.getString("filterQueryText", query.getFilterQueryText()));
+  }
+
+  private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) { // parsed int value of the parameter, or defaultValue when it is missing/blank
+    String parameterText = jobParameters.getString(parameterName);
+    if (isBlank(parameterText))
+      return defaultValue;
+    return Integer.parseInt(parameterText); // must not assign any field here: the caller stores the result in the right property
+  }
+
+  public String getFileNameSuffixColumn() {
+    return fileNameSuffixColumn;
+  }
+
+  public void setFileNameSuffixColumn(String fileNameSuffixColumn) {
+    this.fileNameSuffixColumn = fileNameSuffixColumn;
+  }
+
+  public SolrQueryProperties getQuery() {
+    return query;
+  }
+
+  public void setQuery(SolrQueryProperties query) {
+    this.query = query;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
new file mode 100644
index 0000000..3bab6d5
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+
+public class DocumentExportStepListener implements StepExecutionListener { // Overlays the job parameters onto the export settings before the export step starts.
+  private static final Logger LOG = LoggerFactory.getLogger(DocumentExportStepListener.class);
+
+  private final DocumentExportProperties properties; // settings object that receives the per-run overrides
+
+  public DocumentExportStepListener(DocumentExportProperties exportProperties) {
+    this.properties = exportProperties;
+  }
+
+  @Override
+  public void beforeStep(StepExecution execution) { // copies this run's job parameters into the settings, then logs
+    properties.apply(execution.getJobParameters());
+    LOG.info("LogExport step - before step execution");
+  }
+
+  @Override
+  public ExitStatus afterStep(StepExecution execution) { // logs and leaves the step's exit status unchanged
+    LOG.info("LogExport step - after step execution");
+    return execution.getExitStatus();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java
----------------------------------------------------------------------
diff --git
a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java new file mode 100644 index 0000000..6106c20 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.repeat.RepeatStatus;
+
+public class DocumentExporter implements Tasklet, StepExecutionListener { // Tasklet copying every document from documentReader into writers opened from documentDestination, with a new writer every writeBlockSize documents.
+
+  private boolean complete = false; // set only after execute() finishes without an exception; reported by afterStep()
+  private final ItemStreamReader<Document> documentReader;
+  private final DocumentDestination documentDestination;
+  private final int writeBlockSize; // number of documents written to one writer (output block)
+
+  public DocumentExporter(ItemStreamReader<Document> documentReader, DocumentDestination documentDestination, int writeBlockSize) {
+    this.documentReader = documentReader;
+    this.documentDestination = documentDestination;
+    this.writeBlockSize = writeBlockSize;
+  }
+
+  @Override
+  public void beforeStep(StepExecution stepExecution) {
+    // nothing to do before the step
+  }
+
+  @Override
+  public ExitStatus afterStep(StepExecution stepExecution) { // COMPLETED if execute() ran to the end, FAILED otherwise
+    if (complete) {
+      return ExitStatus.COMPLETED;
+    }
+    else {
+      return ExitStatus.FAILED;
+    }
+  }
+
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { // exports all documents in one invocation, then returns FINISHED
+    ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
+    documentReader.open(executionContext); // the reader may resume from a position stored in the step's ExecutionContext
+
+    DocumentItemWriter writer = null; // writer of the current block; null until the next block is opened
+    int writtenCount = 0; // documents written to the current writer
+    try {
+      Document document;
+      while ((document = documentReader.read()) != null) {
+        if (writer != null && writtenCount >= writeBlockSize) { // current block is full: finish it before handling this document
+          writer.close();
+          writer = null;
+          writtenCount = 0;
+          documentReader.update(executionContext); // lets the reader record its progress once a block is closed (not done after the final block)
+        }
+
+        if (writer == null)
+          writer = documentDestination.open(document); // opens the next block, passing its first document
+
+        writer.write(document);
+        ++writtenCount;
+      }
+    }
+    catch (Exception e) {
+      if (writer != null) { // discard the partially written block, then propagate the failure
+        writer.revert();
+        writer = null;
+      }
+      throw e;
+    }
+    finally {
+      if (writer != null) // closes the last block if it is still open
+        writer.close();
+      documentReader.close();
+    }
+
+    complete = true;
+    return RepeatStatus.FINISHED;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
new file mode 100644
index 0000000..a4378a4
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.support.AbstractItemStreamItemReader;
+import org.springframework.batch.repeat.CompletionPolicy;
+import org.springframework.batch.repeat.RepeatContext;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.batch.repeat.context.RepeatContextSupport;
+import org.springframework.util.ClassUtils;
+
+public class DocumentItemReader extends AbstractItemStreamItemReader<Document> implements CompletionPolicy { // Reads documents from a DocumentSource block by block (readBlockSize per block), re-opening the source from the last returned document when a full block is used up.
+
+  public final static String POSITION = "last-read"; // ExecutionContext key of the restart document
+
+  private final DocumentSource documentSource;
+  private final int readBlockSize;
+
+  private DocumentIterator documentIterator = null; // iterator over the current block; null before the first read() and after closeStream()
+  private int count = 0; // next() calls on the current iterator, including the one that returned null
+  private boolean eof = false; // true once read() has returned null
+  private Document current = null; // document most recently returned by read(); also the starting point passed to documentSource.open()
+  private Document previous = null; // document returned by read() before current; this is what update() stores
+
+  public DocumentItemReader(DocumentSource documentSource, int readBlockSize) {
+    this.documentSource = documentSource;
+    this.readBlockSize = readBlockSize;
+    setName(ClassUtils.getShortName(DocumentItemReader.class));
+  }
+
+  @Override
+  public Document read() throws Exception { // next document, or null when the source is exhausted
+    if (documentIterator == null)
+      openStream();
+    Document next = getNext();
+    if (next == null && count > readBlockSize) { // the block held readBlockSize documents, so more may exist: open the next block from current (assumes a block never exceeds readBlockSize - confirm)
+      openStream();
+      next = getNext();
+    }
+    eof = next == null;
+    if (eof && documentIterator != null)
+      documentIterator.close(); // NOTE(review): the field stays non-null, so close()/openStream() will close this iterator again - confirm DocumentIterator tolerates double close
+
+    previous = current;
+    current = next;
+    return current;
+  }
+
+  private Document getNext() { // advances the current iterator, counting every call
+    ++count;
+    return documentIterator.next();
+  }
+
+  private void openStream() { // closes any open iterator and opens a new block starting from current (null presumably means from the beginning)
+    closeStream();
+    documentIterator = documentSource.open(current, readBlockSize);
+    count = 0;
+  }
+
+  private void closeStream() { // closes the current iterator, if any, wrapping close() failures in RuntimeException
+    if (documentIterator == null)
+      return;
+    try {
+      documentIterator.close();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    documentIterator = null;
+  }
+
+  @Override
+  public void open(ExecutionContext executionContext) { // resets state; on restart, continues from the document stored under POSITION
+    super.open(executionContext);
+    current = null;
+    previous = null;
+    eof = false;
+    documentIterator = null;
+    if (!executionContext.containsKey(POSITION))
+      return;
+
+    current = (Document) executionContext.get(POSITION);
+  }
+
+  @Override
+  public void update(ExecutionContext executionContext) throws ItemStreamException { // stores previous, not current: DocumentExporter calls this after reading (but before writing) the first document of a new block
+    super.update(executionContext);
+    if (previous != null)
+      executionContext.put(POSITION, previous);
+  }
+
+  @Override
+  public void close() { // releases the open iterator, if any
+    closeStream();
+  }
+
+  @Override
+  public boolean isComplete(RepeatContext context, RepeatStatus result) { // CompletionPolicy: complete once the source is exhausted
+    return eof;
+  }
+
+  @Override
+  public boolean isComplete(RepeatContext context) {
+    return eof;
+  }
+
+  @Override
+  public RepeatContext start(RepeatContext parent) {
+    return new RepeatContextSupport(parent);
+  }
+
+  @Override
+  public void update(RepeatContext context) { // marks the repeat context complete-only once the source is exhausted
+    if (eof)
+      context.setCompleteOnly();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
new file mode 100644
index 0000000..e96f6f1
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemWriter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +public interface DocumentItemWriter { + void write(Document document); + void revert(); + void close(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java new file mode 100644 index 0000000..6232cfc --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +import java.util.Iterator; + +// TODO: generic closeable iterator +public interface DocumentIterator extends Iterator<Document>, AutoCloseable { +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java new file mode 100644 index 0000000..c9871a3 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +// TODO: generic object source +public interface DocumentSource { + DocumentIterator open(Document current, int rows); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java new file mode 100644 index 0000000..26a8c63 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.archive; + +import java.io.File; + +public interface FileAction { + File perform(File inputFile); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java new file mode 100644 index 0000000..02d898d --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.archive; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; + +import java.io.*; + +public class LocalDocumentItemWriter implements DocumentItemWriter { + private static final ObjectMapper json = new ObjectMapper(); + private static final String ENCODING = "UTF-8"; + + private final File outFile; + private final BufferedWriter bufferedWriter; + private final FileAction fileAction; + + public LocalDocumentItemWriter(File outFile, FileAction fileAction) { + this.fileAction = fileAction; + this.outFile = outFile; + try { + this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), ENCODING)); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } catch (FileNotFoundException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void write(Document document) { + try { + bufferedWriter.write(json.writeValueAsString(document)); + bufferedWriter.newLine(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void revert() { + IOUtils.closeQuietly(bufferedWriter); + outFile.delete(); + } + + @Override + public void close() { + try { + bufferedWriter.close(); + fileAction.perform(outFile); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java new file mode 100644 index 0000000..495401d --- /dev/null +++ 
b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +import org.hibernate.validator.constraints.NotBlank; + +public class S3Properties { + @NotBlank + private String accessKey; + @NotBlank + private String secretKey; + @NotBlank + private String keyPrefix; + @NotBlank + private String bucketName; + + public String getAccessKey() { + return accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public String getKeyPrefix() { + return keyPrefix; + } + + public String getBucketName() { + return bucketName; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + + public void setBucketName(String bucketName) { + this.bucketName = bucketName; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java 
---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java new file mode 100644 index 0000000..3214e50 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java @@ -0,0 +1,51 @@ +package org.apache.ambari.infra.job.archive; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; + +import java.io.File; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +public class S3Uploader implements FileAction { + + private final AmazonS3Client client; + private final String keyPrefix; + private final String bucketName; + + public S3Uploader(S3Properties s3Properties) { + this.keyPrefix = s3Properties.getKeyPrefix(); + this.bucketName = s3Properties.getBucketName(); + BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getAccessKey(), s3Properties.getSecretKey()); + client = new AmazonS3Client(credentials); + } + + @Override + public File perform(File inputFile) { + String key = keyPrefix + inputFile.getName(); + + if (client.doesObjectExist(bucketName, key)) { + System.out.println("Object '" + key + "' already exists"); + System.exit(0); + } + + client.putObject(bucketName, key, inputFile); + return inputFile; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java new file mode 100644 index 0000000..db4069b --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.TimeZone; + +public class SolrDocumentIterator implements DocumentIterator { + + private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + + static { + SOLR_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + private final Iterator<SolrDocument> documentIterator; + private final CloudSolrClient client; + + + public SolrDocumentIterator(QueryResponse response, CloudSolrClient client) { + documentIterator = response.getResults().iterator(); + this.client = client; + } + + @Override + public Document next() { + if (!documentIterator.hasNext()) + return null; + + SolrDocument document = documentIterator.next(); + HashMap<String, String> fieldMap = new HashMap<>(); + for (String key : document.getFieldNames()) { + fieldMap.put(key, toString(document.get(key))); + } + + return new Document(fieldMap); + } + + private String toString(Object value) { + if (value == null) { + return null; + } + else if (value instanceof Date) { + return SOLR_DATE_FORMAT.format(value); + } + else { + return value.toString(); + } + } + + @Override + public void close() { + try { 
+ client.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public boolean hasNext() { + return documentIterator.hasNext(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java new file mode 100644 index 0000000..2181ba3 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentSource.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.archive; + +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.format.DateTimeFormatter; + +public class SolrDocumentSource implements DocumentSource { + public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + private static final Logger LOG = LoggerFactory.getLogger(SolrDocumentSource.class); + + private final String zkHost; + private final SolrQueryProperties properties; + private final String endValue; + + public SolrDocumentSource(String zkHost, SolrQueryProperties properties, String endValue) { + this.zkHost = zkHost; + this.properties = properties; + this.endValue = endValue; + } + + @Override + public DocumentIterator open(Document current, int rows) { + CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build(); + client.setDefaultCollection(properties.getCollection()); + + SolrQuery query = properties.toQueryBuilder() + .setEndValue(endValue) + .setDocument(current) + .build(); + query.setRows(rows); + + LOG.info("Executing solr query {}", query.toLocalParamsString()); + + try { + QueryResponse response = client.query(query); + return new SolrDocumentIterator(response, client); + } catch (SolrServerException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java ---------------------------------------------------------------------- diff --git 
a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java new file mode 100644 index 0000000..d0f6d40 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.archive; + +import org.apache.solr.client.solrj.SolrQuery; + +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc; + +public class SolrQueryBuilder { + + public static final Pattern PARAMETER_PATTERN = Pattern.compile("\\$\\{[a-z]+\\}"); + + private String queryText; + private String endValue; + private String filterQueryText; + private Document document; + private String[] sortFields; + + public SolrQueryBuilder() { + this.queryText = "*:*"; + } + + public SolrQueryBuilder setQueryText(String queryText) { + this.queryText = queryText; + return this; + } + + public SolrQueryBuilder setEndValue(String endValue) { + this.endValue = endValue; + return this; + } + + public SolrQueryBuilder setFilterQueryText(String filterQueryText) { + this.filterQueryText = filterQueryText; + return this; + } + + + public SolrQueryBuilder setDocument(Document document) { + this.document = document; + return this; + } + + public SolrQueryBuilder addSort(String... 
sortBy) { + this.sortFields = sortBy; + return this; + } + + public SolrQuery build() { + SolrQuery solrQuery = new SolrQuery(); + + String query = queryText; + query = setEndValueOn(query); + + solrQuery.setQuery(query); + + if (filterQueryText != null) { + String filterQuery = filterQueryText; + filterQuery = setEndValueOn(filterQuery); + + Set<String> paramNames = collectParamNames(filterQuery); + if (document != null) { + for (String parameter : paramNames) { + if (document.get(parameter) != null) + filterQuery = filterQuery.replace(String.format("${%s}", parameter), document.get(parameter)); + } + } + + if (document == null && paramNames.isEmpty() || document != null && !paramNames.isEmpty()) + solrQuery.setFilterQueries(filterQuery); + } + + if (sortFields != null) { + for (String field : sortFields) + solrQuery.addSort(field, asc); + } + + return solrQuery; + } + + private String setEndValueOn(String query) { + if (endValue != null) + query = query.replace("${end}", endValue); + return query; + } + + private Set<String> collectParamNames(String filterQuery) { + Matcher matcher = PARAMETER_PATTERN.matcher(filterQuery); + Set<String> parameters = new HashSet<>(); + while (matcher.find()) + parameters.add(matcher.group().replace("${", "").replace("}", "")); + return parameters; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java new file mode 100644 index 0000000..444a15b --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryProperties.java @@ -0,0 +1,69 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +import org.hibernate.validator.constraints.NotBlank; + +public class SolrQueryProperties { + @NotBlank + private String collection; + @NotBlank + private String queryText; + private String filterQueryText; + private String[] sort; + + public String getCollection() { + return collection; + } + + public void setCollection(String collection) { + this.collection = collection; + } + + public String getQueryText() { + return queryText; + } + + public void setQueryText(String queryText) { + this.queryText = queryText; + } + + public String getFilterQueryText() { + return filterQueryText; + } + + public void setFilterQueryText(String filterQueryText) { + this.filterQueryText = filterQueryText; + } + + public String[] getSort() { + return sort; + } + + public void setSort(String[] sort) { + this.sort = sort; + } + + public SolrQueryBuilder toQueryBuilder() { + return new SolrQueryBuilder(). 
+ setQueryText(queryText) + .setFilterQueryText(filterQueryText) + .addSort(sort); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java new file mode 100644 index 0000000..8e34ca9 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/TarGzCompressor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.archive; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.apache.commons.io.IOUtils; + +import java.io.*; + +public class TarGzCompressor implements FileAction { + @Override + public File perform(File inputFile) { + File tarGzFile = new File(inputFile.getParent(), inputFile.getName() + ".tar.gz"); + try (TarArchiveOutputStream tarArchiveOutputStream = new TarArchiveOutputStream( + new GzipCompressorOutputStream(new FileOutputStream(tarGzFile)))) { + TarArchiveEntry archiveEntry = new TarArchiveEntry(inputFile.getName()); + archiveEntry.setSize(inputFile.length()); + tarArchiveOutputStream.putArchiveEntry(archiveEntry); + + try (FileInputStream fileInputStream = new FileInputStream(inputFile)) { + IOUtils.copy(fileInputStream, tarArchiveOutputStream); + } + + tarArchiveOutputStream.closeArchiveEntry(); + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + + return tarGzFile; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java index fc0a4f7..862119a 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/manager/JobManager.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.infra.manager; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; import 
org.apache.ambari.infra.model.ExecutionContextResponse; import org.apache.ambari.infra.model.JobDetailsResponse; @@ -28,16 +29,14 @@ import org.apache.ambari.infra.model.JobOperationParams; import org.apache.ambari.infra.model.StepExecutionContextResponse; import org.apache.ambari.infra.model.StepExecutionInfoResponse; import org.apache.ambari.infra.model.StepExecutionProgressResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.admin.history.StepExecutionHistory; import org.springframework.batch.admin.service.JobService; import org.springframework.batch.admin.service.NoSuchStepExecutionException; import org.springframework.batch.admin.web.JobInfo; import org.springframework.batch.admin.web.StepExecutionProgress; -import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.JobParametersInvalidException; -import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.*; import org.springframework.batch.core.launch.JobExecutionNotRunningException; import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.JobOperator; @@ -54,7 +53,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,6 +62,8 @@ import java.util.TimeZone; @Named public class JobManager { + private static final Logger LOG = LoggerFactory.getLogger(JobManager.class); + @Inject private JobService jobService; @@ -83,9 +83,29 @@ public class JobManager { + /** + * Launches the job {@code jobName}, passing each entry of {@code params} as a String job parameter. + * <p> + * NOTE(review): no unique parameter (such as a launch date) is added, so launching a job twice with identical + * {@code params} refers to the same job instance; confirm this is intended. + * + * @param jobName name of the job to launch + * @param params comma separated {@code key=value} pairs; keys and values are trimmed and empty entries are + * skipped; may be {@code null} + * @return a response describing the launched job execution + * @throws IllegalArgumentException if an entry of {@code params} has no {@code '='} or a key is repeated + */ public JobExecutionInfoResponse launchJob(String jobName, String params) throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException { - // TODO: handle params JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); - jobParametersBuilder.addDate("date", new Date()); + if (params != null) { + LOG.info("Parsing parameters of job {} '{}'", jobName, params); + // Empty entries (blank params, trailing or doubled commas) are skipped instead of being rejected. + Splitter.on(',') + .trimResults() + .omitEmptyStrings() + .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults()) + .split(params) + .forEach(jobParametersBuilder::addString); + } return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone); } http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties index 8162376..7ef70aa 100644 --- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -18,3 +18,15 @@ infra-manager.batch.db.username=admin infra-manager.batch.db.password=admin management.security.enabled=false management.health.solr.enabled=false +infra-manager.server.data.folder=/tmp + +infra-manager.jobs.solr_data_export.zoo_keeper_socket=zookeeper:2181 +infra-manager.jobs.solr_data_export.read_block_size=100 +infra-manager.jobs.solr_data_export.write_block_size=150 +infra-manager.jobs.solr_data_export.file_name_suffix_column=logtime +infra-manager.jobs.solr_data_export.destination_directory_path=/tmp/ambariInfraManager +infra-manager.jobs.solr_data_export.query.collection=hadoop_logs +infra-manager.jobs.solr_data_export.query.query_text=logtime:[* TO "${end}"] +infra-manager.jobs.solr_data_export.query.filter_query_text=(logtime:"${logtime}" AND
id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"] +infra-manager.jobs.solr_data_export.query.sort[0]=logtime +infra-manager.jobs.solr_data_export.query.sort[1]=id http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml index ad1adcd..9737554 100644 --- a/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml +++ b/ambari-infra/ambari-infra-manager/src/main/resources/log4j2.xml @@ -17,7 +17,7 @@ --> <Configuration monitorinterval="30" status="info" strict="true"> <Properties> - <Property name="logging.file">out/infra-manager.log</Property> + <Property name="logging.file">target/log/infra-manager.log</Property> </Properties> <Appenders> <Appender type="Console" name="Console"> http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java new file mode 100644 index 0000000..88fbff0 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentExporterTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.infra.job.archive; + +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.scope.context.StepContext; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamReader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashMap; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +/** + * Unit tests for {@link DocumentExporter}: documents come from a mocked reader and are written to writers opened + * from a mocked destination, at most {@link #WRITE_BLOCK_SIZE} documents per writer. + */ +@RunWith(EasyMockRunner.class) +public class DocumentExporterTest extends EasyMockSupport { + + private DocumentExporter documentExporter; + @Mock + private ItemStreamReader<Document> reader; + @Mock + private DocumentDestination documentDestination; + @Mock + private DocumentItemWriter documentItemWriter; + @Mock + private DocumentItemWriter documentItemWriter2; + + private ExecutionContext executionContext; + private ChunkContext chunkContext; + private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }}); + private static final int WRITE_BLOCK_SIZE = 2; + + @Before + public void
setUp() throws Exception { + StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(1L)); + chunkContext = new ChunkContext(new StepContext(stepExecution)); + executionContext = stepExecution.getExecutionContext(); + documentExporter = new DocumentExporter(reader, documentDestination, WRITE_BLOCK_SIZE); + } + + @After + public void tearDown() throws Exception { + verifyAll(); + } + + @Test + public void testNothingToRead() throws Exception { + reader.open(executionContext); expectLastCall(); + expect(reader.read()).andReturn(null); + reader.close(); expectLastCall(); + replayAll(); + + documentExporter.execute(null, chunkContext); + } + + @Test + public void testWriteLessDocumentsThanWriteBlockSize() throws Exception { + reader.open(executionContext); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT); + expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); + documentItemWriter.write(DOCUMENT); expectLastCall(); + expect(reader.read()).andReturn(null); + reader.close(); expectLastCall(); + documentItemWriter.close(); expectLastCall(); + replayAll(); + + documentExporter.execute(null, chunkContext); + } + + @Test + public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception { + Document document2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }}); + Document document3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }}); + + reader.open(executionContext); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT); + expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); + documentItemWriter.write(DOCUMENT); expectLastCall(); + expect(reader.read()).andReturn(document2); + documentItemWriter.write(document2); expectLastCall(); + expect(reader.read()).andReturn(document3); + // document3 does not fit into the first block of WRITE_BLOCK_SIZE documents, so that writer is closed + // and a new one is opened for it. + documentItemWriter.close(); expectLastCall(); + expect(documentDestination.open(document3)).andReturn(documentItemWriter2); + documentItemWriter2.write(document3); expectLastCall(); +
expect(reader.read()).andReturn(null); + reader.update(executionContext); expectLastCall(); + reader.close(); expectLastCall(); + documentItemWriter2.close(); expectLastCall(); + replayAll(); + + documentExporter.execute(null, chunkContext); + } + + @Test(expected = IOException.class) + public void testReadError() throws Exception { + reader.open(executionContext); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT); + expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); + documentItemWriter.write(DOCUMENT); expectLastCall(); + expect(reader.read()).andThrow(new IOException("TEST")); + // The writer that already received DOCUMENT must be reverted when reading fails. + documentItemWriter.revert(); expectLastCall(); + reader.close(); expectLastCall(); + replayAll(); + + documentExporter.execute(null, chunkContext); + } + + @Test(expected = UncheckedIOException.class) + public void testWriteError() throws Exception { + reader.open(executionContext); expectLastCall(); + expect(reader.read()).andReturn(DOCUMENT); + expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter); + documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST"))); + // A failed write must revert the writer as well. + documentItemWriter.revert(); expectLastCall(); + reader.close(); expectLastCall(); + replayAll(); + + documentExporter.execute(null, chunkContext); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java new file mode 100644 index 0000000..942713f --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/DocumentItemReaderTest.java @@ -0,0
+1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.infra.job.archive; + +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.item.ExecutionContext; + +import java.util.HashMap; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link DocumentItemReader}: documents are read from a mocked {@link DocumentSource} in blocks of + * {@link #READ_BLOCK_SIZE} documents, and reading can be resumed from the position kept in an {@link ExecutionContext}. + */ +@RunWith(EasyMockRunner.class) +public class DocumentItemReaderTest extends EasyMockSupport { + private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }}); + private static final Document DOCUMENT_2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }}); + private static final Document DOCUMENT_3 = new Document(new HashMap<String, String>() {{ put("id", "3"); }}); + private static final int READ_BLOCK_SIZE = 2; + + private DocumentItemReader documentItemReader; + @Mock + private DocumentSource documentSource; + @Mock
+ private DocumentIterator documentIterator; + @Mock + private DocumentIterator documentIterator2; + + @Before + public void setUp() throws Exception { + documentItemReader = new DocumentItemReader(documentSource, READ_BLOCK_SIZE); + } + + @After + public void tearDown() throws Exception { + verifyAll(); + } + + @Test + public void testReadWhenCollectionIsEmpty() throws Exception { + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator); + expect(documentIterator.next()).andReturn(null); + documentIterator.close(); expectLastCall(); + replayAll(); + + assertThat(documentItemReader.read(), is(nullValue())); + assertThat(documentItemReader.isComplete(null), is(true)); + assertThat(documentItemReader.isComplete(null, null), is(true)); + } + + @Test + public void testReadWhenCollectionContainsLessElementsThanReadBlockSize() throws Exception { + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator); + expect(documentIterator.next()).andReturn(DOCUMENT); + expect(documentIterator.next()).andReturn(null); + documentIterator.close(); expectLastCall(); + replayAll(); + + assertThat(documentItemReader.read(), is(DOCUMENT)); + assertThat(documentItemReader.isComplete(null), is(false)); + assertThat(documentItemReader.isComplete(null, null), is(false)); + assertThat(documentItemReader.read(), is(nullValue())); + assertThat(documentItemReader.isComplete(null), is(true)); + assertThat(documentItemReader.isComplete(null, null), is(true)); + } + + @Test + public void testReadWhenCollectionContainsExactlySameCountElementsAsReadBlockSize() throws Exception { + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator); + // The first block is full, so the source is opened again from its last document. + expect(documentSource.open(DOCUMENT_2, READ_BLOCK_SIZE)).andReturn(documentIterator2); + expect(documentIterator.next()).andReturn(DOCUMENT); + expect(documentIterator.next()).andReturn(DOCUMENT_2); + expect(documentIterator.next()).andReturn(null); + documentIterator.close(); expectLastCall(); + + expect(documentIterator2.next()).andReturn(null); +
documentIterator2.close(); expectLastCall(); + replayAll(); + + assertThat(documentItemReader.read(), is(DOCUMENT)); + assertThat(documentItemReader.isComplete(null), is(false)); + assertThat(documentItemReader.isComplete(null, null), is(false)); + assertThat(documentItemReader.read(), is(DOCUMENT_2)); + assertThat(documentItemReader.isComplete(null), is(false)); + assertThat(documentItemReader.isComplete(null, null), is(false)); + assertThat(documentItemReader.read(), is(nullValue())); + assertThat(documentItemReader.isComplete(null), is(true)); + assertThat(documentItemReader.isComplete(null, null), is(true)); + } + + @Test + public void testReadWhenCollectionContainsMoreElementsThanReadBlockSize() throws Exception { + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator); + expect(documentSource.open(DOCUMENT_2, READ_BLOCK_SIZE)).andReturn(documentIterator2); + expect(documentIterator.next()).andReturn(DOCUMENT); + expect(documentIterator.next()).andReturn(DOCUMENT_2); + expect(documentIterator.next()).andReturn(null); + documentIterator.close(); expectLastCall(); + expect(documentIterator2.next()).andReturn(DOCUMENT_3); + expect(documentIterator2.next()).andReturn(null); + documentIterator2.close(); expectLastCall(); + + replayAll(); + + assertThat(documentItemReader.read(), is(DOCUMENT)); + assertThat(documentItemReader.isComplete(null), is(false)); + assertThat(documentItemReader.isComplete(null, null), is(false)); + + assertThat(documentItemReader.read(), is(DOCUMENT_2)); + assertThat(documentItemReader.isComplete(null), is(false)); + assertThat(documentItemReader.isComplete(null, null), is(false)); + + assertThat(documentItemReader.read(), is(DOCUMENT_3)); + assertThat(documentItemReader.isComplete(null), is(false)); + assertThat(documentItemReader.isComplete(null, null), is(false)); + + assertThat(documentItemReader.read(), is(nullValue())); +
assertThat(documentItemReader.isComplete(null), is(true)); + assertThat(documentItemReader.isComplete(null, null), is(true)); + } + + @Test + public void testContinueWhenOnlyFirstElementWasRead() throws Exception { + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator); + expect(documentIterator.next()).andReturn(DOCUMENT); + documentIterator.close(); expectLastCall(); + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator2); + expect(documentIterator2.next()).andReturn(DOCUMENT); + documentIterator2.close(); expectLastCall(); + replayAll(); + + ExecutionContext executionContext = new ExecutionContext(); + documentItemReader.open(executionContext); + assertThat(documentItemReader.read(), is(DOCUMENT)); + documentItemReader.update(executionContext); + // With a single document read no position is stored, so the restarted reader opens the source from null again. + assertThat(executionContext.containsKey(DocumentItemReader.POSITION), is(false)); + documentItemReader.close(); + + documentItemReader.open(executionContext); + assertThat(documentItemReader.read(), is(DOCUMENT)); + documentItemReader.close(); + } + + @Test + public void testContinueWhenMoreThanOneElementWasRead() throws Exception { + expect(documentSource.open(null, READ_BLOCK_SIZE)).andReturn(documentIterator); + expect(documentIterator.next()).andReturn(DOCUMENT); + expect(documentIterator.next()).andReturn(DOCUMENT_2); + documentIterator.close(); expectLastCall(); + expect(documentSource.open(DOCUMENT, READ_BLOCK_SIZE)).andReturn(documentIterator2); + expect(documentIterator2.next()).andReturn(DOCUMENT_2); + expect(documentIterator2.next()).andReturn(DOCUMENT_3); + documentIterator2.close(); expectLastCall(); + + replayAll(); + + ExecutionContext executionContext = new ExecutionContext(); + documentItemReader.open(executionContext); + assertThat(documentItemReader.read(), is(DOCUMENT)); + assertThat(documentItemReader.read(), is(DOCUMENT_2)); + documentItemReader.update(executionContext); + // The stored position is DOCUMENT, not the last read DOCUMENT_2, so DOCUMENT_2 is read again after the restart. + assertThat(executionContext.get(DocumentItemReader.POSITION), is(DOCUMENT)); + documentItemReader.close(); + +
documentItemReader.open(executionContext); + assertThat(documentItemReader.read(), is(DOCUMENT_2)); + assertThat(documentItemReader.read(), is(DOCUMENT_3)); + documentItemReader.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/393fdb80/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java new file mode 100644 index 0000000..6411ff1 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriterTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.ambari.infra.job.archive; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FileUtils; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.easymock.EasyMock.expect; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LocalDocumentItemWriter}: documents are written to a temporary file, one JSON document per + * line, and the file is either handed to the {@link FileAction} on close or removed on revert. + */ +@RunWith(EasyMockRunner.class) +public class LocalDocumentItemWriterTest extends EasyMockSupport { + + private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("id", "1"); }}); + private static final Document DOCUMENT2 = new Document(new HashMap<String, String>() {{ put("id", "2"); }}); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private LocalDocumentItemWriter localDocumentItemWriter; + private File outFile; + @Mock + private FileAction fileAction; + + @Before + public void setUp() throws Exception { + outFile = File.createTempFile("LocalDocumentItemWriterTest", ".json.tmp"); + localDocumentItemWriter = new LocalDocumentItemWriter(outFile, fileAction); + } + + @After + public void tearDown() throws Exception { + outFile.delete(); + verifyAll(); + } + + @Test + public void testWrite() throws Exception { + expect(fileAction.perform(outFile)).andReturn(outFile); + replayAll(); + + localDocumentItemWriter.write(DOCUMENT); + localDocumentItemWriter.write(DOCUMENT2); + localDocumentItemWriter.close(); + + List<Document> documentList = readBack(outFile); + assertThat(documentList.size(), is(2)); + assertThat(documentList.get(0).get("id"), is(DOCUMENT.get("id"))); + assertThat(documentList.get(1).get("id"), is(DOCUMENT2.get("id"))); + } + + /** Reads {@code file} back, parsing each line as one JSON {@link Document}. */ + private List<Document> readBack(File file)
throws IOException { + List<Document> documentList = new ArrayList<>(); + for (String line : FileUtils.readLines(file)) { + documentList.add(OBJECT_MAPPER.readValue(line, Document.class)); + } + return documentList; + } + + @Test + public void testRevert() throws Exception { + // No FileAction expectation is recorded: revert must not pass the file on. + replayAll(); + + localDocumentItemWriter.write(DOCUMENT); + localDocumentItemWriter.revert(); + + assertThat(outFile.exists(), is(false)); + } +} \ No newline at end of file
