Repository: incubator-gobblin Updated Branches: refs/heads/master c0c77ba54 -> be92ad18b
[GOBBLIN-478] Fixed bug to emit lineage events during Avro2ORC conversion Closes #2348 from aditya1105/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/be92ad18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/be92ad18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/be92ad18 Branch: refs/heads/master Commit: be92ad18b68a2ee5c60bc60a621f99eee3d17c54 Parents: c0c77ba Author: Aditya Sharma <[email protected]> Authored: Thu Apr 26 10:19:00 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Apr 26 10:19:00 2018 -0700 ---------------------------------------------------------------------- .../hive/publisher/HiveConvertPublisher.java | 12 ++++----- .../hive/source/HiveAvroToOrcSource.java | 17 ++++++------- .../conversion/hive/utils/LineageUtils.java | 11 ++++++--- .../dataset/ConvertibleHiveDatasetTest.java | 26 +++++++++++++------- 4 files changed, 37 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java index f87a6a0..80da551 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java @@ -243,9 +243,8 @@ public class HiveConvertPublisher 
extends DataPublisher { } catch (Exception e) { log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e); } - if (LineageUtils.shouldSetLineageInfo(wus.getWorkunit())) { - setDestLineageInfo(wus.getWorkunit(), - (ConvertibleHiveDataset) ((HiveWorkUnit) wus.getWorkunit()).getHiveDataset(), this.lineageInfo); + if (LineageUtils.shouldSetLineageInfo(wus)) { + setDestLineageInfo(wus, this.lineageInfo); } } } @@ -275,12 +274,13 @@ public class HiveConvertPublisher extends DataPublisher { } @VisibleForTesting - public void setDestLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset convertibleHiveDataset, - Optional<LineageInfo> lineageInfo) { + public static void setDestLineageInfo(WorkUnitState wus, Optional<LineageInfo> lineageInfo) { + HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(wus.getWorkunit()); + ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset) hiveWorkUnit.getHiveDataset(); List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets(); for (int i = 0; i < destDatasets.size(); i++) { if (lineageInfo.isPresent()) { - lineageInfo.get().putDestination(destDatasets.get(i), i + 1, workUnit); + lineageInfo.get().putDestination(destDatasets.get(i), i + 1, wus); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java index 4659aad..5098002 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java +++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java @@ -19,23 +19,22 @@ package org.apache.gobblin.data.management.conversion.hive.source; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import java.util.List; - import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder; import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils; -import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; -import org.apache.gobblin.data.management.copy.hive.HiveDataset; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.workunit.WorkUnit; + /** * An extension to {@link HiveSource} that is used for Avro to ORC conversion jobs. 
*/ public class HiveAvroToOrcSource extends HiveSource { private Optional<LineageInfo> lineageInfo; + @Override public List<WorkUnit> getWorkunits(SourceState state) { if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) { @@ -49,21 +48,19 @@ public class HiveAvroToOrcSource extends HiveSource { List<WorkUnit> workunits = super.getWorkunits(state); for (WorkUnit workUnit : workunits) { if (LineageUtils.shouldSetLineageInfo(workUnit)) { - setSourceLineageInfo(workUnit, (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), - this.lineageInfo); + setSourceLineageInfo(workUnit, this.lineageInfo); } } return workunits; } @VisibleForTesting - public void setSourceLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset hiveDataset, - Optional<LineageInfo> lineageInfo) { - DatasetDescriptor sourceDataset = hiveDataset.getSourceDataset(); + public void setSourceLineageInfo(WorkUnit workUnit, Optional<LineageInfo> lineageInfo) { + HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(workUnit); + ConvertibleHiveDataset convertibleHiveDataset = (ConvertibleHiveDataset) hiveWorkUnit.getHiveDataset(); + DatasetDescriptor sourceDataset = convertibleHiveDataset.getSourceDataset(); if (lineageInfo.isPresent()) { lineageInfo.get().setSource(sourceDataset, workUnit); } } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java index 249359b..9f46dd2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java +++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java @@ -16,6 +16,7 @@ */ package org.apache.gobblin.data.management.conversion.hive.utils; +import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; @@ -28,10 +29,8 @@ import org.apache.gobblin.source.workunit.WorkUnit; */ public class LineageUtils { public static boolean shouldSetLineageInfo(WorkUnit workUnit) { - if (!(workUnit instanceof HiveWorkUnit)) { - return false; - } - HiveWorkUnit hiveWorkUnit = (HiveWorkUnit) workUnit; + // Create a HiveWorkUnit from the workunit + HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(workUnit); if (hiveWorkUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY, false)) { return false; } @@ -39,6 +38,10 @@ public class LineageUtils { return hiveDataset instanceof ConvertibleHiveDataset; } + public static boolean shouldSetLineageInfo(WorkUnitState workUnitState) { + return shouldSetLineageInfo(workUnitState.getWorkunit()); + } + private LineageUtils() { // cant instantiate } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/be92ad18/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java index b1a38f0..0048416 100644 --- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java @@ -29,6 +29,7 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher; import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource; import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; @@ -38,6 +39,7 @@ import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver; import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory; import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.runtime.TaskState; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -76,15 +78,25 @@ public class ConvertibleHiveDatasetTest { workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory", HiveToHdfsDatasetResolverFactory.class.getName()); workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "123456"); + Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(getSharedJobBroker(workUnit.getProperties())); HiveAvroToOrcSource src = new HiveAvroToOrcSource(); Assert.assertTrue(LineageUtils.shouldSetLineageInfo(workUnit)); - src.setSourceLineageInfo(workUnit, - (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo); - new HiveConvertPublisher(workUnit).setDestLineageInfo(workUnit, - 
(ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo); + if (LineageUtils.shouldSetLineageInfo(workUnit)) { + src.setSourceLineageInfo(workUnit, + lineageInfo); + } + // TaskState is passed to the publisher, hence test should mimic the same behavior + TaskState taskState = new TaskState(new WorkUnitState(workUnit)); + if (LineageUtils.shouldSetLineageInfo(taskState)) { + HiveConvertPublisher.setDestLineageInfo(taskState, lineageInfo); + } + Properties props = taskState.getProperties(); + + // Assert that there are two eventBuilders for nestedOrc and flattenedOrc + Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(taskState)); + Assert.assertEquals(lineageEventBuilders.size(), 2); - Properties props = workUnit.getSpecProperties(); // Assert that lineage name is correct Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "/tmp/test"); @@ -113,10 +125,6 @@ public class ConvertibleHiveDatasetTest { Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final"); Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1_flattenedOrcDb.tb1_flattenedOrc"); - - // Assert that there are two eventBuilders for nestedOrc and flattenedOrc - Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit)); - Assert.assertEquals(lineageEventBuilders.size(), 2); } @Test
