Repository: incubator-gobblin Updated Branches: refs/heads/master 5a6bfea9f -> fcd57541a
[GOBBLIN-561] Handle data completeness checks for data partitions with no records. Closes #2422 from sv2000/compaction Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fcd57541 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fcd57541 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fcd57541 Branch: refs/heads/master Commit: fcd57541a9c4a5273fe0836925336ad2af6bd6af Parents: 5a6bfea Author: sv2000 <[email protected]> Authored: Tue Aug 21 11:36:36 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Aug 21 11:36:36 2018 -0700 ---------------------------------------------------------------------- .../CompactionAvroJobConfigurator.java | 49 +++++++++++++------- .../verify/CompactionAuditCountVerifier.java | 13 ++++-- .../gobblin/hive/avro/HiveAvroSerDeManager.java | 18 +++---- .../hive/policy/HiveRegistrationPolicy.java | 1 + .../hive/policy/HiveRegistrationPolicyBase.java | 19 ++++++-- 5 files changed, 65 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java index c9b7708..1779e33 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java @@ -64,6 +64,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import 
org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.Dataset; import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.hive.policy.HiveRegistrationPolicy; import org.apache.gobblin.util.AvroUtils; import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.HadoopUtils; @@ -157,12 +158,14 @@ public class CompactionAvroJobConfigurator { private void configureSchema(Job job) throws IOException { Schema newestSchema = MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs); - if (this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true)) { - AvroJob.setInputKeySchema(job, newestSchema); + if (newestSchema != null) { + if (this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true)) { + AvroJob.setInputKeySchema(job, newestSchema); + } + AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema); + AvroJob.setMapOutputValueSchema(job, newestSchema); + AvroJob.setOutputKeySchema(job, newestSchema); } - AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema); - AvroJob.setMapOutputValueSchema(job, newestSchema); - AvroJob.setOutputKeySchema(job, newestSchema); } protected void configureMapper(Job job) { @@ -228,14 +231,13 @@ public class CompactionAvroJobConfigurator { } /** - * Refer to {@link MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)} + * Refer to {@link MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}. + * @return true if no valid input paths are present for the MR job to process (the dataset root is then used as the only input path), where a path is valid if it is + * a directory containing one or more files.
+ * */ - protected void configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException { - - this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot()); - for (Path path: mapReduceInputPaths) { - FileInputFormat.addInputPath(job, path); - } + protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException { + boolean emptyDirectoryFlag = false; String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR); CompactionPathParser parser = new CompactionPathParser(this.state); @@ -244,7 +246,19 @@ public class CompactionAvroJobConfigurator { log.info ("Cleaning temporary MR output directory: " + mrOutputPath); this.fs.delete(mrOutputPath, true); + + this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot()); + if (this.mapReduceInputPaths.isEmpty()) { + this.mapReduceInputPaths.add(dataset.datasetRoot()); + emptyDirectoryFlag = true; + } + + for (Path path: mapReduceInputPaths) { + FileInputFormat.addInputPath(job, path); + } + FileOutputFormat.setOutputPath(job, mrOutputPath); + return emptyDirectoryFlag; } /** @@ -270,14 +284,15 @@ public class CompactionAvroJobConfigurator { addJars(conf); Job job = Job.getInstance(conf); job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME); - this.configureInputAndOutputPaths(job, dataset); + boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, dataset); + if (emptyDirectoryFlag) { + this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, true); + } this.configureMapper(job); this.configureReducer(job); - - if (!this.shouldDeduplicate) { + if (emptyDirectoryFlag || !this.shouldDeduplicate) { job.setNumReduceTasks(0); } - // Configure schema at the last step because FilesInputFormat will be used internally this.configureSchema(job); this.isJobCreated = true; @@ -332,7 +347,7 @@ public class CompactionAvroJobConfigurator { } total.add(fileStatus.getPath().getParent()); } else { - 
uncompacted.add(fileStatus.getPath().getParent()); + uncompacted.add(fileStatus.getPath().getParent()); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java index 5653281..7c417df 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java @@ -151,20 +151,23 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst */ private Result passed (String datasetName, Map<String, Long> countsByTier, String referenceTier) { if (!countsByTier.containsKey(this.gobblinTier)) { - return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, this.gobblinTier)); + log.info("Missing entry for dataset: " + datasetName + " in gobblin tier: " + this.gobblinTier + "; setting count to 0."); } if (!countsByTier.containsKey(referenceTier)) { - return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, referenceTier)); + log.info("Missing entry for dataset: " + datasetName + " in reference tier: " + referenceTier + "; setting count to 0."); } - long refCount = countsByTier.get(referenceTier); - long gobblinCount = countsByTier.get(this.gobblinTier); + long refCount = countsByTier.getOrDefault(referenceTier, 0L); + long gobblinCount = countsByTier.getOrDefault(this.gobblinTier, 0L); + + if (refCount == 0) { + return new Result(true, ""); + } if ((double) gobblinCount / (double) refCount < this.threshold) 
{ return new Result (false, String.format("%s failed for %s : gobblin count = %d, %s count = %d (%f < threshold %f)", this.getName(), datasetName, gobblinCount, referenceTier, refCount, (double) gobblinCount / (double) refCount, this.threshold)); } - return new Result(true, ""); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java index b30a2fa..7277c2e 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java @@ -103,11 +103,19 @@ public class HiveAvroSerDeManager extends HiveSerDeManager { */ @Override public void addSerDeProperties(Path path, HiveRegistrationUnit hiveUnit) throws IOException { + Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory."); + Schema schema; + try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) { + schema = getDirectorySchema(path); + } + if (schema == null) { + return; + } hiveUnit.setSerDeType(this.serDeWrapper.getSerDe().getClass().getName()); hiveUnit.setInputFormat(this.serDeWrapper.getInputFormatClassName()); hiveUnit.setOutputFormat(this.serDeWrapper.getOutputFormatClassName()); - addSchemaProperties(path, hiveUnit); + addSchemaProperties(path, hiveUnit, schema); } @Override @@ -129,17 +137,11 @@ public class HiveAvroSerDeManager extends HiveSerDeManager { } } - private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit) throws IOException { - 
Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory."); - + private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit, Schema schema) throws IOException { Path schemaFile = new Path(path, this.schemaFileName); if (this.useSchemaFile) { hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString()); } else { - Schema schema ; - try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) { - schema = getDirectorySchema(path); - } try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_WRITING_TIMER).time()) { addSchemaFromAvroFile(schema, schemaFile, hiveUnit); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java index 0248c42..174848c 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java @@ -33,6 +33,7 @@ import org.apache.gobblin.hive.spec.HiveSpec; */ @Alpha public interface HiveRegistrationPolicy { + public static final String MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY = "mapreduce.job.input.path.empty"; /** * Get a collection of {@link HiveSpec}s for a {@link Path}, which can be used by {@link org.apache.gobblin.hive.HiveRegister} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java ---------------------------------------------------------------------- diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java index d03e8b9..1a4ab1d 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java @@ -84,6 +84,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy { // {@value PRIMARY_TABLE_TOKEN} if present in {@value ADDITIONAL_HIVE_TABLE_NAMES} or dbPrefix.{@value HIVE_TABLE_NAME} // .. will be replaced by the table name determined via {@link #getTableName(Path)} public static final String PRIMARY_TABLE_TOKEN = "$PRIMARY_TABLE"; + protected static final ConfigClient configClient = org.apache.gobblin.config.client.ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY); @@ -108,6 +109,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy { protected final String dbNameSuffix; protected final String tableNamePrefix; protected final String tableNameSuffix; + protected final boolean emptyInputPathFlag; protected final MetricContext metricContext; @@ -128,7 +130,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy { this.dbNameSuffix = props.getProp(HIVE_DATABASE_NAME_SUFFIX, StringUtils.EMPTY); this.tableNamePrefix = props.getProp(HIVE_TABLE_NAME_PREFIX, StringUtils.EMPTY); this.tableNameSuffix = props.getProp(HIVE_TABLE_NAME_SUFFIX, StringUtils.EMPTY); - + this.emptyInputPathFlag = props.getPropAsBoolean(MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, false); this.metricContext = Instrumented.getMetricContext(props, HiveRegister.class); } @@ -341,11 +343,18 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy { * @throws IOException */ protected HiveTable getTable(Path path, String dbName, String 
tableName) throws IOException { - HiveTable table = new HiveTable.Builder().withDbName(dbName).withTableName(tableName) - .withSerdeManaager(HiveSerDeManager.get(this.props)).build(); + HiveTable.Builder tableBuilder = new HiveTable.Builder().withDbName(dbName).withTableName(tableName); + + if (!this.emptyInputPathFlag) { + tableBuilder = tableBuilder.withSerdeManaager(HiveSerDeManager.get(this.props)); + } + HiveTable table = tableBuilder.build(); table.setLocation(this.fs.makeQualified(getTableLocation(path)).toString()); - table.setSerDeProps(path); + + if (!this.emptyInputPathFlag) { + table.setSerDeProps(path); + } // Setting table-level props. State tableProps = new State(this.props.getTablePartitionProps()); @@ -418,4 +427,4 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy { "Unable to instantiate " + HiveRegistrationPolicy.class.getSimpleName() + " with type " + policyType, e); } } -} +} \ No newline at end of file
