oleewere closed pull request #8: AMBARI-24792 - Infra Manager: not all the 
documents archived
URL: https://github.com/apache/ambari-infra/pull/8
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java 
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java
index f0b592d..9c8fbcf 100644
--- 
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java
+++ 
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java
@@ -96,4 +96,13 @@ public void deleteObject(String key) {
       throw new RuntimeException(e);
     }
   }
+
+  public InputStream getObject(String key) {
+    try {
+      return s3client.getObject(bucket, key);
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 }
diff --git 
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
 
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
index 24d603b..7a748bc 100644
--- 
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
+++ 
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
@@ -114,7 +114,7 @@ private void checkInfraManagerReachable() throws Exception {
     }
   }
 
-  protected void addDocument(OffsetDateTime logtime) {
+  protected SolrInputDocument addDocument(OffsetDateTime logtime) {
     SolrInputDocument solrInputDocument = new SolrInputDocument();
     solrInputDocument.addField("logType", "HDFSAudit");
     solrInputDocument.addField("cluster", "cl1");
@@ -148,6 +148,7 @@ protected void addDocument(OffsetDateTime logtime) {
     solrInputDocument.addField("_ttl_", "+7DAYS");
     solrInputDocument.addField("_expire_at_", "2017-12-15T10:23:19.106Z");
     solr.add(solrInputDocument);
+    return solrInputDocument;
   }
 
   @AfterStories
diff --git 
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
 
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
index b1d36d1..e2bbe9d 100644
--- 
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
+++ 
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.infra.steps;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER;
 import static org.apache.ambari.infra.TestUtil.doWithin;
@@ -26,19 +27,24 @@
 import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.junit.Assert.assertThat;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.ambari.infra.InfraClient;
 import org.apache.ambari.infra.JobExecutionInfo;
 import org.apache.ambari.infra.S3Client;
+import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -50,16 +56,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 public class ExportJobsSteps extends AbstractInfraSteps {
   private static final Logger LOG = 
LoggerFactory.getLogger(ExportJobsSteps.class);
+  private Set<String> documentIds = new HashSet<>();
 
   private Map<String, JobExecutionInfo> launchedJobs = new HashMap<>();
 
   @Given("$count documents in solr")
   public void addDocuments(int count) {
     OffsetDateTime intervalEnd = OffsetDateTime.now();
+    documentIds.clear();
     for (int i = 0; i < count; ++i) {
-      addDocument(intervalEnd.minusMinutes(i % (count / 10)));
+      documentIds.add(addDocument(intervalEnd.minusMinutes(i % (count / 
10))).get("id").getValue().toString());
     }
     getSolr().commit();
   }
@@ -68,13 +79,15 @@ public void addDocuments(int count) {
   public void addDocuments(long count, OffsetDateTime startLogtime, 
OffsetDateTime endLogtime) {
     Duration duration = Duration.between(startLogtime, endLogtime);
     long increment = duration.toNanos() / count;
-    for (int i = 0; i < count; ++i)
-      addDocument(startLogtime.plusNanos(increment * i));
+    documentIds.clear();
+    for (int i = 0; i < count; ++i) {
+      documentIds.add(addDocument(startLogtime.plusNanos(increment * 
i)).get("id").getValue().toString());
+    }
     getSolr().commit();
   }
 
   @Given("a file on s3 with key $key")
-  public void addFileToS3(String key) throws Exception {
+  public void addFileToS3(String key) {
     getS3client().putObject(key, "anything".getBytes());
   }
 
@@ -204,4 +217,26 @@ public void checkNumberOfFilesOnLocalFilesystem(long 
count, String text, String
             .filter(file -> file.getName().contains(text))
             .count(), is(count));
   }
+
+  private static final ObjectMapper json = new ObjectMapper();
+
+  @Then("Check the files $fileNamePart contains the archived documents")
+  public void checkStoredDocumentIds(String fileNamePart) throws Exception {
+    S3Client s3Client = getS3client();
+    int size = documentIds.size();
+    Set<String> storedDocumentIds = new HashSet<>();
+    for (String objectKey : s3Client.listObjectKeys(fileNamePart)) {
+      try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(new 
BZip2CompressorInputStream(s3Client.getObject(objectKey)), UTF_8))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          Map<String, Object> document = json.readValue(line, new 
TypeReference<HashMap<String, Object>>() {});
+          String id = document.get("id").toString();
+          storedDocumentIds.add(id);
+          documentIds.remove(id);
+        }
+      }
+    }
+    assertThat(documentIds.size(), is(0));
+    assertThat(storedDocumentIds.size(), is(size));
+  }
 }
diff --git 
a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story 
b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
index 2330474..806dc84 100644
--- a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
+++ b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
@@ -11,6 +11,7 @@ Given 10 documents in solr with logtime from 
2010-10-09T05:00:00.000Z to 2010-10
 When start archive_audit_logs job with parameters 
writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z 
after 2 seconds
 Then Check 4 files exists on s3 server with filenames containing the text 
solr_archive_audit_logs_-_2010-10-09 after 20 seconds
 And solr does not contain documents between 2010-10-09T05:00:00.000Z and 
2010-10-09T20:00:00.000Z after 5 seconds
+And Check the files solr_archive_audit_logs_-_2010-10-09 contains the archived 
documents
 
 
 Scenario: Running archiving job with a bigger start value than end value 
exports and deletes 0 documents
@@ -32,6 +33,7 @@ When delete file with key 
solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.jso
 And restart archive_audit_logs job within 2 seconds
 Then Check 10 files exists on s3 server with filenames containing the text 
solr_archive_audit_logs_-_2011-10-09 after 20 seconds
 And solr does not contain documents between 2011-10-09T05:00:00.000Z and 
2011-10-09T20:00:00.000Z after 5 seconds
+And Check the files solr_archive_audit_logs_-_2011-10-09 contains the archived 
documents
 
 
 Scenario: After Deleting job deletes documents from solr no document found in 
the specified interval
@@ -65,3 +67,4 @@ And stop job archive_audit_logs after at least 1 file exists 
in s3 with filename
 Then Less than 20 files exists on s3 server with filenames containing the text 
solr_archive_audit_logs_-_2014-03-09 after 20 seconds
 When restart archive_audit_logs job within 10 seconds
 Then Check 25 files exists on s3 server with filenames containing the text 
solr_archive_audit_logs_-_2014-03-09 after 20 seconds
+And Check the files solr_archive_audit_logs_-_2014-03-09 contains the archived 
documents
\ No newline at end of file
diff --git 
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
 
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
index 1cb2d62..43e871f 100644
--- 
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
+++ 
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java
@@ -94,7 +94,12 @@ public SolrParameters merge(JobParameters jobParameters) {
       sortColumns.add(sortValue);
       ++i;
     }
-    solrParameters.setSortColumn(sortColumns.toArray(new String[0]));
+    if (!sortColumns.isEmpty()) {
+      solrParameters.setSortColumn(sortColumns.toArray(new String[0]));
+    }
+    else {
+      solrParameters.setSortColumn(sortColumn);
+    }
 
     return solrParameters;
   }
diff --git 
a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java
 
b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java
index b7bda57..71d25b6 100644
--- 
a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java
+++ 
b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java
@@ -1,6 +1,7 @@
 package org.apache.ambari.infra.job.archive;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import org.junit.Test;
@@ -27,7 +28,7 @@
  */
 public class  SolrPropertiesTest {
   @Test
-  public void testApplySortColumns() {
+  public void testMergeSortColumns() {
     JobParameters jobParameters = new JobParametersBuilder()
             .addString("sortColumn[0]", "logtime")
             .addString("sortColumn[1]", "id")
@@ -42,13 +43,24 @@ public void testApplySortColumns() {
   }
 
   @Test
-  public void testApplyWhenNoSortIsDefined() {
+  public void testMergeWhenNoSortIsDefined() {
+    JobParameters jobParameters = new JobParametersBuilder()
+            .toJobParameters();
+
+    SolrProperties solrProperties = new SolrProperties();
+    SolrParameters solrParameters = solrProperties.merge(jobParameters);
+    assertThat(solrParameters.getSortColumn(), is(nullValue()));
+  }
+
+  @Test
+  public void testMergeWhenPropertiesAreDefinedButJobParamsAreNot() {
     JobParameters jobParameters = new JobParametersBuilder()
             .toJobParameters();
 
     SolrProperties solrProperties = new SolrProperties();
     solrProperties.setSortColumn(new String[] {"testColumn"});
     SolrParameters solrParameters = solrProperties.merge(jobParameters);
-    assertThat(solrParameters.getSortColumn().length, is(0));
+    assertThat(solrParameters.getSortColumn().length, is(1));
+    assertThat(solrParameters.getSortColumn()[0], is("testColumn"));
   }
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to