This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f866b  [GOBBLIN-1117] Enable record count verification for ORC format
32f866b is described below

commit 32f866b08e524148b1c13192a35bc19c3d8b3ee2
Author: Lei Sun <le...@linkedin.com>
AuthorDate: Thu Apr 16 16:05:57 2020 -0700

    [GOBBLIN-1117] Enable record count verification for ORC format
    
    Closes #2957 from autumnust/orc-recompact-fix
---
 .../compaction/source/CompactionSource.java        |  2 +-
 .../verify/CompactionThresholdVerifier.java        | 31 ++++++++-----
 .../compaction/verify/InputRecordCountHelper.java  | 12 +++--
 .../mapreduce/AvroCompactionTaskTest.java          |  2 +-
 .../mapreduce/OrcCompactionTaskTest.java           | 54 ++++++++++++++++------
 5 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 6b7f551..35fe53d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -156,7 +156,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
         Stopwatch stopwatch = Stopwatch.createStarted();
         int threads = 
this.state.getPropAsInt(CompactionVerifier.COMPACTION_VERIFICATION_THREADS, 5);
         long timeOutInMinute = 
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_TIMEOUT_MINUTES,
 30);
-        long iterationCountLimit = 
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT,
 Integer.MAX_VALUE);
+        long iterationCountLimit = 
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT,
 100);
         long iteration = 0;
         Map<String, String> failedReasonMap = null;
         while (datasets.size() > 0 && iteration++ < iterationCountLimit) {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 0eed686..5154afd 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -32,11 +32,11 @@ import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRati
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 
+
 /**
- * Compare the source and destination avro records. Determine if a compaction 
is needed.
+ * Compare the source and destination file records' count. Determine if a 
compaction is needed.
  */
 @Slf4j
 public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSystemDataset> {
@@ -62,34 +62,41 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste
    *
    * @return true iff the difference exceeds the threshold or this is the 
first time compaction
    */
-  public Result verify (FileSystemDataset dataset) {
+  public Result verify(FileSystemDataset dataset) {
 
     Map<String, Double> thresholdMap = RecompactionConditionBasedOnRatio.
-            getDatasetRegexAndRecompactThreshold 
(state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET,
-                    StringUtils.EMPTY));
+        getDatasetRegexAndRecompactThreshold(
+            
state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET,
 StringUtils.EMPTY));
 
     CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
 
-    double threshold = 
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(),
 thresholdMap);
-    log.debug ("Threshold is {} for dataset {}", threshold, 
result.getDatasetName());
+    double threshold =
+        
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(),
 thresholdMap);
+    log.debug("Threshold is {} for dataset {}", threshold, 
result.getDatasetName());
 
     InputRecordCountHelper helper = new InputRecordCountHelper(state);
     try {
       double newRecords = 0;
       if (!dataset.isVirtual()) {
-        newRecords = helper.calculateRecordCount (Lists.newArrayList(new 
Path(dataset.datasetURN())));
+        newRecords = helper.calculateRecordCount(Lists.newArrayList(new 
Path(dataset.datasetURN())));
       }
-      double oldRecords = helper.readRecordCount (new 
Path(result.getDstAbsoluteDir()));
+      double oldRecords = helper.readRecordCount(new 
Path(result.getDstAbsoluteDir()));
 
       if (oldRecords == 0) {
         return new Result(true, "");
       }
+      if (newRecords < oldRecords) {
+        return new Result(false, "Illegal state: Current records count should 
old be smaller.");
+      }
+
       if ((newRecords - oldRecords) / oldRecords > threshold) {
-        log.debug ("Dataset {} records exceeded the threshold {}", 
dataset.datasetURN(), threshold);
+        log.debug("Dataset {} records exceeded the threshold {}", 
dataset.datasetURN(), threshold);
         return new Result(true, "");
       }
 
-      return new Result(false, String.format("%s is failed for dataset %s. 
Prev=%f, Cur=%f, not reaching to threshold %f", this.getName(), 
result.getDatasetName(), oldRecords, newRecords, threshold));
+      return new Result(false, String
+          .format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching 
to threshold %f", this.getName(),
+              result.getDatasetName(), oldRecords, newRecords, threshold));
     } catch (IOException e) {
       return new Result(false, ExceptionUtils.getFullStackTrace(e));
     }
@@ -102,7 +109,7 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste
     return this.getClass().getName();
   }
 
-  public boolean isRetriable () {
+  public boolean isRetriable() {
     return false;
   }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index e1bc952..bbc581d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -43,8 +43,12 @@ import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.RecordCountProvider;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
 
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.DEFAULT_COMPACTION_OUTPUT_EXTENSION;
+
+
 /**
- * A class helps to calculate, serialize, deserialize record count.
+ * A class helps to calculate, serialize, deserialize record count. This will 
work for Avro and ORC formats.
  *
  * By using {@link IngestionRecordCountProvider}, the default input file name 
should be in format
  * {file_name}.{record_count}.{extension}. For example, given a file path: 
"/a/b/c/file.123.avro",
@@ -57,7 +61,7 @@ public class InputRecordCountHelper {
   private final FileSystem fs;
   private final State state;
   private final RecordCountProvider inputRecordCountProvider;
-  private final String AVRO = "avro";
+  private final String extensionName;
 
   @Deprecated
   public final static String RECORD_COUNT_FILE = "_record_count";
@@ -71,6 +75,7 @@ public class InputRecordCountHelper {
     try {
       this.fs = getSourceFileSystem (state);
       this.state = state;
+      this.extensionName = state.getProp(COMPACTION_OUTPUT_EXTENSION, 
DEFAULT_COMPACTION_OUTPUT_EXTENSION);
       this.inputRecordCountProvider = (RecordCountProvider) Class
               
.forName(state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
                       
MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER))
@@ -88,7 +93,8 @@ public class InputRecordCountHelper {
   public long calculateRecordCount (Collection<Path> paths) throws IOException 
{
     long sum = 0;
     for (Path path: paths) {
-      sum += 
inputRecordCountProvider.getRecordCount(DatasetHelper.getApplicableFilePaths(this.fs,
 path, Lists.newArrayList(AVRO)));
+      sum += inputRecordCountProvider.getRecordCount(
+          DatasetHelper.getApplicableFilePaths(this.fs, path, 
Lists.newArrayList(extensionName)));
     }
     return sum;
   }
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 19d01fa..2a04d68 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -141,7 +141,7 @@ public class AvroCompactionTaskTest {
   }
 
   @Test
-  public void testRecompaction () throws Exception {
+  public void testAvroRecompaction() throws Exception {
     FileSystem fs = getFileSystem();
     String basePath = "/tmp/testRecompaction";
     fs.delete(new Path(basePath), true);
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index f855790..e6a3fab 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -51,10 +51,13 @@ import org.testng.annotations.Test;
 
 import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
 import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static 
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
 import static 
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
 
 
 public class OrcCompactionTaskTest {
+  final String extensionName = "orc";
+
   private void createTestingData(File jobDir) throws Exception {
     // Write some ORC file for compaction here.
     TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>");
@@ -74,15 +77,16 @@ public class OrcCompactionTaskTest {
     orcStruct_3.setFieldValue("i", new IntWritable(4));
     orcStruct_3.setFieldValue("j", new IntWritable(5));
 
-    File file_0 = new File(jobDir, "file_0");
-    File file_1 = new File(jobDir, "file_1");
+    // Following pattern: FILENAME.RECORDCOUNT.EXTENSION
+    File file_0 = new File(jobDir, "file_0.2." + extensionName);
+    File file_1 = new File(jobDir, "file_1.2." + extensionName);
 
     writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_0, orcStruct_2));
     writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, 
ImmutableList.of(orcStruct_1, orcStruct_3));
   }
 
   @Test
-  public void basicTest() throws Exception {
+  public void basicTestWithRecompaction() throws Exception {
     File basePath = Files.createTempDir();
     basePath.deleteOnExit();
 
@@ -101,18 +105,18 @@ public class OrcCompactionTaskTest {
     orcStruct_4.setFieldValue("j", new IntWritable(6));
     orcStruct_4.setFieldValue("k", new IntWritable(7));
 
-    File file_2 = new File(jobDir, "file_2");
+    File file_2 = new File(jobDir, "file_2.1." + extensionName);
     writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema, 
ImmutableList.of(orcStruct_4));
     // Make this is the newest.
     file_2.setLastModified(Long.MAX_VALUE);
 
     // Verify execution
     // Overwrite the job configurator factory key.
-    String extensionFileName = "orcavro";
     EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", 
basePath.getAbsolutePath().toString())
         
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
         TestCompactionOrcJobConfigurator.Factory.class.getName())
-        .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionFileName);
+        .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
+        
.setConfiguration(COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET, 
"Identity.*:0.1");
     JobExecutionResult execution = embeddedGobblin.run();
     Assert.assertTrue(execution.isSuccessful());
 
@@ -120,14 +124,7 @@ public class OrcCompactionTaskTest {
     File outputDir = new File(basePath, hourlyPath);
     FileSystem fs = FileSystem.getLocal(new Configuration());
     List<FileStatus> statuses = new ArrayList<>();
-    for (FileStatus status : fs.listStatus(new 
Path(outputDir.getAbsolutePath()), new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return FilenameUtils.isExtension(path.getName(), extensionFileName);
-      }
-    })) {
-      statuses.add(status);
-    }
+    reloadFolder(statuses, outputDir, fs);
 
     Assert.assertTrue(statuses.size() == 1);
     List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
@@ -144,6 +141,35 @@ public class OrcCompactionTaskTest {
     Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(5));
     Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(6));
     Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7));
+
+    // Adding new .orc file into the directory and verify if re-compaction is 
triggered.
+    File file_late = new File(jobDir, "file_late.1." + extensionName);
+    OrcStruct orcStruct_5 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
+    orcStruct_5.setFieldValue("i", new IntWritable(10));
+    orcStruct_5.setFieldValue("j", new IntWritable(11));
+    orcStruct_5.setFieldValue("k", new IntWritable(12));
+
+    writeOrcRecordsInFile(new Path(file_late.getAbsolutePath()), 
evolvedSchema, ImmutableList.of(orcStruct_5));
+    execution = embeddedGobblin.run();
+    Assert.assertTrue(execution.isSuccessful());
+
+    reloadFolder(statuses, outputDir, fs);
+    result = readOrcFile(statuses.get(0).getPath());
+    // Note previous execution's inspection gives 4 result, given 
re-compaction, this should gives 1 late-record more.
+    Assert.assertEquals(result.size(), 4 + 1);
+  }
+
+  // A helper method to load all files in the output directory for 
compaction-result inspection.
+  private void reloadFolder(List<FileStatus> statuses, File outputDir, 
FileSystem fs) throws IOException {
+    statuses.clear();
+    for (FileStatus status : fs.listStatus(new 
Path(outputDir.getAbsolutePath()), new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return FilenameUtils.isExtension(path.getName(), extensionName);
+      }
+    })) {
+      statuses.add(status);
+    }
   }
 
   @Test

Reply via email to