This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new fb58973  [GOBBLIN-1223][GOBBLIN-1217] Change the criteria for re-compaction, limit the time for re-compaction
fb58973 is described below

commit fb589737bd4f667a04fd02cb17f7b0b809c126e0
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed Jul 29 21:06:28 2020 -0700

    [GOBBLIN-1223][GOBBLIN-1217] Change the criteria for re-compaction, limit the time for re-compaction

    force AM to read from token file to update token at start up

    code style

    address comments

    address comments

    address comments

    [GOBBLIN-1217] start metrics reporting with a few map-reduce properties

    Closes #3065 from arjun4084346/mapperNum

    address comments

    fix conflicts

    fix conflict

    [GOBBLIN-1223] Change the criteria for re-compaction, limit the time for re-compaction

    address comments

    Closes #3071 from ZihanLi58/GOBBLIN-1223
---
 .../CompactionCompleteFileOperationAction.java     |  2 +
 .../dataset/TimeBasedSubDirDatasetsFinder.java     |  6 ++
 .../compaction/event/CompactionSlaEventHelper.java |  1 +
 .../verify/CompactionTimeRangeVerifier.java        | 64 +++++++++++++++------
 .../compaction/verify/InputRecordCountHelper.java  | 65 +++++++++++-----------
 .../mapreduce/AvroCompactionTaskTest.java          | 43 ++++++++++++++
 ligradle/findbugs/findbugsExclude.xml              |  4 ++
 7 files changed, 135 insertions(+), 50 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 252f118..72465b4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -161,6 +162,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
           this.configurator.getConfiguredJob().getJobID().toString());
       compactState.setProp("DuplicateRecordCount", job.getCounters().findCounter(
           RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
+      compactState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME, this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
       helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
       log.info("duplicated records count for "+ dstPath + " : " + compactState.getProp("DuplicateRecordCount"));
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
index 111fe75..60155ff 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java
@@ -76,6 +76,12 @@ public class TimeBasedSubDirDatasetsFinder extends DatasetsFinder {
   public static final String COMPACTION_TIMEBASED_MIN_TIME_AGO = COMPACTION_TIMEBASED_PREFIX + "min.time.ago";
   public static final String DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO = "1d";
 
+  // The latest compaction run time to be processed. Format = ?m?d?h.
+  public static final String MIN_RECOMPACTION_DURATION =
+      COMPACTION_TIMEBASED_PREFIX + "min.recompaction.duration";
+  // By default we don't apply this limitation
+  public static final String DEFAULT_MIN_RECOMPACTION_DURATION = "0h";
+
   protected final String folderTimePattern;
   protected final String subDirPattern;
   protected final DateTimeZone timeZone;
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index 763f375..f6fd977 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -49,6 +49,7 @@ public class CompactionSlaEventHelper {
   public static final String REGULAR_RECORD_COUNT = "regularRecordCount";
   public static final String NEED_RECOMPACT = "needRecompact";
   public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
+  public static final String LAST_RUN_START_TIME = "lastRunStartTime";
   public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
   public static final String MR_JOB_ID = "mrJobId";
   public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 6d9d8c4..0d580d0 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -24,17 +24,20 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
+
 
 /**
  * A simple class which verify current dataset belongs to a specific time range. Will skip to do
  * compaction if dataset is not in a correct time range.
@@ -47,32 +50,63 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste
 
   protected State state;
 
-  public Result verify (FileSystemDataset dataset) {
+  public Result verify(FileSystemDataset dataset) {
     final DateTime earliest;
     final DateTime latest;
     try {
       CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
       DateTime folderTime = result.getTime();
-      DateTimeZone timeZone = DateTimeZone.forID(this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
-      DateTime compactionStartTime = new DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), timeZone);
-      PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours()
-          .appendSuffix("h").toFormatter();
+      DateTimeZone timeZone = DateTimeZone.forID(
+          this.state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
+      DateTime compactionStartTime =
+          new DateTime(this.state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME), timeZone);
+      PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths()
+          .appendSuffix("m")
+          .appendDays()
+          .appendSuffix("d")
+          .appendHours()
+          .appendSuffix("h")
+          .toFormatter();
 
       // Dataset name is like 'Identity/MemberAccount' or 'PageViewEvent'
       String datasetName = result.getDatasetName();
 
       // get earliest time
-      String maxTimeAgoStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
-      String maxTimeAgoStr = getMachedLookbackTime(datasetName, maxTimeAgoStrList, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
+      String maxTimeAgoStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
+          TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
+      String maxTimeAgoStr = getMachedLookbackTime(datasetName, maxTimeAgoStrList,
+          TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
       Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
       earliest = compactionStartTime.minus(maxTimeAgo);
 
       // get latest time
-      String minTimeAgoStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
-      String minTimeAgoStr = getMachedLookbackTime(datasetName, minTimeAgoStrList, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
+      String minTimeAgoStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
+          TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
+      String minTimeAgoStr = getMachedLookbackTime(datasetName, minTimeAgoStrList,
+          TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
       Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
       latest = compactionStartTime.minus(minTimeAgo);
 
+      // get latest last run start time, we want to limit the duration between two compactions for the same dataset
+      if (state.contains(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION)) {
+        String minDurationStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION);
+        String minDurationStr = getMachedLookbackTime(datasetName, minDurationStrList,
+            TimeBasedSubDirDatasetsFinder.DEFAULT_MIN_RECOMPACTION_DURATION);
+        Period minDurationTime = formatter.parsePeriod(minDurationStr);
+        DateTime latestEligibleCompactTime = compactionStartTime.minus(minDurationTime);
+        InputRecordCountHelper helper = new InputRecordCountHelper(state);
+        State compactState = helper.loadState(new Path(result.getDstAbsoluteDir()));
+        if (compactState.contains(CompactionSlaEventHelper.LAST_RUN_START_TIME)
+            && compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME) > latestEligibleCompactTime.getMillis()) {
+          log.warn("Last compaction for {} is {}, not before {}", dataset.datasetRoot(),
+              new DateTime(compactState.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME), timeZone),
+              latestEligibleCompactTime);
+          return new Result(false,
+              "Last compaction for " + dataset.datasetRoot() + " is not before " + latestEligibleCompactTime);
+        }
+      }
+
       if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) {
         log.debug("{} falls in the user defined time range", dataset.datasetRoot());
         return new Result(true, "");
@@ -88,7 +122,7 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste
     return COMPACTION_VERIFIER_TIME_RANGE;
   }
 
-  public boolean isRetriable () {
+  public boolean isRetriable() {
     return false;
   }
 
@@ -109,15 +143,15 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste
    * @param datasetName A description of dataset without time partition information. Example 'Identity/MemberAccount' or 'PageViewEvent'
    * @return The lookback time matched with given dataset.
    */
-  public static String getMachedLookbackTime (String datasetName, String datasetsAndLookBacks, String sysDefaultLookback) {
+  public static String getMachedLookbackTime(String datasetName, String datasetsAndLookBacks,
+      String sysDefaultLookback) {
     String defaultLookback = sysDefaultLookback;
 
-    for (String entry : Splitter.on(";").trimResults()
-        .omitEmptyStrings().splitToList(datasetsAndLookBacks)) {
+    for (String entry : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(datasetsAndLookBacks)) {
       List<String> datasetAndLookbackTime = Splitter.on(":").trimResults().omitEmptyStrings().splitToList(entry);
       if (datasetAndLookbackTime.size() == 1) {
         defaultLookback = datasetAndLookbackTime.get(0);
-      } else if (datasetAndLookbackTime.size() == 2) {
+      } else if (datasetAndLookbackTime.size() == 2) {
         String regex = datasetAndLookbackTime.get(0);
         if (Pattern.compile(regex).matcher(datasetName).find()) {
           return datasetAndLookbackTime.get(1);
@@ -128,6 +162,4 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste
     }
     return defaultLookback;
   }
-
-
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index bbc581d..fd131c8 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -16,24 +16,17 @@
  */
 package org.apache.gobblin.compaction.verify;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.Collection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -42,9 +35,12 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.RecordCountProvider;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
-import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
-import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.DEFAULT_COMPACTION_OUTPUT_EXTENSION;
+import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
 
 /**
@@ -73,13 +69,12 @@ public class InputRecordCountHelper {
    */
   public InputRecordCountHelper(State state) {
     try {
-      this.fs = getSourceFileSystem (state);
+      this.fs = getSourceFileSystem(state);
       this.state = state;
       this.extensionName = state.getProp(COMPACTION_OUTPUT_EXTENSION, DEFAULT_COMPACTION_OUTPUT_EXTENSION);
-      this.inputRecordCountProvider = (RecordCountProvider) Class
-          .forName(state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
-              MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER))
-          .newInstance();
+      this.inputRecordCountProvider = (RecordCountProvider) Class.forName(
+          state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
+              MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER)).newInstance();
     } catch (Exception e) {
       throw new RuntimeException("Failed to instantiate " + InputRecordCountHelper.class.getName(), e);
     }
@@ -90,9 +85,9 @@
   * @param paths all paths where the record count are calculated
   * @return record count after parsing all files under given paths
   */
-  public long calculateRecordCount (Collection<Path> paths) throws IOException {
+  public long calculateRecordCount(Collection<Path> paths) throws IOException {
     long sum = 0;
-    for (Path path: paths) {
+    for (Path path : paths) {
       sum += inputRecordCountProvider.getRecordCount(
           DatasetHelper.getApplicableFilePaths(this.fs, path, Lists.newArrayList(extensionName)));
     }
@@ -102,11 +97,12 @@
   /**
    * Load compaction state file
   */
-  public State loadState (Path dir) throws IOException {
+  public State loadState(Path dir) throws IOException {
     return loadState(this.fs, dir);
   }
 
-  private static State loadState (FileSystem fs, Path dir) throws IOException {
+  @VisibleForTesting
+  public static State loadState(FileSystem fs, Path dir) throws IOException {
     State state = new State();
     if (fs.exists(new Path(dir, STATE_FILE))) {
       try (FSDataInputStream inputStream = fs.open(new Path(dir, STATE_FILE))) {
@@ -119,11 +115,12 @@
   /**
    * Save compaction state file
   */
-  public void saveState (Path dir, State state) throws IOException {
+  public void saveState(Path dir, State state) throws IOException {
     saveState(this.fs, dir, state);
   }
 
-  private static void saveState (FileSystem fs, Path dir, State state) throws IOException {
+  @VisibleForTesting
+  public static void saveState(FileSystem fs, Path dir, State state) throws IOException {
     Path tmpFile = new Path(dir, STATE_FILE + ".tmp");
     Path newFile = new Path(dir, STATE_FILE);
     fs.delete(tmpFile, false);
@@ -142,7 +139,7 @@
   * @param dir directory where a state file is located
   * @return record count
   */
-  public long readRecordCount (Path dir) throws IOException {
+  public long readRecordCount(Path dir) throws IOException {
     return readRecordCount(this.fs, dir);
   }
 
@@ -154,12 +151,13 @@
   * @return record count
   */
   @Deprecated
-  public static long readRecordCount (FileSystem fs, Path dir) throws IOException {
+  public static long readRecordCount(FileSystem fs, Path dir) throws IOException {
     State state = loadState(fs, dir);
 
     if (!state.contains(CompactionSlaEventHelper.RECORD_COUNT_TOTAL)) {
-      if (fs.exists(new Path (dir, RECORD_COUNT_FILE))){
-        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open (new Path (dir, RECORD_COUNT_FILE)), Charsets.UTF_8))) {
+      if (fs.exists(new Path(dir, RECORD_COUNT_FILE))) {
+        try (BufferedReader br = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(dir, RECORD_COUNT_FILE)), Charsets.UTF_8))) {
           long count = Long.parseLong(br.readLine());
           return count;
         }
@@ -177,7 +175,7 @@
   * @param dir directory where a state file is located
   * @return record count
   */
-  public long readExecutionCount (Path dir) throws IOException {
+  public long readExecutionCount(Path dir) throws IOException {
     State state = loadState(fs, dir);
     return Long.parseLong(state.getProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, "0"));
   }
@@ -189,14 +187,13 @@
   * @param dir directory where a record file is located
   */
   @Deprecated
-  public static void writeRecordCount (FileSystem fs, Path dir, long count) throws IOException {
-    State state = loadState(fs, dir);
-    state.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, count);
-    saveState(fs, dir, state);
+  public static void writeRecordCount(FileSystem fs, Path dir, long count) throws IOException {
+    State state = loadState(fs, dir);
+    state.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, count);
+    saveState(fs, dir, state);
   }
 
-  protected FileSystem getSourceFileSystem (State state)
-      throws IOException {
+  protected FileSystem getSourceFileSystem(State state) throws IOException {
     Configuration conf = HadoopUtils.getConfFromState(state);
     String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
     return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state);
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index fb53a5c..7f9ef51 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -28,6 +28,8 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -171,6 +173,47 @@ public class AvroCompactionTaskTest {
     Assert.assertTrue(fs.exists(new Path (basePath, "Identity/MemberAccount/hourly/2017/04/03/10")));
   }
 
+  public void testAvroRecompactionWithLimitation() throws Exception {
+    FileSystem fs = getFileSystem();
+    String basePath = "/tmp/testRecompaction";
+    fs.delete(new Path(basePath), true);
+
+    File jobDir = new File(basePath, "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20");
+    Assert.assertTrue(jobDir.mkdirs());
+
+    GenericRecord r1 = createRandomRecord();
+    writeFileWithContent(jobDir, "file1", r1, 20);
+
+    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("Recompaction-First", basePath);
+    JobExecutionResult result = embeddedGobblin.run();
+    long recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertTrue(result.isSuccessful());
+    Assert.assertEquals(recordCount, 20);
+
+    // Now write more avro files to input dir
+    writeFileWithContent(jobDir, "file2", r1, 22);
+    EmbeddedGobblin embeddedGobblin_2 = createEmbeddedGobblin("Recompaction-Second", basePath);
+    embeddedGobblin_2.setConfiguration(TimeBasedSubDirDatasetsFinder.MIN_RECOMPACTION_DURATION, "8h");
+    embeddedGobblin_2.run();
+    Assert.assertTrue(result.isSuccessful());
+
+    // Because the minimum re-compaction duration is not met, re-compaction should not run
+    recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertEquals(recordCount, 20);
+
+    State state = InputRecordCountHelper.loadState(fs, (new Path(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    state.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
+        Long.toString(state.getPropAsLong(CompactionSlaEventHelper.LAST_RUN_START_TIME) - 8 * 60 * 60 * 1000));
+    InputRecordCountHelper.saveState(fs, (new Path(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))), state);
+    embeddedGobblin_2.run();
+    Assert.assertTrue(result.isSuccessful());
+
+    // Once the recorded last run start time is old enough, re-compaction can be triggered and a new record count should be written
+    recordCount = InputRecordCountHelper.readRecordCount(fs, (new Path(basePath, new Path("Identity/MemberAccount/hourly/2017/04/03/10"))));
+    Assert.assertEquals(recordCount, 42);
+    Assert.assertTrue(fs.exists(new Path(basePath, "Identity/MemberAccount/hourly/2017/04/03/10")));
+  }
+
   // Returning file handler for setting modfication time.
   private File writeFileWithContent(File dir, String fileName, GenericRecord r, int count) throws IOException {
     File file = new File(dir, fileName + "." + count + ".avro");
diff --git a/ligradle/findbugs/findbugsExclude.xml b/ligradle/findbugs/findbugsExclude.xml
index 30afc6a..d064047 100644
--- a/ligradle/findbugs/findbugsExclude.xml
+++ b/ligradle/findbugs/findbugsExclude.xml
@@ -49,4 +49,8 @@
     <Class name="org.apache.gobblin.source.jdbc.JdbcExtractor" />
     <Bug pattern="OBL_UNSATISFIED_OBLIGATION,ODR_OPEN_DATABASE_RESOURCE" />
   </Match>
+  <Match>
+    <Class name="org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
 </FindBugsFilter>
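
A note on the duration format used above: the new min.recompaction.duration value, like the existing min/max lookback settings, is parsed by the Joda-Time period formatter that CompactionTimeRangeVerifier builds (months "m", days "d", hours "h"). A minimal standalone sketch of that parsing, assuming only Joda-Time on the classpath (the demo class and sample values are illustrative, not part of the patch):

import org.joda.time.Period;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;

public class RecompactionDurationFormatDemo {
  public static void main(String[] args) {
    // Same builder chain as CompactionTimeRangeVerifier: months ("m"), days ("d"), hours ("h").
    PeriodFormatter formatter = new PeriodFormatterBuilder()
        .appendMonths().appendSuffix("m")
        .appendDays().appendSuffix("d")
        .appendHours().appendSuffix("h")
        .toFormatter();

    Period minDuration = formatter.parsePeriod("8h");  // minimum gap of 8 hours between re-compactions
    Period lookback = formatter.parsePeriod("1d2h");   // 1 day and 2 hours
    System.out.println(minDuration.getHours());        // prints 8
    System.out.println(lookback.getDays() + "d" + lookback.getHours() + "h"); // prints 1d2h
  }
}

So values like "8h", "1d", or "1m2d3h" all parse, and the default "0h" effectively disables the limit, since it makes the cutoff equal to the current compaction start time.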
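These duration settings are resolved per dataset through getMachedLookbackTime, whose config syntax is a ';'-separated list of <dataset-regex>:<duration> entries, optionally with a bare <duration> entry that overrides the system default. A self-contained restatement of that lookup for illustration (the match helper and the sample property value are hypothetical, not part of the patch):

import java.util.List;
import java.util.regex.Pattern;
import com.google.common.base.Splitter;

public class LookbackMatchDemo {
  // Mirrors the rule in CompactionTimeRangeVerifier.getMachedLookbackTime:
  // "<regex>:<duration>" entries win when the regex is found in the dataset name;
  // a bare "<duration>" entry replaces the system default for everything else.
  static String match(String datasetName, String config, String sysDefault) {
    String fallback = sysDefault;
    for (String entry : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(config)) {
      List<String> parts = Splitter.on(":").trimResults().omitEmptyStrings().splitToList(entry);
      if (parts.size() == 1) {
        fallback = parts.get(0);
      } else if (parts.size() == 2 && Pattern.compile(parts.get(0)).matcher(datasetName).find()) {
        return parts.get(1);
      }
    }
    return fallback;
  }

  public static void main(String[] args) {
    String config = "Identity.*:1d;PageViewEvent:4h;2d";
    System.out.println(match("Identity/MemberAccount", config, "0h")); // 1d (regex match)
    System.out.println(match("SomeOtherEvent", config, "0h"));         // 2d (in-config default)
  }
}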
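The new skip decision in CompactionTimeRangeVerifier.verify then reduces to a single comparison: re-compaction is skipped when the recorded lastRunStartTime falls after compactionStartTime minus the minimum re-compaction duration. A hedged sketch of just that arithmetic (the method name and sample times are invented for illustration):

import org.joda.time.DateTime;
import org.joda.time.Period;

public class RecompactionGateDemo {
  // Restates the new check: a dataset is eligible for re-compaction only when its
  // previous run started at or before (compactionStartTime - minRecompactionDuration).
  static boolean eligible(long lastRunStartMillis, DateTime compactionStartTime, Period minDuration) {
    DateTime latestEligibleCompactTime = compactionStartTime.minus(minDuration);
    return lastRunStartMillis <= latestEligibleCompactTime.getMillis();
  }

  public static void main(String[] args) {
    DateTime now = new DateTime(2020, 7, 29, 21, 0);
    Period eightHours = Period.hours(8);
    System.out.println(eligible(now.minusHours(10).getMillis(), now, eightHours)); // true  -> re-compact
    System.out.println(eligible(now.minusHours(2).getMillis(), now, eightHours));  // false -> skip
  }
}

This is also what the new test exercises: it rewinds lastRunStartTime in the saved compaction state by eight hours to make the dataset eligible again.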