This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 32f866b [GOBBLIN-1117] Enable record count verification for ORC format
32f866b is described below
commit 32f866b08e524148b1c13192a35bc19c3d8b3ee2
Author: Lei Sun <[email protected]>
AuthorDate: Thu Apr 16 16:05:57 2020 -0700
[GOBBLIN-1117] Enable record count verification for ORC format
Closes #2957 from autumnust/orc-recompact-fix
---
.../compaction/source/CompactionSource.java | 2 +-
.../verify/CompactionThresholdVerifier.java | 31 ++++++++-----
.../compaction/verify/InputRecordCountHelper.java | 12 +++--
.../mapreduce/AvroCompactionTaskTest.java | 2 +-
.../mapreduce/OrcCompactionTaskTest.java | 54 ++++++++++++++++------
5 files changed, 70 insertions(+), 31 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 6b7f551..35fe53d 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -156,7 +156,7 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
Stopwatch stopwatch = Stopwatch.createStarted();
int threads =
this.state.getPropAsInt(CompactionVerifier.COMPACTION_VERIFICATION_THREADS, 5);
long timeOutInMinute =
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_TIMEOUT_MINUTES,
30);
- long iterationCountLimit =
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT,
Integer.MAX_VALUE);
+ long iterationCountLimit =
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT,
100);
long iteration = 0;
Map<String, String> failedReasonMap = null;
while (datasets.size() > 0 && iteration++ < iterationCountLimit) {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 0eed686..5154afd 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -32,11 +32,11 @@ import
org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRati
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
+
/**
- * Compare the source and destination avro records. Determine if a compaction
is needed.
+ * Compare the source and destination file record counts. Determine if a
compaction is needed.
*/
@Slf4j
public class CompactionThresholdVerifier implements
CompactionVerifier<FileSystemDataset> {
@@ -62,34 +62,41 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
*
* @return true iff the difference exceeds the threshold or this is the
first time compaction
*/
- public Result verify (FileSystemDataset dataset) {
+ public Result verify(FileSystemDataset dataset) {
Map<String, Double> thresholdMap = RecompactionConditionBasedOnRatio.
- getDatasetRegexAndRecompactThreshold
(state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET,
- StringUtils.EMPTY));
+ getDatasetRegexAndRecompactThreshold(
+
state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET,
StringUtils.EMPTY));
CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
- double threshold =
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(),
thresholdMap);
- log.debug ("Threshold is {} for dataset {}", threshold,
result.getDatasetName());
+ double threshold =
+
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName(result.getDatasetName(),
thresholdMap);
+ log.debug("Threshold is {} for dataset {}", threshold,
result.getDatasetName());
InputRecordCountHelper helper = new InputRecordCountHelper(state);
try {
double newRecords = 0;
if (!dataset.isVirtual()) {
- newRecords = helper.calculateRecordCount (Lists.newArrayList(new
Path(dataset.datasetURN())));
+ newRecords = helper.calculateRecordCount(Lists.newArrayList(new
Path(dataset.datasetURN())));
}
- double oldRecords = helper.readRecordCount (new
Path(result.getDstAbsoluteDir()));
+ double oldRecords = helper.readRecordCount(new
Path(result.getDstAbsoluteDir()));
if (oldRecords == 0) {
return new Result(true, "");
}
+      if (newRecords < oldRecords) {
+        return new Result(false, "Illegal state: Current record count should
not be smaller than the old record count.");
+      }
+
if ((newRecords - oldRecords) / oldRecords > threshold) {
- log.debug ("Dataset {} records exceeded the threshold {}",
dataset.datasetURN(), threshold);
+ log.debug("Dataset {} records exceeded the threshold {}",
dataset.datasetURN(), threshold);
return new Result(true, "");
}
- return new Result(false, String.format("%s is failed for dataset %s.
Prev=%f, Cur=%f, not reaching to threshold %f", this.getName(),
result.getDatasetName(), oldRecords, newRecords, threshold));
+ return new Result(false, String
+ .format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching
to threshold %f", this.getName(),
+ result.getDatasetName(), oldRecords, newRecords, threshold));
} catch (IOException e) {
return new Result(false, ExceptionUtils.getFullStackTrace(e));
}
@@ -102,7 +109,7 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
return this.getClass().getName();
}
- public boolean isRetriable () {
+ public boolean isRetriable() {
return false;
}
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index e1bc952..bbc581d 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -43,8 +43,12 @@ import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.RecordCountProvider;
import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.DEFAULT_COMPACTION_OUTPUT_EXTENSION;
+
+
/**
- * A class helps to calculate, serialize, deserialize record count.
+ * A class that helps to calculate, serialize, and deserialize record counts.
This works for both Avro and ORC formats.
*
* By using {@link IngestionRecordCountProvider}, the default input file name
should be in format
* {file_name}.{record_count}.{extension}. For example, given a file path:
"/a/b/c/file.123.avro",
@@ -57,7 +61,7 @@ public class InputRecordCountHelper {
private final FileSystem fs;
private final State state;
private final RecordCountProvider inputRecordCountProvider;
- private final String AVRO = "avro";
+ private final String extensionName;
@Deprecated
public final static String RECORD_COUNT_FILE = "_record_count";
@@ -71,6 +75,7 @@ public class InputRecordCountHelper {
try {
this.fs = getSourceFileSystem (state);
this.state = state;
+ this.extensionName = state.getProp(COMPACTION_OUTPUT_EXTENSION,
DEFAULT_COMPACTION_OUTPUT_EXTENSION);
this.inputRecordCountProvider = (RecordCountProvider) Class
.forName(state.getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER))
@@ -88,7 +93,8 @@ public class InputRecordCountHelper {
public long calculateRecordCount (Collection<Path> paths) throws IOException
{
long sum = 0;
for (Path path: paths) {
- sum +=
inputRecordCountProvider.getRecordCount(DatasetHelper.getApplicableFilePaths(this.fs,
path, Lists.newArrayList(AVRO)));
+ sum += inputRecordCountProvider.getRecordCount(
+ DatasetHelper.getApplicableFilePaths(this.fs, path,
Lists.newArrayList(extensionName)));
}
return sum;
}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 19d01fa..2a04d68 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -141,7 +141,7 @@ public class AvroCompactionTaskTest {
}
@Test
- public void testRecompaction () throws Exception {
+ public void testAvroRecompaction() throws Exception {
FileSystem fs = getFileSystem();
String basePath = "/tmp/testRecompaction";
fs.delete(new Path(basePath), true);
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index f855790..e6a3fab 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -51,10 +51,13 @@ import org.testng.annotations.Test;
import static org.apache.gobblin.compaction.mapreduce.AvroCompactionTaskTest.*;
import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_SHOULD_DEDUPLICATE;
public class OrcCompactionTaskTest {
+ final String extensionName = "orc";
+
private void createTestingData(File jobDir) throws Exception {
// Write some ORC file for compaction here.
TypeDescription schema = TypeDescription.fromString("struct<i:int,j:int>");
@@ -74,15 +77,16 @@ public class OrcCompactionTaskTest {
orcStruct_3.setFieldValue("i", new IntWritable(4));
orcStruct_3.setFieldValue("j", new IntWritable(5));
- File file_0 = new File(jobDir, "file_0");
- File file_1 = new File(jobDir, "file_1");
+ // Following pattern: FILENAME.RECORDCOUNT.EXTENSION
+ File file_0 = new File(jobDir, "file_0.2." + extensionName);
+ File file_1 = new File(jobDir, "file_1.2." + extensionName);
writeOrcRecordsInFile(new Path(file_0.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_0, orcStruct_2));
writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema,
ImmutableList.of(orcStruct_1, orcStruct_3));
}
@Test
- public void basicTest() throws Exception {
+ public void basicTestWithRecompaction() throws Exception {
File basePath = Files.createTempDir();
basePath.deleteOnExit();
@@ -101,18 +105,18 @@ public class OrcCompactionTaskTest {
orcStruct_4.setFieldValue("j", new IntWritable(6));
orcStruct_4.setFieldValue("k", new IntWritable(7));
- File file_2 = new File(jobDir, "file_2");
+ File file_2 = new File(jobDir, "file_2.1." + extensionName);
writeOrcRecordsInFile(new Path(file_2.getAbsolutePath()), evolvedSchema,
ImmutableList.of(orcStruct_4));
// Make this is the newest.
file_2.setLastModified(Long.MAX_VALUE);
// Verify execution
// Overwrite the job configurator factory key.
- String extensionFileName = "orcavro";
EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic",
basePath.getAbsolutePath().toString())
.setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
TestCompactionOrcJobConfigurator.Factory.class.getName())
- .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionFileName);
+ .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
+
.setConfiguration(COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET,
"Identity.*:0.1");
JobExecutionResult execution = embeddedGobblin.run();
Assert.assertTrue(execution.isSuccessful());
@@ -120,14 +124,7 @@ public class OrcCompactionTaskTest {
File outputDir = new File(basePath, hourlyPath);
FileSystem fs = FileSystem.getLocal(new Configuration());
List<FileStatus> statuses = new ArrayList<>();
- for (FileStatus status : fs.listStatus(new
Path(outputDir.getAbsolutePath()), new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return FilenameUtils.isExtension(path.getName(), extensionFileName);
- }
- })) {
- statuses.add(status);
- }
+ reloadFolder(statuses, outputDir, fs);
Assert.assertTrue(statuses.size() == 1);
List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
@@ -144,6 +141,35 @@ public class OrcCompactionTaskTest {
Assert.assertEquals(result.get(3).getFieldValue("i"), new IntWritable(5));
Assert.assertEquals(result.get(3).getFieldValue("j"), new IntWritable(6));
Assert.assertEquals(result.get(3).getFieldValue("k"), new IntWritable(7));
+
+    // Add a new .orc file into the directory and verify that re-compaction is
triggered.
+ File file_late = new File(jobDir, "file_late.1." + extensionName);
+ OrcStruct orcStruct_5 = (OrcStruct) OrcStruct.createValue(evolvedSchema);
+ orcStruct_5.setFieldValue("i", new IntWritable(10));
+ orcStruct_5.setFieldValue("j", new IntWritable(11));
+ orcStruct_5.setFieldValue("k", new IntWritable(12));
+
+ writeOrcRecordsInFile(new Path(file_late.getAbsolutePath()),
evolvedSchema, ImmutableList.of(orcStruct_5));
+ execution = embeddedGobblin.run();
+ Assert.assertTrue(execution.isSuccessful());
+
+ reloadFolder(statuses, outputDir, fs);
+ result = readOrcFile(statuses.get(0).getPath());
+    // Note: the previous execution's inspection yielded 4 results; with
re-compaction, this should yield 1 more late record.
+ Assert.assertEquals(result.size(), 4 + 1);
+ }
+
+ // A helper method to load all files in the output directory for
compaction-result inspection.
+ private void reloadFolder(List<FileStatus> statuses, File outputDir,
FileSystem fs) throws IOException {
+ statuses.clear();
+ for (FileStatus status : fs.listStatus(new
Path(outputDir.getAbsolutePath()), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return FilenameUtils.isExtension(path.getName(), extensionName);
+ }
+ })) {
+ statuses.add(status);
+ }
}
@Test