This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fb58973  [GOBBLIN-1223][GOBBLIN-1217] Change the criteria for 
re-compaction, limit the time for re-compaction
fb58973 is described below

commit fb589737bd4f667a04fd02cb17f7b0b809c126e0
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed Jul 29 21:06:28 2020 -0700

    [GOBBLIN-1223][GOBBLIN-1217] Change the criteria for re-compaction, limit 
the time for re-compaction
    
    force AM to read from token file to update token
    at start up
    
    code style
    
    address comments
    
    address comments
    
    address comments
    
    [GOBBLIN-1217] start metrics reporting with a few
    map-reduce properties
    
    Closes #3065 from arjun4084346/mapperNum
    
    address comments
    
    fix conflicts
    
    fix conflict
    
    [GOBBLIN-1223] Change the criteria for re-
    compaction, limit the time for re-compaction
    
    address comments
    
    Closes #3071 from ZihanLi58/GOBBLIN-1223
---
 .../CompactionCompleteFileOperationAction.java     |  2 +
 .../dataset/TimeBasedSubDirDatasetsFinder.java     |  6 ++
 .../compaction/event/CompactionSlaEventHelper.java |  1 +
 .../verify/CompactionTimeRangeVerifier.java        | 64 +++++++++++++++------
 .../compaction/verify/InputRecordCountHelper.java  | 65 +++++++++++-----------
 .../mapreduce/AvroCompactionTaskTest.java          | 43 ++++++++++++++
 ligradle/findbugs/findbugsExclude.xml              |  4 ++
 7 files changed, 135 insertions(+), 50 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 252f118..72465b4 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -36,6 +36,7 @@ import 
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -161,6 +162,7 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
           this.configurator.getConfiguredJob().getJobID().toString());
       compactState.setProp("DuplicateRecordCount", 
job.getCounters().findCounter(
           RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+      compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME, 
this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
       helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
       log.info("duplicated records count for "+ dstPath + " : " + 
compactState.getProp("DuplicateRecordCount"));
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
index 111fe75..60155ff 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
@@ -76,6 +76,12 @@ public class TimeBasedSubDirDatasetsFinder extends 
DatasetsFinder {
   public static final String COMPACTION_TIMEBASED_MIN_TIME_AGO = 
COMPACTION_TIMEBASED_PREFIX + "min.time.ago";
   public static final String DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO = "1d";
 
+  // The minimum duration required between two re-compactions of the same dataset. Format = ?m?d?h.
+  public static final String MIN_RECOMPACTION_DURATION =
+      COMPACTION_TIMEBASED_PREFIX + "min.recompaction.duration";
+  // By default we don't apply this limitation
+  public static final String DEFAULT_MIN_RECOMPACTION_DURATION = "0h";
+
   protected final String folderTimePattern;
   protected final String subDirPattern;
   protected final DateTimeZone timeZone;
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index 763f375..f6fd977 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -49,6 +49,7 @@ public class CompactionSlaEventHelper {
   public static final String REGULAR_RECORD_COUNT = "regularRecordCount";
   public static final String NEED_RECOMPACT = "needRecompact";
   public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
+  public static final String LAST_RUN_START_TIME = "lastRunStartTime";
   public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
   public static final String MR_JOB_ID = "mrJobId";
   public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 6d9d8c4..0d580d0 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -24,17 +24,20 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
 
+
 /**
  * A simple class which verify current dataset belongs to a specific time 
range. Will skip to do
  * compaction if dataset is not in a correct time range.
@@ -47,32 +50,63 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
 
   protected State state;
 
-  public Result verify (FileSystemDataset dataset) {
+  public Result verify(FileSystemDataset dataset) {
     final DateTime earliest;
     final DateTime latest;
     try {
       CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
       DateTime folderTime = result.getTime();
-      DateTimeZone timeZone = 
DateTimeZone.forID(this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, 
MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
-      DateTime compactionStartTime = new 
DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), 
timeZone);
-      PeriodFormatter formatter = new 
PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours()
-              .appendSuffix("h").toFormatter();
+      DateTimeZone timeZone = DateTimeZone.forID(
+          this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, 
MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
+      DateTime compactionStartTime =
+          new 
DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), 
timeZone);
+      PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths()
+          .appendSuffix("m")
+          .appendDays()
+          .appendSuffix("d")
+          .appendHours()
+          .appendSuffix("h")
+          .toFormatter();
 
       // Dataset name is like 'Identity/MemberAccount' or 'PageViewEvent'
       String datasetName = result.getDatasetName();
 
       // get earliest time
-      String maxTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
-      String maxTimeAgoStr = getMachedLookbackTime(datasetName, 
maxTimeAgoStrList, 
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
+      String maxTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
+          
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
+      String maxTimeAgoStr = getMachedLookbackTime(datasetName, 
maxTimeAgoStrList,
+          
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
       Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
       earliest = compactionStartTime.minus(maxTimeAgo);
 
       // get latest time
-      String minTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
-      String minTimeAgoStr = getMachedLookbackTime(datasetName, 
minTimeAgoStrList, 
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
+      String minTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
+          
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
+      String minTimeAgoStr = getMachedLookbackTime(datasetName, 
minTimeAgoStrList,
+          
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
       Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
       latest = compactionStartTime.minus(minTimeAgo);
 
+      // get latest last run start time; we want to limit the duration between 
two compactions for the same dataset
+      if 
(state.contains(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION)) {
+        String minDurationStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION);
+        String minDurationStr = getMachedLookbackTime(datasetName, 
minDurationStrList,
+            TimeBasedSubDirDatasetsFinder.DEFAULT_MIN_RECOMPACTION_DURATION);
+        Period minDurationTime = formatter.parsePeriod(minDurationStr);
+        DateTime latestEligibleCompactTime = 
compactionStartTime.minus(minDurationTime);
+        InputRecordCountHelper helper = new InputRecordCountHelper(state);
+        State compactState = helper.loadState(new 
Path(result.getDstAbsoluteDir()));
+        if (compactState.contains(CompactionSlaEventHelper.LAST_RUN_START_TIME)
+            && 
compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME)
+            > latestEligibleCompactTime.getMillis()) {
+          log.warn("Last compaction for {} is {}, not before {}", 
dataset.datasetRoot(),
+              new 
DateTime(compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME),
 timeZone),
+              latestEligibleCompactTime);
+          return new Result(false,
+              "Last compaction for " + dataset.datasetRoot() + " is not 
before " + latestEligibleCompactTime);
+        }
+      }
+
       if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) {
         log.debug("{} falls in the user defined time range", 
dataset.datasetRoot());
         return new Result(true, "");
@@ -88,7 +122,7 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
     return COMPACTION_VERIFIER_TIME_RANGE;
   }
 
-  public boolean isRetriable () {
+  public boolean isRetriable() {
     return false;
   }
 
@@ -109,15 +143,15 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
    * @param datasetName A description of dataset without time partition 
information. Example 'Identity/MemberAccount' or 'PageViewEvent'
    * @return The lookback time matched with given dataset.
    */
-  public static String getMachedLookbackTime (String datasetName, String 
datasetsAndLookBacks, String sysDefaultLookback) {
+  public static String getMachedLookbackTime(String datasetName, String 
datasetsAndLookBacks,
+      String sysDefaultLookback) {
     String defaultLookback = sysDefaultLookback;
 
-    for (String entry : Splitter.on(";").trimResults()
-        .omitEmptyStrings().splitToList(datasetsAndLookBacks)) {
+    for (String entry : 
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(datasetsAndLookBacks))
 {
       List<String> datasetAndLookbackTime = 
Splitter.on(":").trimResults().omitEmptyStrings().splitToList(entry);
       if (datasetAndLookbackTime.size() == 1) {
         defaultLookback = datasetAndLookbackTime.get(0);
-      } else if (datasetAndLookbackTime.size() == 2)  {
+      } else if (datasetAndLookbackTime.size() == 2) {
         String regex = datasetAndLookbackTime.get(0);
         if (Pattern.compile(regex).matcher(datasetName).find()) {
           return datasetAndLookbackTime.get(1);
@@ -128,6 +162,4 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
     }
     return defaultLookback;
   }
-
-
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index bbc581d..fd131c8 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -16,24 +16,17 @@
  */
 package org.apache.gobblin.compaction.verify;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.Collection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -42,9 +35,12 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.RecordCountProvider;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
-import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
-import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.DEFAULT_COMPACTION_OUTPUT_EXTENSION;
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
 
 
 /**
@@ -73,13 +69,12 @@ public class InputRecordCountHelper {
    */
   public InputRecordCountHelper(State state) {
     try {
-      this.fs = getSourceFileSystem (state);
+      this.fs = getSourceFileSystem(state);
       this.state = state;
       this.extensionName = state.getProp(COMPACTION_OUTPUT_EXTENSION, 
DEFAULT_COMPACTION_OUTPUT_EXTENSION);
-      this.inputRecordCountProvider = (RecordCountProvider) Class
-              
.forName(state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
-                      
MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER))
-              .newInstance();
+      this.inputRecordCountProvider = (RecordCountProvider) Class.forName(
+          state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
+              
MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER)).newInstance();
     } catch (Exception e) {
       throw new RuntimeException("Failed to instantiate " + 
InputRecordCountHelper.class.getName(), e);
     }
@@ -90,9 +85,9 @@ public class InputRecordCountHelper {
    * @param  paths all paths where the record count are calculated
    * @return record count after parsing all files under given paths
    */
-  public long calculateRecordCount (Collection<Path> paths) throws IOException 
{
+  public long calculateRecordCount(Collection<Path> paths) throws IOException {
     long sum = 0;
-    for (Path path: paths) {
+    for (Path path : paths) {
       sum += inputRecordCountProvider.getRecordCount(
           DatasetHelper.getApplicableFilePaths(this.fs, path, 
Lists.newArrayList(extensionName)));
     }
@@ -102,11 +97,12 @@ public class InputRecordCountHelper {
   /**
    * Load compaction state file
    */
-  public State loadState (Path dir) throws IOException {
+  public State loadState(Path dir) throws IOException {
     return loadState(this.fs, dir);
   }
 
-  private static State loadState (FileSystem fs, Path dir) throws IOException {
+  @VisibleForTesting
+  public static State loadState(FileSystem fs, Path dir) throws IOException {
     State state = new State();
     if (fs.exists(new Path(dir, STATE_FILE))) {
       try (FSDataInputStream inputStream = fs.open(new Path(dir, STATE_FILE))) 
{
@@ -119,11 +115,12 @@ public class InputRecordCountHelper {
   /**
    * Save compaction state file
    */
-  public void saveState (Path dir, State state) throws IOException {
+  public void saveState(Path dir, State state) throws IOException {
     saveState(this.fs, dir, state);
   }
 
-  private static void saveState (FileSystem fs, Path dir, State state) throws 
IOException {
+  @VisibleForTesting
+  public static void saveState(FileSystem fs, Path dir, State state) throws 
IOException {
     Path tmpFile = new Path(dir, STATE_FILE + ".tmp");
     Path newFile = new Path(dir, STATE_FILE);
     fs.delete(tmpFile, false);
@@ -142,7 +139,7 @@ public class InputRecordCountHelper {
    * @param dir directory where a state file is located
    * @return record count
    */
-  public long readRecordCount (Path dir) throws IOException {
+  public long readRecordCount(Path dir) throws IOException {
     return readRecordCount(this.fs, dir);
   }
 
@@ -154,12 +151,13 @@ public class InputRecordCountHelper {
    * @return record count
    */
   @Deprecated
-  public static long readRecordCount (FileSystem fs, Path dir) throws 
IOException {
+  public static long readRecordCount(FileSystem fs, Path dir) throws 
IOException {
     State state = loadState(fs, dir);
 
     if (!state.contains(CompactionSlaEventHelper.RECORD_COUNT_TOTAL)) {
-      if (fs.exists(new Path (dir, RECORD_COUNT_FILE))){
-        try (BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open (new Path (dir, RECORD_COUNT_FILE)), 
Charsets.UTF_8))) {
+      if (fs.exists(new Path(dir, RECORD_COUNT_FILE))) {
+        try (BufferedReader br = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(dir, RECORD_COUNT_FILE)), 
Charsets.UTF_8))) {
           long count = Long.parseLong(br.readLine());
           return count;
         }
@@ -177,7 +175,7 @@ public class InputRecordCountHelper {
    * @param dir directory where a state file is located
    * @return record count
    */
-  public long readExecutionCount (Path dir) throws IOException {
+  public long readExecutionCount(Path dir) throws IOException {
     State state = loadState(fs, dir);
     return 
Long.parseLong(state.getProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, "0"));
   }
@@ -189,14 +187,13 @@ public class InputRecordCountHelper {
    * @param dir directory where a record file is located
    */
   @Deprecated
-  public static void writeRecordCount (FileSystem fs, Path dir, long count) 
throws IOException {
-     State state = loadState(fs, dir);
-     state.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, count);
-     saveState(fs, dir, state);
+  public static void writeRecordCount(FileSystem fs, Path dir, long count) 
throws IOException {
+    State state = loadState(fs, dir);
+    state.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, count);
+    saveState(fs, dir, state);
   }
 
-  protected FileSystem getSourceFileSystem (State state)
-          throws IOException {
+  protected FileSystem getSourceFileSystem(State state) throws IOException {
     Configuration conf = HadoopUtils.getConfFromState(state);
     String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI);
     return 
HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), 
conf), state);
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index fb53a5c..7f9ef51 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -28,6 +28,8 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -171,6 +173,47 @@ public class AvroCompactionTaskTest {
     Assert.assertTrue(fs.exists(new Path (basePath, 
"Identity/MemberAccount/hourly/2017/04/03/10")));
   }
 
+  public void testAvroRecompactionWithLimitation() throws Exception {
+    FileSystem fs = getFileSystem();
+    String basePath = "/tmp/testRecompaction";
+    fs.delete(new Path(basePath), true);
+
+    File jobDir = new File(basePath, 
"Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");
+    Assert.assertTrue(jobDir.mkdirs());
+
+    GenericRecord r1 = createRandomRecord();
+    writeFileWithContent(jobDir, "file1", r1, 20);
+
+    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin 
("Recompaction-First", basePath);
+    JobExecutionResult result = embeddedGobblin.run();
+    long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path 
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertTrue(result.isSuccessful());
+    Assert.assertEquals(recordCount, 20);
+
+    // Now write more avro files to input dir
+    writeFileWithContent(jobDir, "file2", r1, 22);
+    EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin 
("Recompaction-Second", basePath);
+    
embeddedGobblin_2.setConfiguration(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION,
 "8h");
+    embeddedGobblin_2.run();
+    Assert.assertTrue(result.isSuccessful());
+
+    // Because it does not meet the criteria, we should not run the re-compaction
+    recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path 
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertEquals(recordCount, 20);
+
+    State state = InputRecordCountHelper.loadState(fs, (new Path (basePath, 
new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    state.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+        
Long.toString(state.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME) 
- 8 * 60 * 60 * 1000));
+    InputRecordCountHelper.saveState(fs, (new Path (basePath, new 
Path("Identity/MemberAccount/hourly/2017/04/03/10"))), state);
+    embeddedGobblin_2.run();
+    Assert.assertTrue(result.isSuccessful());
+
+    // After the minimum duration has elapsed, re-compaction can be triggered, and a new record count 
should be written.
+    recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path 
(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertEquals(recordCount, 42);
+    Assert.assertTrue(fs.exists(new Path (basePath, 
"Identity/MemberAccount/hourly/2017/04/03/10")));
+  }
+
  // Returning file handler for setting modification time.
   private File writeFileWithContent(File dir, String fileName, GenericRecord 
r, int count) throws IOException {
     File file = new File(dir, fileName + "." + count + ".avro");
diff --git a/ligradle/findbugs/findbugsExclude.xml 
b/ligradle/findbugs/findbugsExclude.xml
index 30afc6a..d064047 100644
--- a/ligradle/findbugs/findbugsExclude.xml
+++ b/ligradle/findbugs/findbugsExclude.xml
@@ -49,4 +49,8 @@
     <Class name="org.apache.gobblin.source.jdbc.JdbcExtractor" />
     <Bug pattern="OBL_UNSATISFIED_OBLIGATION,ODR_OPEN_DATABASE_RESOURCE" />
   </Match>
+  <Match>
+    <Class 
name="org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
 </FindBugsFilter>

Reply via email to