This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 32f866b [GOBBLIN-1117] Enable record count verification for ORC format 32f866b is described below commit 32f866b08e524148b1c13192a35bc19c3d8b3ee2 Author: Lei Sun <le...@linkedin.com> AuthorDate: Thu Apr 16 16:05:57 2020 -0700 [GOBBLIN-1117] Enable record count verification for ORC format Closes #2957 from autumnust/orc-recompact-fix --- .../compaction/source/CompactionSource.java | 2 +- .../verify/CompactionThresholdVerifier.java | 31 ++++++++----- .../compaction/verify/InputRecordCountHelper.java | 12 +++-- .../mapreduce/AvroCompactionTaskTest.java | 2 +- .../mapreduce/OrcCompactionTaskTest.java | 54 ++++++++++++++++------ 5 files changed, 70 insertions(+), 31 deletions(-) diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java index 6b7f551..35fe53d 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java @@ -156,7 +156,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { Stopwatch stopwatch = Stopwatch.createStarted(); int threads = this.state.getPropAsInt(CompactionVerifier.COMPACTION_VERIFICATION_THREADS, 5); long timeOutInMinute = this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_TIMEOUT_MINUTES, 30); - long iterationCountLimit = this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT, Integer.MAX_VALUE); + long iterationCountLimit = this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT, 100); long iteration = 0; Map<String, String> failedReasonMap = null; while (datasets.size() > 0 && iteration++ < iterationCountLimit) { diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java index 0eed686..5154afd 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java @@ -32,11 +32,11 @@ import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRati import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.parser.CompactionPathParser; import org.apache.gobblin.configuration.State; -import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset; import org.apache.gobblin.dataset.FileSystemDataset; + /** - * Compare the source and destination avro records. Determine if a compaction is needed. + * Compare the source and destination file records' count. Determine if a compaction is needed. */ @Slf4j public class CompactionThresholdVerifier implements CompactionVerifier<FileSystemDataset> { @@ -62,34 +62,41 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste * * @return true iff the difference exceeds the threshold or this is the first time compaction */ - public Result verify (FileSystemDataset dataset) { + public Result verify(FileSystemDataset dataset) { Map<String, Double> thresholdMap = RecompactionConditionBasedOnRatio. - getDatasetRegexAndRecompactThreshold (state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET, - StringUtils.EMPTY)); + getDatasetRegexAndRecompactThreshold( + state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET, StringUtils.EMPTY)); CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset); - double threshold = RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(), thresholdMap); - log.debug ("Threshold is {} for dataset {}", threshold, result.getDatasetName()); + double threshold = + RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(), thresholdMap); + log.debug("Threshold is {} for dataset {}", threshold, result.getDatasetName()); InputRecordCountHelper helper = new InputRecordCountHelper(state); try { double newRecords = 0; if (!dataset.isVirtual()) { - newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN()))); + newRecords = helper.calculateRecordCount(Lists.newArrayList(new Path(dataset.datasetURN()))); } - double oldRecords = helper.readRecordCount (new Path(result.getDstAbsoluteDir())); + double oldRecords = helper.readRecordCount(new Path(result.getDstAbsoluteDir())); if (oldRecords == 0) { return new Result(true, ""); } + if (newRecords < oldRecords) { + return new Result(false, "Illegal state: Current records count should old be smaller."); + } + if ((newRecords - oldRecords) / oldRecords > threshold) { - log.debug ("Dataset {} records exceeded the threshold {}", dataset.datasetURN(), threshold); + log.debug("Dataset {} records exceeded the threshold {}", dataset.datasetURN(), threshold); return new Result(true, ""); } - return new Result(false, String.format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching to threshold %f", this.getName(), result.getDatasetName(), oldRecords, newRecords, threshold)); + return new Result(false, String + .format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching to threshold %f", this.getName(), + result.getDatasetName(), oldRecords, newRecords, threshold)); } catch (IOException e) { return new Result(false, ExceptionUtils.getFullStackTrace(e)); } @@ -102,7 +109,7 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste return this.getClass().getName(); } - public boolean isRetriable () { + public boolean isRetriable() { return false; } } diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java index e1bc952..bbc581d 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java @@ -43,8 +43,12 @@ import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.RecordCountProvider; import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider; +import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION; +import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.DEFAULT_COMPACTION_OUTPUT_EXTENSION; + + /** - * A class helps to calculate, serialize, deserialize record count. + * A class helps to calculate, serialize, deserialize record count. This will work for Avro and ORC formats. * * By using {@link IngestionRecordCountProvider}, the default input file name should be in format * {file_name}.{record_count}.{extension}. For example, given a file path: "/a/b/c/file.123.avro", @@ -57,7 +61,7 @@ public class InputRecordCountHelper { private final FileSystem fs; private final State state; private final RecordCountProvider inputRecordCountProvider; - private final String AVRO = "avro"; + private final String extensionName; @Deprecated public final static String RECORD_COUNT_FILE = "_record_count"; @@ -71,6 +75,7 @@ public class InputRecordCountHelper { try { this.fs = getSourceFileSystem (state); this.state = state; + this.extensionName = state.getProp(COMPACTION_OUTPUT_EXTENSION, DEFAULT_COMPACTION_OUTPUT_EXTENSION); this.inputRecordCountProvider = (RecordCountProvider) Class .forName(state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER, MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER)) @@ -88,7 +93,8 @@ public class InputRecordCountHelper { public long calculateRecordCount (Collection<Path> paths) throws IOException { long sum = 0; for (Path path: paths) { - sum += inputRecordCountProvider.getRecordCount(DatasetHelper.getApplicableFilePaths(this.fs, path, Lists.newArrayList(AVRO))); + sum += inputRecordCountProvider.getRecordCount( + DatasetHelper.getApplicableFilePaths(this.fs, path, Lists.newArrayList(extensionName))); } return sum; } diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java index 19d01fa..2a04d68 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java @@ -141,7 +141,7 @@ public class AvroCompactionTaskTest { } @Test - public void testRecompaction () throws Exception { + public void testAvroRecompaction() throws Exception { FileSystem fs = getFileSystem(); String basePath = "/tmp/testRecompaction"; fs.delete(new Path(basePath), true); diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java index f855790..e6a3fab 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java @@ -51,10 +51,13 @@ import org.testng.annotations.Test; import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*; import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*; +import static org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET; import static org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE; public class OrcCompactionTaskTest { + final String extensionName = "orc"; + private void createTestingData(File jobDir) throws Exception { // Write some ORC file for compaction here. TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>"); @@ -74,15 +77,16 @@ public class OrcCompactionTaskTest { orcStruct_3.setFieldValue("i", new IntWritable(4)); orcStruct_3.setFieldValue("j", new IntWritable(5)); - File file_0 = new File(jobDir, "file_0"); - File file_1 = new File(jobDir, "file_1"); + // Following pattern: FILENAME.RECORDCOUNT.EXTENSION + File file_0 = new File(jobDir, "file_0.2." + extensionName); + File file_1 = new File(jobDir, "file_1.2." + extensionName); writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema, ImmutableList.of(orcStruct_0, orcStruct_2)); writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, ImmutableList.of(orcStruct_1, orcStruct_3)); } @Test - public void basicTest() throws Exception { + public void basicTestWithRecompaction() throws Exception { File basePath = Files.createTempDir(); basePath.deleteOnExit(); @@ -101,18 +105,18 @@ public class OrcCompactionTaskTest { orcStruct_4.setFieldValue("j", new IntWritable(6)); orcStruct_4.setFieldValue("k", new IntWritable(7)); - File file_2 = new File(jobDir, "file_2"); + File file_2 = new File(jobDir, "file_2.1." + extensionName); writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema, ImmutableList.of(orcStruct_4)); // Make this is the newest. file_2.setLastModified(Long.MAX_VALUE); // Verify execution // Overwrite the job configurator factory key. - String extensionFileName = "orcavro"; EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", basePath.getAbsolutePath().toString()) .setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY, TestCompactionOrcJobConfigurator.Factory.class.getName()) - .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionFileName); + .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName) + .setConfiguration(COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET, "Identity.*:0.1"); JobExecutionResult execution = embeddedGobblin.run(); Assert.assertTrue(execution.isSuccessful()); @@ -120,14 +124,7 @@ public class OrcCompactionTaskTest { File outputDir = new File(basePath, hourlyPath); FileSystem fs = FileSystem.getLocal(new Configuration()); List<FileStatus> statuses = new ArrayList<>(); - for (FileStatus status : fs.listStatus(new Path(outputDir.getAbsolutePath()), new PathFilter() { - @Override - public boolean accept(Path path) { - return FilenameUtils.isExtension(path.getName(), extensionFileName); - } - })) { - statuses.add(status); - } + reloadFolder(statuses, outputDir, fs); Assert.assertTrue(statuses.size() == 1); List<OrcStruct> result = readOrcFile(statuses.get(0).getPath()); @@ -144,6 +141,35 @@ public class OrcCompactionTaskTest { Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(5)); Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(6)); Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7)); + + // Adding new .orc file into the directory and verify if re-compaction is triggered. + File file_late = new File(jobDir, "file_late.1." + extensionName); + OrcStruct orcStruct_5 = (OrcStruct) OrcStruct.createValue(evolvedSchema); + orcStruct_5.setFieldValue("i", new IntWritable(10)); + orcStruct_5.setFieldValue("j", new IntWritable(11)); + orcStruct_5.setFieldValue("k", new IntWritable(12)); + + writeOrcRecordsInFile(new Path(file_late.getAbsolutePath()), evolvedSchema, ImmutableList.of(orcStruct_5)); + execution = embeddedGobblin.run(); + Assert.assertTrue(execution.isSuccessful()); + + reloadFolder(statuses, outputDir, fs); + result = readOrcFile(statuses.get(0).getPath()); + // Note previous execution's inspection gives 4 result, given re-compaction, this should gives 1 late-record more. + Assert.assertEquals(result.size(), 4 + 1); + } + + // A helper method to load all files in the output directory for compaction-result inspection. + private void reloadFolder(List<FileStatus> statuses, File outputDir, FileSystem fs) throws IOException { + statuses.clear(); + for (FileStatus status : fs.listStatus(new Path(outputDir.getAbsolutePath()), new PathFilter() { + @Override + public boolean accept(Path path) { + return FilenameUtils.isExtension(path.getName(), extensionName); + } + })) { + statuses.add(status); + } } @Test