[GOBBLIN-414] Added lineage event for convertible hive datasets. Closes #2290 from aditya1105/metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/faa27f41 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/faa27f41 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/faa27f41 Branch: refs/heads/0.12.0 Commit: faa27f41f00f1d142c128e13a3da0f8c388d83b9 Parents: 5e6bfb0 Author: aditya1105 <[email protected]> Authored: Thu Mar 1 07:34:58 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Mar 1 07:34:58 2018 -0800 ---------------------------------------------------------------------- .../conversion/hive/source/HiveSource.java | 54 +++++++++++++++++- .../dataset/ConvertibleHiveDatasetTest.java | 60 ++++++++++++++++++++ 2 files changed, 111 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/faa27f41/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java index 3ad99fd..4cee48f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java @@ -27,6 +27,13 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import 
org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -148,6 +155,7 @@ public class HiveSource implements Source { protected long maxLookBackTime; protected long beginGetWorkunitsTime; protected List<String> ignoreDataPathIdentifierList; + protected SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker; protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver = new ClassAliasResolver<>(HiveBaseExtractorFactory.class); @@ -214,6 +222,7 @@ public class HiveSource implements Source { this.maxLookBackTime = new DateTime().minusDays(maxLookBackDays).getMillis(); this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY, DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER)); + this.sharedJobBroker = state.getBroker(); silenceHiveLoggers(); } @@ -252,7 +261,10 @@ public class HiveSource implements Source { EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime); - + if (hiveDataset instanceof ConvertibleHiveDataset) { + setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker); + log.info("Added lineage event for dataset " + hiveDataset.getUrn()); + } this.workunits.add(hiveWorkUnit); log.debug(String.format("Workunit added for table: %s", hiveWorkUnit)); @@ -281,7 +293,7 @@ public class HiveSource implements Source { } protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException { - + boolean setLineageInfo = 
false; long tableProcessTime = new DateTime().getMillis(); this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime); @@ -329,7 +341,12 @@ public class HiveSource implements Source { EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime); - + if (hiveDataset instanceof ConvertibleHiveDataset && !setLineageInfo) { + setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker); + log.info("Added lineage event for dataset " + hiveDataset.getUrn()); + // Add lineage information only once per hive table + setLineageInfo = true; + } workunits.add(hiveWorkUnit); log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s", sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue())); @@ -474,4 +491,35 @@ public class HiveSource implements Source { private boolean isAvro(Table table) { return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib()); } + + public static void setLineageInfo(ConvertibleHiveDataset convertibleHiveDataset, WorkUnit workUnit, + SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker) + throws IOException { + String sourceTable = + convertibleHiveDataset.getTable().getDbName() + "." 
+ convertibleHiveDataset.getTable().getTableName(); + DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable); + source.addMetadata(DatasetConstants.FS_URI, + convertibleHiveDataset.getTable().getDataLocation().getFileSystem(new Configuration()).getUri().toString()); + + int virtualBranch = 0; + for (String format : convertibleHiveDataset.getDestFormats()) { + ++virtualBranch; + Optional<ConvertibleHiveDataset.ConversionConfig> conversionConfigForFormat = + convertibleHiveDataset.getConversionConfigForFormat(format); + Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(sharedJobBroker); + if (!lineageInfo.isPresent()) { + continue; + } else if (!conversionConfigForFormat.isPresent()) { + continue; + } + String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get() + .getDestinationTableName(); + DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable); + Path destPath = new Path(conversionConfigForFormat.get().getDestinationDataPath()); + dest.addMetadata(DatasetConstants.FS_URI, destPath.getFileSystem(new Configuration()).getUri().toString()); + + lineageInfo.get().setSource(source, workUnit); + lineageInfo.get().putDestination(dest, virtualBranch, workUnit); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/faa27f41/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java index 5021d4d..c399264 100644 --- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java @@ -19,8 +19,20 @@ package org.apache.gobblin.data.management.conversion.hive.dataset; import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Collections; import java.util.Properties; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -31,6 +43,7 @@ import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.gson.Gson; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -45,6 +58,44 @@ import static org.mockito.Mockito.when; public class ConvertibleHiveDatasetTest { @Test + public void testLineageInfo() + throws Exception { + String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf"; + Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro"); + WorkUnit workUnit = 
WorkUnit.createEmpty(); + Gson GSON = new Gson(); + HiveSource.setLineageInfo(createTestConvertibleDataset(config), workUnit, getSharedJobBroker()); + Properties props = workUnit.getSpecProperties(); + // Asset that lineage name is correct + Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "db1.tb1"); + + // Assert that source is correct for lineage event + Assert.assertTrue(props.containsKey("gobblin.event.lineage.source")); + DatasetDescriptor sourceDD = + GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class); + Assert.assertEquals(sourceDD.getPlatform(), DatasetConstants.PLATFORM_HIVE); + Assert.assertEquals(sourceDD.getName(), "db1.tb1"); + + // Assert that first dest is correct for lineage event + Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination")); + DatasetDescriptor destDD1 = + GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), DatasetDescriptor.class); + Assert.assertEquals(destDD1.getPlatform(), DatasetConstants.PLATFORM_HIVE); + Assert.assertEquals(destDD1.getName(), "db1_nestedOrcDb.tb1_nestedOrc"); + + // Assert that second dest is correct for lineage event + Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination")); + DatasetDescriptor destDD2 = + GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), DatasetDescriptor.class); + Assert.assertEquals(destDD2.getPlatform(), DatasetConstants.PLATFORM_HIVE); + Assert.assertEquals(destDD2.getName(), "db1_flattenedOrcDb.tb1_flattenedOrc"); + + // Assert that there are two eventBuilders for nestedOrc and flattenedOrc + Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit)); + Assert.assertEquals(lineageEventBuilders.size(), 2); + } + + @Test public void testFlattenedOrcConfig() throws Exception { String testConfFilePath = "convertibleHiveDatasetTest/flattenedOrc.conf"; Config config = 
ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro"); @@ -181,4 +232,13 @@ public class ConvertibleHiveDatasetTest { table.setSd(sd); return table; } + + public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker() { + SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker + .newSubscopedBuilder(new JobScopeInstance("ConvertibleHiveDatasetLineageEventTest", String.valueOf(System.currentTimeMillis()))) + .build(); + return jobBroker; + } }
