Repository: incubator-gobblin Updated Branches: refs/heads/master fc389522a -> 1c03ea22f
[GOBBLIN-463] Change lineage event for Avro2Orc conversion to have underlying FileSystem as platform Closes #2340 from eogren/aditya-branch Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c03ea22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c03ea22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c03ea22 Branch: refs/heads/master Commit: 1c03ea22fa21f44000e7aa73b142fc92c5fc2ba2 Parents: fc38952 Author: Aditya Sharma <[email protected]> Authored: Mon Apr 16 20:42:18 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Apr 16 20:42:18 2018 -0700 ---------------------------------------------------------------------- .../gobblin/dataset/DatasetConstants.java | 4 ++ .../dataset/HiveToHdfsDatasetResolver.java | 50 +++++++++++++++++ .../HiveToHdfsDatasetResolverFactory.java | 28 ++++++++++ .../hive/dataset/ConvertibleHiveDataset.java | 50 +++++++++++++++++ .../hive/publisher/HiveConvertPublisher.java | 32 +++++++++++ .../hive/source/HiveAvroToOrcSource.java | 32 ++++++++++- .../conversion/hive/source/HiveSource.java | 54 +----------------- .../conversion/hive/utils/LineageUtils.java | 45 +++++++++++++++ .../dataset/ConvertibleHiveDatasetTest.java | 59 ++++++++++++++------ 9 files changed, 283 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java index 03b7fcb..d704525 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java +++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java @@ -37,4 +37,8 @@ public class DatasetConstants { /** JDBC metadata */ public static final String CONNECTION_URL = "connectionUrl"; + + /** FileSystem scheme and location */ + public static final String FS_SCHEME = "fsScheme"; + public static final String FS_LOCATION = "fsLocation"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java new file mode 100644 index 0000000..0130271 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import org.apache.gobblin.configuration.State; + + +/** + * Singleton {@link DatasetResolver} to convert a Hive {@link DatasetDescriptor} to HDFS {@link DatasetDescriptor} + */ +public class HiveToHdfsDatasetResolver implements DatasetResolver { + public static final String HIVE_TABLE = "hiveTable"; + public static final HiveToHdfsDatasetResolver INSTANCE = new HiveToHdfsDatasetResolver(); + + private HiveToHdfsDatasetResolver() { + // To make it singleton + } + + @Override + public DatasetDescriptor resolve(DatasetDescriptor raw, State state) { + ImmutableMap<String, String> metadata = raw.getMetadata(); + Preconditions.checkArgument(metadata.containsKey(DatasetConstants.FS_SCHEME), + String.format("Hive Dataset Descriptor must contain metadata %s to create Hdfs Dataset Descriptor", + DatasetConstants.FS_SCHEME)); + Preconditions.checkArgument(metadata.containsKey(DatasetConstants.FS_LOCATION), + String.format("Hive Dataset Descriptor must contain metadata %s to create Hdfs Dataset Descriptor", + DatasetConstants.FS_LOCATION)); + DatasetDescriptor datasetDescriptor = + new DatasetDescriptor(metadata.get(DatasetConstants.FS_SCHEME), metadata.get(DatasetConstants.FS_LOCATION)); + datasetDescriptor.addMetadata(HIVE_TABLE, raw.getName()); + return datasetDescriptor; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java new file mode 100644 index 0000000..acd2212 --- /dev/null +++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import com.typesafe.config.Config; + + +public class HiveToHdfsDatasetResolverFactory implements DatasetResolverFactory { + @Override + public DatasetResolver createResolver(Config config) { + return HiveToHdfsDatasetResolver.INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java index f4c8744..63f1bee 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java +++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java @@ -16,6 +16,9 @@ */ package org.apache.gobblin.data.management.conversion.hive.dataset; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -25,7 +28,11 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.Table; import com.google.common.base.Optional; @@ -74,6 +81,14 @@ public class ConvertibleHiveDataset extends HiveDataset { // Mapping for destination format to it's Conversion config private final Map<String, ConversionConfig> destConversionConfigs; + // Source Dataset Descriptor + @Getter + private final DatasetDescriptor sourceDataset; + + // List of destination Dataset Descriptor + @Getter + private final List<DatasetDescriptor> destDatasets; + /** * <ul> * <li> The constructor takes in a dataset {@link Config} which MUST have a comma separated list of destination formats at key, @@ -112,6 +127,41 @@ public class ConvertibleHiveDataset extends HiveDataset { } } + this.sourceDataset = createSourceDataset(); + this.destDatasets = createDestDatasets(); + } + + private List<DatasetDescriptor> createDestDatasets() { + List<DatasetDescriptor> destDatasets = new ArrayList<>(); + for (String format : getDestFormats()) { + Optional<ConversionConfig> conversionConfigForFormat = getConversionConfigForFormat(format); + if (!conversionConfigForFormat.isPresent()) { + continue; + } + String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." 
+ conversionConfigForFormat.get() + .getDestinationTableName(); + DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable); + String destLocation = conversionConfigForFormat.get().getDestinationDataPath() + Path.SEPARATOR + "final"; + dest.addMetadata(DatasetConstants.FS_SCHEME, getSourceDataset().getMetadata().get(DatasetConstants.FS_SCHEME)); + dest.addMetadata(DatasetConstants.FS_LOCATION, destLocation); + destDatasets.add(dest); + } + return destDatasets; + } + + private DatasetDescriptor createSourceDataset() { + try { + String sourceTable = getTable().getDbName() + "." + getTable().getTableName(); + DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable); + Path sourcePath = getTable().getDataLocation(); + String sourceLocation = Path.getPathWithoutSchemeAndAuthority(sourcePath).toString(); + FileSystem sourceFs = sourcePath.getFileSystem(new Configuration()); + source.addMetadata(DatasetConstants.FS_SCHEME, sourceFs.getScheme()); + source.addMetadata(DatasetConstants.FS_LOCATION, sourceLocation); + return source; + } catch (IOException e) { + throw new RuntimeException(e); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java index ff4d9e9..f87a6a0 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java +++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java @@ -31,6 +31,13 @@ import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; +import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource; +import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -90,6 +97,7 @@ public class HiveConvertPublisher extends DataPublisher { private final FileSystem fs; private final HiveSourceWatermarker watermarker; private final HiveMetastoreClientPool pool; + private final Optional<LineageInfo> lineageInfo; public static final String PARTITION_PARAMETERS_WHITELIST = "hive.conversion.partitionParameters.whitelist"; public static final String PARTITION_PARAMETERS_BLACKLIST = "hive.conversion.partitionParameters.blacklist"; @@ -105,6 +113,15 @@ public class HiveConvertPublisher extends DataPublisher { this.metricContext = Instrumented.getMetricContext(state, HiveConvertPublisher.class); this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, EventConstants.CONVERSION_NAMESPACE).build(); + // Extract LineageInfo from state + if (state instanceof SourceState) { + lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker()); + } else if (state instanceof WorkUnitState) { + lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable()); + } else { + lineageInfo = Optional.absent(); + } + Configuration conf = new Configuration(); 
Optional<String> uri = Optional.fromNullable(this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI)); if (uri.isPresent()) { @@ -226,6 +243,10 @@ public class HiveConvertPublisher extends DataPublisher { } catch (Exception e) { log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e); } + if (LineageUtils.shouldSetLineageInfo(wus.getWorkunit())) { + setDestLineageInfo(wus.getWorkunit(), + (ConvertibleHiveDataset) ((HiveWorkUnit) wus.getWorkunit()).getHiveDataset(), this.lineageInfo); + } } } } @@ -254,6 +275,17 @@ public class HiveConvertPublisher extends DataPublisher { } @VisibleForTesting + public void setDestLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset convertibleHiveDataset, + Optional<LineageInfo> lineageInfo) { + List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets(); + for (int i = 0; i < destDatasets.size(); i++) { + if (lineageInfo.isPresent()) { + lineageInfo.get().putDestination(destDatasets.get(i), i + 1, workUnit); + } + } + } + + @VisibleForTesting public void preservePartitionParams(Collection<? 
extends WorkUnitState> states) { for (WorkUnitState wus : states) { if (wus.getWorkingState() != WorkingState.COMMITTED) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java index 89615e9..4659aad 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java @@ -16,18 +16,26 @@ */ package org.apache.gobblin.data.management.conversion.hive.source; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import java.util.List; import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder; +import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils; +import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; +import org.apache.gobblin.data.management.copy.hive.HiveDataset; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.workunit.WorkUnit; /** * An extension to {@link HiveSource} that is used for Avro to ORC conversion jobs. 
*/ public class HiveAvroToOrcSource extends HiveSource { - + private Optional<LineageInfo> lineageInfo; @Override public List<WorkUnit> getWorkunits(SourceState state) { if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) { @@ -36,8 +44,26 @@ public class HiveAvroToOrcSource extends HiveSource { if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) { state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, "hive.conversion.avro"); } + this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); + + List<WorkUnit> workunits = super.getWorkunits(state); + for (WorkUnit workUnit : workunits) { + if (LineageUtils.shouldSetLineageInfo(workUnit)) { + setSourceLineageInfo(workUnit, (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), + this.lineageInfo); + } + } + return workunits; + } - return super.getWorkunits(state); + @VisibleForTesting + public void setSourceLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset hiveDataset, + Optional<LineageInfo> lineageInfo) { + DatasetDescriptor sourceDataset = hiveDataset.getSourceDataset(); + if (lineageInfo.isPresent()) { + lineageInfo.get().setSource(sourceDataset, workUnit); + } } -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java index 4cee48f..3ad99fd 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java @@ -27,13 
+27,6 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; -import org.apache.gobblin.broker.iface.SharedResourcesBroker; -import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; -import org.apache.gobblin.dataset.DatasetConstants; -import org.apache.gobblin.dataset.DatasetDescriptor; -import org.apache.gobblin.metrics.event.lineage.LineageInfo; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -155,7 +148,6 @@ public class HiveSource implements Source { protected long maxLookBackTime; protected long beginGetWorkunitsTime; protected List<String> ignoreDataPathIdentifierList; - protected SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker; protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver = new ClassAliasResolver<>(HiveBaseExtractorFactory.class); @@ -222,7 +214,6 @@ public class HiveSource implements Source { this.maxLookBackTime = new DateTime().minusDays(maxLookBackDays).getMillis(); this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY, DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER)); - this.sharedJobBroker = state.getBroker(); silenceHiveLoggers(); } @@ -261,10 +252,7 @@ public class HiveSource implements Source { EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime); - if (hiveDataset instanceof ConvertibleHiveDataset) { - setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker); - log.info("Added lineage event for dataset " + hiveDataset.getUrn()); - } + this.workunits.add(hiveWorkUnit); log.debug(String.format("Workunit added for 
table: %s", hiveWorkUnit)); @@ -293,7 +281,7 @@ public class HiveSource implements Source { } protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException { - boolean setLineageInfo = false; + long tableProcessTime = new DateTime().getMillis(); this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime); @@ -341,12 +329,7 @@ public class HiveSource implements Source { EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime, lowWatermark.getValue(), this.beginGetWorkunitsTime); - if (hiveDataset instanceof ConvertibleHiveDataset && !setLineageInfo) { - setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker); - log.info("Added lineage event for dataset " + hiveDataset.getUrn()); - // Add lineage information only once per hive table - setLineageInfo = true; - } + workunits.add(hiveWorkUnit); log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s", sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue())); @@ -491,35 +474,4 @@ public class HiveSource implements Source { private boolean isAvro(Table table) { return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib()); } - - public static void setLineageInfo(ConvertibleHiveDataset convertibleHiveDataset, WorkUnit workUnit, - SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker) - throws IOException { - String sourceTable = - convertibleHiveDataset.getTable().getDbName() + "." 
+ convertibleHiveDataset.getTable().getTableName(); - DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable); - source.addMetadata(DatasetConstants.FS_URI, - convertibleHiveDataset.getTable().getDataLocation().getFileSystem(new Configuration()).getUri().toString()); - - int virtualBranch = 0; - for (String format : convertibleHiveDataset.getDestFormats()) { - ++virtualBranch; - Optional<ConvertibleHiveDataset.ConversionConfig> conversionConfigForFormat = - convertibleHiveDataset.getConversionConfigForFormat(format); - Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(sharedJobBroker); - if (!lineageInfo.isPresent()) { - continue; - } else if (!conversionConfigForFormat.isPresent()) { - continue; - } - String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get() - .getDestinationTableName(); - DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable); - Path destPath = new Path(conversionConfigForFormat.get().getDestinationDataPath()); - dest.addMetadata(DatasetConstants.FS_URI, destPath.getFileSystem(new Configuration()).getUri().toString()); - - lineageInfo.get().setSource(source, workUnit); - lineageInfo.get().putDestination(dest, virtualBranch, workUnit); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java new file mode 100644 index 0000000..249359b --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java @@ 
-0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.data.management.conversion.hive.utils; + +import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset; +import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; +import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; +import org.apache.gobblin.data.management.copy.hive.HiveDataset; +import org.apache.gobblin.source.workunit.WorkUnit; + + +/** + * Utility functions for tracking lineage in hive conversion workflows + */ +public class LineageUtils { + public static boolean shouldSetLineageInfo(WorkUnit workUnit) { + if (!(workUnit instanceof HiveWorkUnit)) { + return false; + } + HiveWorkUnit hiveWorkUnit = (HiveWorkUnit) workUnit; + if (hiveWorkUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY, false)) { + return false; + } + HiveDataset hiveDataset = hiveWorkUnit.getHiveDataset(); + return hiveDataset instanceof ConvertibleHiveDataset; + } + + private LineageUtils() { + // cant instantiate + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java index c399264..b1a38f0 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java @@ -16,6 +16,7 @@ */ package org.apache.gobblin.data.management.conversion.hive.dataset; +import com.google.common.base.Optional; import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; @@ -27,13 +28,18 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; import org.apache.gobblin.broker.iface.SharedResourcesBroker; -import org.apache.gobblin.data.management.conversion.hive.source.HiveSource; -import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher; +import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource; +import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; +import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver; +import 
org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory; import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.event.lineage.LineageInfo; -import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.mockito.Mockito; @@ -56,39 +62,57 @@ import static org.mockito.Mockito.when; @Test(groups = { "gobblin.data.management.conversion" }) public class ConvertibleHiveDatasetTest { - + /** + * Test if lineage information is properly set in the workunit for convertible hive datasets + */ @Test - public void testLineageInfo() - throws Exception { + public void testLineageInfo() throws Exception { String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf"; Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro"); - WorkUnit workUnit = WorkUnit.createEmpty(); + // Set datasetResolverFactory to convert Hive Lineage event to Hdfs Lineage event Gson GSON = new Gson(); - HiveSource.setLineageInfo(createTestConvertibleDataset(config), workUnit, getSharedJobBroker()); + ConvertibleHiveDataset testConvertibleDataset = createTestConvertibleDataset(config); + HiveWorkUnit workUnit = new HiveWorkUnit(testConvertibleDataset); + workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory", + HiveToHdfsDatasetResolverFactory.class.getName()); + workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "123456"); + Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(getSharedJobBroker(workUnit.getProperties())); + HiveAvroToOrcSource src = new HiveAvroToOrcSource(); + Assert.assertTrue(LineageUtils.shouldSetLineageInfo(workUnit)); + src.setSourceLineageInfo(workUnit, + (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo); + new 
HiveConvertPublisher(workUnit).setDestLineageInfo(workUnit, + (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo); + Properties props = workUnit.getSpecProperties(); // Assert that lineage name is correct - Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "db1.tb1"); + Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "/tmp/test"); // Assert that source is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.source")); DatasetDescriptor sourceDD = GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class); - Assert.assertEquals(sourceDD.getPlatform(), DatasetConstants.PLATFORM_HIVE); - Assert.assertEquals(sourceDD.getName(), "db1.tb1"); + Assert.assertEquals(sourceDD.getPlatform(), "file"); + Assert.assertEquals(sourceDD.getName(), "/tmp/test"); + Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1.tb1"); // Assert that first dest is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination")); DatasetDescriptor destDD1 = GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), DatasetDescriptor.class); - Assert.assertEquals(destDD1.getPlatform(), DatasetConstants.PLATFORM_HIVE); - Assert.assertEquals(destDD1.getName(), "db1_nestedOrcDb.tb1_nestedOrc"); + Assert.assertEquals(destDD1.getPlatform(), "file"); + Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final"); + Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), + "db1_nestedOrcDb.tb1_nestedOrc"); // Assert that second dest is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination")); DatasetDescriptor destDD2 = GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), DatasetDescriptor.class); - Assert.assertEquals(destDD2.getPlatform(), 
DatasetConstants.PLATFORM_HIVE); - Assert.assertEquals(destDD2.getName(), "db1_flattenedOrcDb.tb1_flattenedOrc"); + Assert.assertEquals(destDD2.getPlatform(), "file"); + Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final"); + Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), + "db1_flattenedOrcDb.tb1_flattenedOrc"); // Assert that there are two eventBuilders for nestedOrc and flattenedOrc Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit)); @@ -227,15 +251,16 @@ public class ConvertibleHiveDatasetTest { Table table = new Table(); table.setDbName(dbName); table.setTableName(tableName); + table.setTableType(TableType.EXTERNAL_TABLE.name()); StorageDescriptor sd = new StorageDescriptor(); sd.setLocation("/tmp/test"); table.setSd(sd); return table; } - public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker() { + public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker(Properties props) { SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory - .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + .createDefaultTopLevelBroker(ConfigFactory.parseProperties(props), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker .newSubscopedBuilder(new JobScopeInstance("ConvertibleHiveDatasetLineageEventTest", String.valueOf(System.currentTimeMillis()))) .build();
