[GOBBLIN-307] Implement lineage event as LineageEventBuilder in gobblin Closes #2161 from zxcware/lineage
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3e229db9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3e229db9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3e229db9 Branch: refs/heads/master Commit: 3e229db9810de8410e0b8fcaf680fcb9f80b5db2 Parents: a34a81a Author: zhchen <[email protected]> Authored: Mon Nov 13 14:15:51 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 13 14:15:51 2017 -0800 ---------------------------------------------------------------------- .../gobblin/dataset/DatasetConstants.java | 34 +++ .../gobblin/dataset/DatasetDescriptor.java | 114 +++++++++ .../gobblin/lineage/LineageException.java | 39 --- .../org/apache/gobblin/lineage/LineageInfo.java | 246 ------------------- .../gobblin/publisher/BaseDataPublisher.java | 23 +- .../publisher/TimePartitionedDataPublisher.java | 28 ++- .../extractor/extract/QueryBasedSource.java | 9 +- .../apache/gobblin/lineage/LineageInfoTest.java | 160 ------------ .../data/management/copy/CopySource.java | 10 +- .../management/copy/CopyableDatasetBase.java | 7 + .../data/management/copy/CopyableFile.java | 4 + .../copy/RecursiveCopyableDataset.java | 15 +- .../copy/hive/HiveCopyEntityHelper.java | 8 + .../data/management/copy/hive/HiveDataset.java | 8 +- .../copy/hive/HivePartitionFileSet.java | 7 +- .../copy/hive/UnpartitionedTableFileSet.java | 5 +- .../copy/publisher/CopyDataPublisher.java | 3 +- .../dataset/ConvertibleHiveDatasetTest.java | 18 +- .../gobblin/metrics/event/EventSubmitter.java | 3 + .../metrics/event/FailureEventBuilder.java | 58 +---- .../metrics/event/GobblinEventBuilder.java | 86 +++++++ .../event/lineage/LineageEventBuilder.java | 147 +++++++++++ .../metrics/event/lineage/LineageException.java | 32 +++ .../metrics/event/lineage/LineageInfo.java | 207 ++++++++++++++++ .../metrics/event/lineage/LineageEventTest.java | 113 +++++++++ 
.../extractor/extract/kafka/KafkaSource.java | 10 +- .../extractor/extract/jdbc/MysqlSource.java | 14 +- .../gobblin/runtime/SafeDatasetCommit.java | 70 ++++-- .../templates/textFileBasedSourceTest.template | 2 + .../runtime_test/skip_workunits_test.properties | 2 +- 30 files changed, 918 insertions(+), 564 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java new file mode 100644 index 0000000..73999dc --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset; + +public class DatasetConstants { + /** Platforms */ + public static final String PLATFORM_KAFKA = "kafka"; + public static final String PLATFORM_HIVE = "hive"; + public static final String PLATFORM_MYSQL = "mysql"; + + /** File system metadata */ + public static final String FS_URI = "fsUri"; + + /** Kafka metadata */ + public static final String BROKERS = "brokers"; + + /** JDBC metadata */ + public static final String CONNECTION_URL = "connectionUrl"; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java new file mode 100644 index 0000000..5b41862 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset; + + +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +/** + * A {@link DatasetDescriptor} identifies and provides metadata to describe a dataset + */ +@RequiredArgsConstructor +public final class DatasetDescriptor { + private static final String PLATFORM_KEY = "platform"; + private static final String NAME_KEY = "name"; + + /** + * which platform the dataset is stored, for example: local, hdfs, oracle, mysql, kafka + */ + @Getter + private final String platform; + /** + * name of the dataset + */ + @Getter + private final String name; + + /** + * metadata about the dataset + */ + private final Map<String, String> metadata = Maps.newHashMap(); + + public DatasetDescriptor(DatasetDescriptor copy) { + platform = copy.getPlatform(); + name = copy.getName(); + metadata.putAll(copy.getMetadata()); + } + + public ImmutableMap<String, String> getMetadata() { + return ImmutableMap.<String, String>builder() + .putAll(metadata) + .build(); + } + + public void addMetadata(String key, String value) { + metadata.put(key, value); + } + + /** + * Serialize to a string map + */ + public Map<String, String> toDataMap() { + Map<String, String> map = Maps.newHashMap(); + map.put(PLATFORM_KEY, platform); + map.put(NAME_KEY, name); + map.putAll(metadata); + return map; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasetDescriptor that = (DatasetDescriptor) o; + return platform.equals(that.platform) && name.equals(that.name) && metadata.equals(that.metadata); + } + + @Override + public int hashCode() { + int result = platform.hashCode(); + result = 31 * result + name.hashCode(); + result = 31 * result + metadata.hashCode(); + return result; + } + + /** + * Deserialize a {@link DatasetDescriptor} 
from a string map + */ + public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) { + DatasetDescriptor descriptor = new DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY)); + dataMap.forEach((key, value) -> { + if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY)) { + descriptor.addMetadata(key, value); + } + }); + return descriptor; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java deleted file mode 100644 index 8dcf592..0000000 --- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.gobblin.lineage; - -/** - * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized. 
- */ -public class LineageException extends Exception { - public LineageException(String message) { - super(message); - } - public static class LineageConflictAttributeException extends LineageException { - public LineageConflictAttributeException (String key, String oldValue, String newValue) { - super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue); - } - } - - public static class LineageUnsupportedLevelException extends LineageException { - public LineageUnsupportedLevelException (LineageInfo.Level level) { - super (level.toString() + " is not supported"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java deleted file mode 100644 index 7af71df..0000000 --- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.lineage; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - - -/** - * A class to restore all lineage information from a {@link State} - * All lineage attributes are under LINEAGE_NAME_SPACE namespace. - * - * For example, a typical lineage attributes looks like: - * gobblin.lineage.K1 ---> V1 - * gobblin.lineage.branch.3.K2 ---> V2 - * - * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3. - */ - -@Slf4j -public class LineageInfo { - public static final String LINEAGE_NAME_SPACE = "gobblin.lineage"; - public static final String BRANCH_ID_METADATA_KEY = "branchId"; - private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + "."; - public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn"; - private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch."; - - @Getter - private String datasetUrn; - @Getter - private String jobId; - - private Map<String, String> lineageMetaData; - - public enum Level { - DATASET, - BRANCH, - All - } - - private LineageInfo() { - } - - private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) { - Preconditions.checkArgument(datasetUrn != null); - Preconditions.checkArgument(jobId != null); - this.datasetUrn = datasetUrn; - this.jobId = jobId; - this.lineageMetaData = lineageMetaData; - } - - /** - * Retrieve lineage information from a {@link State} by {@link Level} - * @param state A single state - * @param level {@link Level#DATASET} only load dataset level 
lineage attributes - * {@link Level#BRANCH} only load branch level lineage attributes - * {@link Level#All} load all lineage attributes - * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. - */ - public static Collection<LineageInfo> load (State state, Level level) throws LineageException { - return load(Collections.singleton(state), level); - } - - /** - * Get all lineage meta data. - */ - public ImmutableMap<String, String> getLineageMetaData() { - return ImmutableMap.copyOf(lineageMetaData); - } - - /** - * Retrieve all lineage information from different {@link State}s. - * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn. - * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s. If multiple {@link State}s - * share the same K, but have conflicting V, a {@link LineageException} is thrown. - * - * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is - * specified, all levels of information will be returned; otherwise only specified level of information will be returned. 
- * - * For instance, assume we have below input states: - * State[0]: gobblin.lineage.K1 ---> V1 - * gobblin.lineage.K2 ---> V2 - * gobblin.lineage.branch.1.K4 ---> V4 - * State[1]: gobblin.lineage.K2 ---> V2 - * gobblin.lineage.K3 ---> V3 - * gobblin.lineage.branch.1.K4 ---> V4 - * gobblin.lineage.branch.1.K5 ---> V5 - * gobblin.lineage.branch.2.K6 ---> V6 - * - * (1) With {@link Level#DATASET} level, the output would be: - * LinieageInfo[0]: K1 ---> V1 - * K2 ---> V2 - * K3 ---> V3 - * (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo) - * LineageInfo[0]: K1 ---> V1 - * K2 ---> V2 - * K3 ---> V3 - * K4 ---> V4 - * K5 ---> V5 - * - * LineageInfo[1]: K1 ---> V1 - * K2 ---> V2 - * K3 ---> V3 - * K6 ---> V6 - * - * (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned) - * LineageInfo[0]: K4 ---> V4 - * K5 ---> V5 - * LineageInfo[1]: K6 ---> V6 - * - * @param states All states which belong to the same dataset and share the same jobId. - * @param level {@link Level#DATASET} only load dataset level lineage attributes - * {@link Level#BRANCH} only load branch level lineage attributes - * {@link Level#All} load all lineage attributes - * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. - * - * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value. - */ - public static Collection<LineageInfo> load (Collection<? 
extends State> states, Level level) throws LineageException { - Preconditions.checkArgument(states != null && !states.isEmpty()); - Map<String, String> datasetMetaData = new HashMap<>(); - Map<String, Map<String, String>> branchAggregate = new HashMap<>(); - - State anyOne = states.iterator().next(); - String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, ""); - String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); - - for (State state: states) { - for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) { - if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) { - - String lineageKey = ((String) entry.getKey()); - String lineageValue = (String) entry.getValue(); - - if (lineageKey.startsWith(BRANCH_PREFIX)) { - String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length()); - String branchId = branchPrefixStrip.substring(0, branchPrefixStrip.indexOf(".")); - String key = branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1); - - if (level == Level.BRANCH || level == Level.All) { - if (!branchAggregate.containsKey(branchId)) { - branchAggregate.put(branchId, new HashMap<>()); - } - Map<String, String> branchMetaData = branchAggregate.get(branchId); - String prev = branchMetaData.put(key, lineageValue); - if (prev != null && !prev.equals(lineageValue)) { - throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); - } - } - } else if (lineageKey.startsWith(DATASET_PREFIX)) { - if (level == Level.DATASET || level == Level.All) { - String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue); - if (prev != null && !prev.equals(lineageValue)) { - throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); - } - } - } - } - } - } - - Collection<LineageInfo> collection = Sets.newHashSet(); - - if (level == Level.DATASET) { - 
ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder() - .putAll(datasetMetaData) - .build(); - collection.add(new LineageInfo(urn, jobId, metaData)); - return collection; - } else if (level == Level.BRANCH || level == Level.All){ - if (branchAggregate.isEmpty()) { - if (level == Level.All) { - collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build())); - } - return collection; - } - for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) { - String branchId = branchMetaDataEntry.getKey(); - Map<String, String> branchMetaData = branchMetaDataEntry.getValue(); - ImmutableMap.Builder<String, String> metaDataBuilder = ImmutableMap.builder(); - if (level == Level.All) { - metaDataBuilder.putAll(datasetMetaData); - } - metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, branchId); - collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build())); - } - - return collection; - } else { - throw new LineageException.LineageUnsupportedLevelException(level); - } - } - - public static void setDatasetLineageAttribute (State state, String key, String value) { - state.setProp(DATASET_PREFIX + key, value); - } - - public static void setBranchLineageAttribute (State state, int branchId, String key, String value) { - state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value); - } - - public static Map<String, Collection<State>> aggregateByDatasetUrn (Collection<? 
extends State> states) { - Map<String, Collection<State>> datasetStates = new HashMap<>(); - for (State state: states) { - String urn = state.getProp(LINEAGE_DATASET_URN, state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN)); - datasetStates.putIfAbsent(urn, new ArrayList<>()); - Collection<State> datasetState = datasetStates.get(urn); - datasetState.add(state); - } - return datasetStates; - } - - public final String getId() { - return Joiner.on(":::").join(this.datasetUrn, this.jobId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java index 9bdcbdd..0097c15 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java @@ -56,9 +56,11 @@ import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; -import org.apache.gobblin.lineage.LineageInfo; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.metadata.MetadataMerger; import org.apache.gobblin.metadata.types.StaticStringMetadataMerger; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.ParallelRunner; @@ -106,8 +108,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap(); 
protected final Set<Path> publisherOutputDirs = Sets.newHashSet(); - public static final String PUBLISH_OUTOUT = "publish.output"; - /* Each partition in each branch may have separate metadata. The metadata mergers are responsible * for aggregating this information from all workunits so it can be published. */ @@ -131,7 +131,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap); }; - public BaseDataPublisher(State state) throws IOException { super(state); @@ -330,6 +329,16 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { private void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved) throws IOException { publishData(state, branchId, false, writerOutputPathsMoved); + DatasetDescriptor destination = createDestinationDescriptor(state, branchId); + LineageInfo.putDestination(destination, branchId, state); + } + + protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { + Path publisherOutputDir = getPublisherOutputDir(state, branchId); + FileSystem fs = this.publisherFileSystemByBranches.get(branchId); + DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString()); + destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); + return destination; } protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, @@ -372,7 +381,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { if (!replaceFinalOutputDir) { addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner); writerOutputPathsMoved.add(writerOutputDir); - addPublisherLineageInfo(state, branchId, publisherOutputDir.toString()); return; } @@ -387,14 +395,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId); 
writerOutputPathsMoved.add(writerOutputDir); - addPublisherLineageInfo(state, branchId, publisherOutputDir.toString()); } } - protected void addPublisherLineageInfo(WorkUnitState state, int branchId, String output) { - LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, output); - } - /** * Get the output directory path this {@link BaseDataPublisher} will write to. * http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java index 90e241a..157552e 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java @@ -18,18 +18,18 @@ package org.apache.gobblin.publisher; import java.io.IOException; -import java.util.Set; -import org.apache.gobblin.lineage.LineageInfo; -import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.FileListUtils; +import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.util.ParallelRunner; import org.apache.gobblin.util.WriterUtils; +import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner; /** @@ -70,13 +70,19 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher { } @Override - protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, Set<Path> writerOutputPathsMoved) 
throws IOException { - super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved); - if (publishSingleTaskData) { - // Add lineage event for destination. Make sure all workunits belongs to the same dataset has exactly the same value - Path publisherOutputDir = getPublisherOutputDir(state, branchId); - String timePrefix = state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, ""); - LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, new Path(publisherOutputDir, timePrefix).toString()); - } + protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { + // Get base descriptor + DatasetDescriptor descriptor = super.createDestinationDescriptor(state, branchId); + + // Decorate with partition prefix + String propName = ForkOperatorUtils + .getPropertyNameForBranch(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, numBranches, branchId); + String timePrefix = state.getProp(propName, ""); + Path pathWithTimePrefix = new Path(descriptor.getName(), timePrefix); + DatasetDescriptor destination = new DatasetDescriptor(descriptor.getPlatform(), pathWithTimePrefix.toString()); + // Add back the metadata + descriptor.getMetadata().forEach(destination::addMetadata); + + return destination; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java index c77051d..d074f3a 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java @@ -27,7 +27,6 @@ import java.util.Set; import 
java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; -import org.apache.gobblin.lineage.LineageInfo; import org.slf4j.MDC; import com.google.common.base.Optional; @@ -48,6 +47,8 @@ import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.configuration.WorkUnitState.WorkingState; +import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.JobCommitPolicy; import org.apache.gobblin.source.extractor.partition.Partition; import org.apache.gobblin.source.extractor.partition.Partitioner; @@ -235,7 +236,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName()); workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName()); workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION); - addLineageSourceInfo (state, sourceEntity, workunit); + addLineageSourceInfo(state, sourceEntity, workunit); partition.serialize(workunit); workUnits.add(workunit); } @@ -243,8 +244,8 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { return workUnits; } - protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { - workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, entity.destTableName); + protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { + // Does nothing by default } protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java ---------------------------------------------------------------------- diff 
--git a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java deleted file mode 100644 index 2a7ea15..0000000 --- a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.lineage; - -import java.util.Collection; -import java.util.Map; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.junit.Assert; -import org.testng.annotations.Test; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -import gobblin.configuration.State; - - -public class LineageInfoTest { - - @Test - public void testDatasetLevel () { - Collection<LineageInfo> collection = null; - try { - collection = LineageInfo.load(createTestStates(), LineageInfo.Level.DATASET); - } catch (LineageException e) { - Assert.fail(e.toString()); - } - - Assert.assertEquals(1, collection.size()); - LineageInfo info = collection.iterator().next(); - ImmutableMap<String, String> map = info.getLineageMetaData(); - Assert.assertEquals(3, map.size()); - Assert.assertEquals("V1", map.get("K1")); - Assert.assertEquals("V2", map.get("K2")); - Assert.assertEquals("V3", map.get("K3")); - } - - @Test - public void testBranchLevel () { - Collection<LineageInfo> collection = null; - try { - collection = LineageInfo.load(createTestStates(), LineageInfo.Level.BRANCH); - } catch (LineageException e) { - Assert.fail(e.toString()); - } - - Assert.assertEquals(2, collection.size()); - - for (LineageInfo info: collection) { - Map<String, String> map = info.getLineageMetaData(); - String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY); - if (branchId.equals("1")) { - Assert.assertEquals(3, map.size()); // include BRANCH_ID_METADATA_KEY - Assert.assertEquals("V4", map.get("K4")); - Assert.assertEquals("V5", map.get("K5")); - } - - if (branchId.equals("2")) { - Assert.assertEquals(2, map.size()); // include BRANCH_ID_METADATA_KEY - Assert.assertEquals("V6", map.get("K6")); - } - } - } - - @Test - public void testAllLevel () { - Collection<LineageInfo> collection = null; - try { - collection = LineageInfo.load(createTestStates(), LineageInfo.Level.All); - } catch (LineageException e) { - 
Assert.fail(e.toString()); - } - - Assert.assertEquals(2, collection.size()); - for (LineageInfo info: collection) { - Map<String, String> map = info.getLineageMetaData(); - String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY); - if (branchId.equals("1")) { - Assert.assertEquals(6, map.size()); // include BRANCH_ID_METADATA_KEY - Assert.assertEquals("V1", map.get("K1")); - Assert.assertEquals("V2", map.get("K2")); - Assert.assertEquals("V3", map.get("K3")); - Assert.assertEquals("V4", map.get("K4")); - Assert.assertEquals("V5", map.get("K5")); - } - - if (branchId.equals("2")) { - Assert.assertEquals(5, map.size()); // include BRANCH_ID_METADATA_KEY - Assert.assertEquals("V1", map.get("K1")); - Assert.assertEquals("V2", map.get("K2")); - Assert.assertEquals("V3", map.get("K3")); - Assert.assertEquals("V6", map.get("K6")); - } - } - } - - @Test - public void testNoBranchInfo () { - State state = new State(); - state.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456"); - state.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent"); - LineageInfo.setDatasetLineageAttribute(state,"K1", "V1"); - LineageInfo.setDatasetLineageAttribute(state,"K2", "V2"); - Collection<LineageInfo> collection = null; - try { - collection = LineageInfo.load(Lists.newArrayList(state), LineageInfo.Level.BRANCH); - } catch (LineageException e) { - Assert.fail(e.toString()); - } - - Assert.assertEquals(true, collection.isEmpty()); - } - - private Collection<State> createTestStates() { - /* - * State[0]: gobblin.lineage.K1 ---> V1 - * gobblin.lineage.K2 ---> V2 - * gobblin.lineage.branch.1.K4 ---> V4 - * State[1]: gobblin.lineage.K2 ---> V2 - * gobblin.lineage.K3 ---> V3 - * gobblin.lineage.branch.1.K4 ---> V4 - * gobblin.lineage.branch.1.K5 ---> V5 - * gobblin.lineage.branch.2.K6 ---> V6 - */ - State state_1 = new State(); - state_1.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456"); - state_1.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent"); - 
LineageInfo.setDatasetLineageAttribute(state_1,"K1", "V1"); - LineageInfo.setDatasetLineageAttribute(state_1,"K2", "V2"); - LineageInfo.setBranchLineageAttribute(state_1, 1, "K4", "V4"); - - - State state_2 = new State(); - state_2.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456"); - state_2.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent"); - - LineageInfo.setDatasetLineageAttribute(state_2,"K2", "V2"); - LineageInfo.setDatasetLineageAttribute(state_2,"K3", "V3"); - LineageInfo.setBranchLineageAttribute(state_2, 1, "K4", "V4"); - LineageInfo.setBranchLineageAttribute(state_2, 1, "K5", "V5"); - LineageInfo.setBranchLineageAttribute(state_2, 2, "K6", "V6"); - - return Lists.newArrayList(state_1, state_2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index f60e5f0..8ca05e3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -66,6 +65,8 @@ import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import 
org.apache.gobblin.metrics.event.sla.SlaEventKeys; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.extractor.WatermarkInterval; @@ -299,6 +300,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity); computeAndSetWorkUnitGuid(workUnit); workUnitsForPartition.add(workUnit); + addLineageInfo(copyEntity, copyableDataset, workUnit); } this.workUnitList.putAll(this.fileSet, workUnitsForPartition); @@ -311,6 +313,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { } } + private void addLineageInfo(CopyEntity copyEntity, CopyableDatasetBase copyableDataset, WorkUnit workUnit) { + if (copyEntity instanceof CopyableFile && copyableDataset.getDatasetDescriptor() != null) { + LineageInfo.setSource(copyableDataset.getDatasetDescriptor(), workUnit); + } + } + /** * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned * {@link Extractor} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java index c27b839..6c71edc 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java @@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy; import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.dataset.DatasetDescriptor; /** @@ -25,4 +26,10 @@ import 
org.apache.gobblin.dataset.Dataset; * Concrete classes must implement a subinterface of this interface ({@link CopyableDataset} or {@link IterableCopyableDataset}). */ public interface CopyableDatasetBase extends Dataset { + /** + * Get the descriptor which identifies and provides metadata of the dataset + */ + default DatasetDescriptor getDatasetDescriptor() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index cec06f2..04e5e34 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy; import org.apache.gobblin.data.management.partition.File; import org.apache.gobblin.data.management.copy.PreserveAttributes.Option; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.guid.Guid; @@ -55,6 +56,9 @@ public class CopyableFile extends CopyEntity implements File { /** {@link FileStatus} of the existing origin file. */ private FileStatus origin; + /** The destination dataset the file will be copied to */ + private DatasetDescriptor destDataset; + /** Complete destination {@link Path} of the file. 
*/ private Path destination; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 0f18f68..35108df 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -21,6 +21,7 @@ import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.data.management.copy.entities.PrePublishStep; import org.apache.gobblin.data.management.dataset.DatasetUtils; import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.commit.DeleteFileCommitStep; @@ -42,6 +43,8 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import lombok.Getter; + /** * Implementation of {@link CopyableDataset} that creates a {@link CopyableFile} for every file that is a descendant if @@ -66,6 +69,9 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData private final boolean update; private final boolean delete; + @Getter + private transient final DatasetDescriptor datasetDescriptor; + // Include empty directories in the source for copy private final boolean includeEmptyDirectories; // Delete empty directories in the destination @@ -77,6 +83,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData this.rootPath 
= PathUtils.getPathWithoutSchemeAndAuthority(rootPath); this.fs = fs; + this.datasetDescriptor = new DatasetDescriptor(fs.getScheme(), rootPath.toString()); this.pathFilter = DatasetUtils.instantiatePathFilter(properties); this.copyableFileFilter = DatasetUtils.instantiateCopyableFileFilter(properties); @@ -129,17 +136,19 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData List<CopyEntity> copyEntities = Lists.newArrayList(); List<CopyableFile> copyableFiles = Lists.newArrayList(); + DatasetDescriptor targetDataset = new DatasetDescriptor(targetFs.getScheme(), targetPath.toString()); for (Path path : toCopy) { FileStatus file = filesInSource.get(path); Path filePathRelativeToSearchPath = PathUtils.relativizePath(file.getPath(), nonGlobSearchPath); Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath); - - copyableFiles.add(CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration) + CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration) .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString()) .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath, configuration)) - .build()); + .build(); + copyableFile.setDestDataset(targetDataset); + copyableFiles.add(copyableFile); } copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java index 3c7643b..cc7be1e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java @@ -56,6 +56,8 @@ import com.typesafe.config.Config; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.data.management.copy.CopyConfiguration; import org.apache.gobblin.data.management.copy.CopyEntity; @@ -760,4 +762,10 @@ public class HiveCopyEntityHelper { public FileSystem getTargetFileSystem() { return this.targetFs; } + + void setCopyableFileDestinationDataset(CopyableFile copyableFile) { + DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, this.getTargetDatabase() + "." 
+ this.getTargetTable()); + destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString()); + copyableFile.setDestDataset(destination); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java index 2af2f80..03dba25 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java @@ -57,10 +57,12 @@ import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.DbAndTable; import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset; import org.apache.gobblin.data.management.partition.FileSet; +import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.hive.HiveMetastoreClientPool; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.AutoReturnableObject; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; @@ -106,6 +108,8 @@ public class HiveDataset implements PrioritizedCopyableDataset { protected final DbAndTable dbAndTable; protected final DbAndTable logicalDbAndTable; + private transient final DatasetDescriptor datasetDescriptor; + public HiveDataset(FileSystem fs, HiveMetastoreClientPool clientPool, Table table, Properties properties) { this(fs, clientPool, table, 
properties, ConfigFactory.empty()); } @@ -124,6 +128,9 @@ public class HiveDataset implements PrioritizedCopyableDataset { Optional.fromNullable(this.table.getDataLocation()); this.tableIdentifier = this.table.getDbName() + "." + this.table.getTableName(); + this.datasetDescriptor = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, tableIdentifier); + this.datasetDescriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); + this.datasetNamePattern = Optional.fromNullable(ConfigUtils.getString(datasetConfig, DATASET_NAME_PATTERN_KEY, null)); this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName()); if (this.datasetNamePattern.isPresent()) { @@ -132,7 +139,6 @@ public class HiveDataset implements PrioritizedCopyableDataset { this.logicalDbAndTable = this.dbAndTable; } this.datasetConfig = resolveConfig(datasetConfig, dbAndTable, logicalDbAndTable); - this.metricContext = Instrumented.getMetricContext(new State(properties), HiveDataset.class, Lists.<Tag<?>> newArrayList(new Tag<>(DATABASE, table.getDbName()), new Tag<>(TABLE, table.getTableName()))); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java index a9982bf..790c0b4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java @@ -149,8 +149,11 @@ public class HivePartitionFileSet extends HiveFileSet { multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS); for 
(CopyableFile.Builder builder : hiveCopyEntityHelper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, hiveCopyEntityHelper.getConfiguration(), Optional.of(this.partition))) { - copyEntities.add(builder.fileSet(fileSet).checksum(new byte[0]) - .datasetOutputPath(desiredTargetLocation.location.toString()).build()); + CopyableFile fileEntity = + builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString()) + .build(); + this.hiveCopyEntityHelper.setCopyableFileDestinationDataset(fileEntity); + copyEntities.add(fileEntity); } log.info("Created {} copy entities for partition {}", copyEntities.size(), this.partition.getCompleteName()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java index a796a2b..4d82a62 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java @@ -120,7 +120,10 @@ public class UnpartitionedTableFileSet extends HiveFileSet { for (CopyableFile.Builder builder : this.helper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, this.helper.getConfiguration(), Optional.<Partition> absent())) { - copyEntities.add(builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build()); + CopyableFile fileEntity = + builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build(); + 
this.helper.setCopyableFileDestinationDataset(fileEntity); + copyEntities.add(fileEntity); } multiTimer.close(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index e443271..71ebd59 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy.publisher; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.metrics.event.sla.SlaEventKeys; import java.io.IOException; import java.net.URI; @@ -27,7 +28,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -214,6 +214,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) { fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath()); } + LineageInfo.putDestination(copyableFile.getDestDataset(), 0, wus); } if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) { datasetOriginTimestamp = copyableFile.getOriginTimestamp(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java 
---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java index 51a390d..5021d4d 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java @@ -17,6 +17,8 @@ package org.apache.gobblin.data.management.conversion.hive.dataset; import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Properties; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +38,9 @@ import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiv import org.apache.gobblin.hive.HiveMetastoreClientPool; import org.apache.gobblin.util.ConfigUtils; +import static org.mockito.Mockito.when; + + @Test(groups = { "gobblin.data.management.conversion" }) public class ConvertibleHiveDatasetTest { @@ -94,7 +99,8 @@ public class ConvertibleHiveDatasetTest { } @Test - public void testInvalidFormat() { + public void testInvalidFormat() + throws Exception { Config config = ConfigFactory.parseMap(ImmutableMap.<String, String>of("destinationFormats", "flattenedOrc,nestedOrc")); ConvertibleHiveDataset cd = createTestConvertibleDataset(config); @@ -103,7 +109,8 @@ public class ConvertibleHiveDatasetTest { } @Test - public void testDisableFormat() { + public void testDisableFormat() + throws Exception { Config config = ConfigFactory.parseMap(ImmutableMap.<String, String> builder() .put("destinationFormats", "flattenedOrc") @@ -154,10 +161,13 @@ public class ConvertibleHiveDatasetTest { Assert.assertEquals(conversionConfig.getHiveRuntimeProperties(), hiveProps); 
} - public static ConvertibleHiveDataset createTestConvertibleDataset(Config config) { + public static ConvertibleHiveDataset createTestConvertibleDataset(Config config) + throws URISyntaxException { Table table = getTestTable("db1", "tb1"); + FileSystem mockFs = Mockito.mock(FileSystem.class); + when(mockFs.getUri()).thenReturn(new URI("test")); ConvertibleHiveDataset cd = - new ConvertibleHiveDataset(Mockito.mock(FileSystem.class), Mockito.mock(HiveMetastoreClientPool.class), new org.apache.hadoop.hive.ql.metadata.Table( + new ConvertibleHiveDataset(mockFs, Mockito.mock(HiveMetastoreClientPool.class), new org.apache.hadoop.hive.ql.metadata.Table( table), new Properties(), config); return cd; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java index 34258ba..16ee2de 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java @@ -35,7 +35,10 @@ import lombok.Getter; * <p> * Instances of this class are immutable. Calling set* methods returns a copy of the calling instance. 
* </p> + * + * @deprecated Use {@link GobblinEventBuilder} */ +@Deprecated public class EventSubmitter { public static final String EVENT_TYPE = "eventType"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java index 89f83f5..d1ce681 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java @@ -17,47 +17,32 @@ package org.apache.gobblin.metrics.event; -import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.MetricContext; - -import com.google.common.collect.Maps; - -import lombok.Getter; /** - * A failure event builds a specific {@link GobblinTrackingEvent} whose metadata has - * {@value EventSubmitter#EVENT_TYPE} to be {@value #EVENT_TYPE} + * The builder builds builds a specific {@link GobblinTrackingEvent} whose metadata has + * {@value GobblinEventBuilder#EVENT_TYPE} to be {@value #FAILURE_EVENT_TYPE} * * <p> * Note: A {@link FailureEventBuilder} instance is not reusable */ -public class FailureEventBuilder { - private static final String EVENT_TYPE = "FailureEvent"; - private static final String EVENT_NAMESPACE = "gobblin.event"; +public class FailureEventBuilder extends GobblinEventBuilder { + private static final String FAILURE_EVENT_TYPE = "FailureEvent"; private static final 
String ROOT_CAUSE = "rootException"; - @Getter - private final String name; - @Getter - private final String namespace; - private final Map<String, String> metadata; - private Throwable rootCause; public FailureEventBuilder(String name) { - this(name, EVENT_NAMESPACE); + this(name, NAMESPACE); } public FailureEventBuilder(String name, String namespace) { - this.name = name; - this.namespace = namespace; - metadata = Maps.newHashMap(); - metadata.put(EventSubmitter.EVENT_TYPE, EVENT_TYPE); + super(name, namespace); + metadata.put(EVENT_TYPE, FAILURE_EVENT_TYPE); } /** @@ -68,42 +53,21 @@ public class FailureEventBuilder { } /** - * Add a metadata pair - */ - public void addMetadata(String key, String value) { - metadata.put(key, value); - } - - /** - * Add additional metadata - */ - public void addAdditionalMetadata(Map<String, String> additionalMetadata) { - metadata.putAll(additionalMetadata); - } - - /** * Build as {@link GobblinTrackingEvent} */ public GobblinTrackingEvent build() { if (rootCause != null) { metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause)); } - return new GobblinTrackingEvent(0L, EVENT_NAMESPACE, name, metadata); - } - - /** - * Submit the event - */ - public void submit(MetricContext context) { - context.submitEvent(build()); + return new GobblinTrackingEvent(0L, namespace, name, metadata); } /** - * Check if the given {@link GobblinTrackingEvent} is a failiure event + * Check if the given {@link GobblinTrackingEvent} is a failure event */ public static boolean isFailureEvent(GobblinTrackingEvent event) { - String eventType = event.getMetadata().get(EventSubmitter.EVENT_TYPE); - return StringUtils.isNotEmpty(eventType) && eventType.equals(EVENT_TYPE); + String eventType = event.getMetadata().get(EVENT_TYPE); + return StringUtils.isNotEmpty(eventType) && eventType.equals(FAILURE_EVENT_TYPE); } private static Throwable getRootCause(Throwable t) { 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java new file mode 100644 index 0000000..6b82342 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.event; + +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import lombok.Getter; + +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; + + +/** + * A general gobblin event builder which builds a {@link GobblinTrackingEvent} + * + * Note: a {@link GobblinEventBuilder} instance is not reusable + */ +public class GobblinEventBuilder { + public static final String NAMESPACE = "gobblin.event"; + public static final String EVENT_TYPE = "eventType"; + + @Getter + protected final String name; + @Getter + protected final String namespace; + protected final Map<String, String> metadata; + + public GobblinEventBuilder(String name) { + this(name, NAMESPACE); + } + + public GobblinEventBuilder(String name, String namespace) { + this.name = name; + this.namespace = namespace; + metadata = Maps.newHashMap(); + } + + public ImmutableMap<String, String> getMetadata() { + return new ImmutableMap.Builder<String, String>().putAll(metadata).build(); + } + + /** + * Add a metadata pair + */ + public void addMetadata(String key, String value) { + metadata.put(key, value); + } + + /** + * Add additional metadata + */ + public void addAdditionalMetadata(Map<String, String> additionalMetadata) { + metadata.putAll(additionalMetadata); + } + + /** + * Build as {@link GobblinTrackingEvent} + */ + public GobblinTrackingEvent build() { + return new GobblinTrackingEvent(0L, namespace, name, metadata); + } + /** + * Submit the event + */ + public void submit(MetricContext context) { + context.submitEvent(build()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java ---------------------------------------------------------------------- diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java new file mode 100644 index 0000000..f9030eb --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gobblin.metrics.event.lineage;

import java.util.Map;
import java.util.Objects;

import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.gson.Gson;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;


/**
 * The builder builds a specific {@link GobblinTrackingEvent} whose metadata has {@value GobblinEventBuilder#EVENT_TYPE}
 * to be {@value LineageEventBuilder#LINEAGE_EVENT_TYPE}
 *
 * <p>
 * Note: A {@link LineageEventBuilder} instance is not reusable: {@link #build()} folds the source
 * and destination descriptors into the shared metadata map, so it must be called at most once.
 * </p>
 */
@Slf4j
public final class LineageEventBuilder extends GobblinEventBuilder {
  // NOTE: the misspelling ("LIENAGE") is kept as-is: this constant is referenced by
  // LineageInfo and is also persisted as a key prefix in job state, so renaming it
  // would be a breaking change.
  static final String LIENAGE_EVENT_NAMESPACE = getKey(NAMESPACE, "lineage");
  static final String SOURCE = "source";
  static final String DESTINATION = "destination";
  static final String LINEAGE_EVENT_TYPE = "LineageEvent";

  private static final Gson GSON = new Gson();

  @Getter @Setter
  private DatasetDescriptor source;
  @Getter @Setter
  private DatasetDescriptor destination;

  public LineageEventBuilder(String name) {
    super(name, LIENAGE_EVENT_NAMESPACE);
    addMetadata(EVENT_TYPE, LINEAGE_EVENT_TYPE);
  }

  /**
   * Build as {@link GobblinTrackingEvent}, flattening {@link #source} and {@link #destination}
   * into the metadata map under "source." / "destination." key prefixes.
   *
   * <p>Both descriptors must be set before calling; a {@code null} source or destination
   * raises a {@link NullPointerException}.</p>
   */
  @Override
  public GobblinTrackingEvent build() {
    source.toDataMap().forEach((key, value) -> metadata.put(getKey(SOURCE, key), value));
    destination.toDataMap().forEach((key, value) -> metadata.put(getKey(DESTINATION, key), value));
    return new GobblinTrackingEvent(0L, namespace, name, metadata);
  }

  @Override
  public String toString() {
    return GSON.toJson(this);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    LineageEventBuilder event = (LineageEventBuilder) o;

    if (!namespace.equals(event.namespace) || !name.equals(event.name) || !metadata.equals(event.metadata)) {
      return false;
    }

    // Objects.equals is the null-safe equivalent of the ternary null checks
    return Objects.equals(source, event.source) && Objects.equals(destination, event.destination);
  }

  @Override
  public int hashCode() {
    int result = name.hashCode();
    result = 31 * result + namespace.hashCode();
    result = 31 * result + metadata.hashCode();
    result = 31 * result + (source != null ? source.hashCode() : 0);
    result = 31 * result + (destination != null ? destination.hashCode() : 0);
    return result;
  }

  /**
   * Check if the given {@link GobblinTrackingEvent} is a lineage event
   */
  public static boolean isLineageEvent(GobblinTrackingEvent event) {
    // Constant-first equals is null-safe; LINEAGE_EVENT_TYPE is non-empty, so this is
    // equivalent to the StringUtils.isNotEmpty(...) && equals(...) form
    return LINEAGE_EVENT_TYPE.equals(event.getMetadata().get(EVENT_TYPE));
  }

  /**
   * Create a {@link LineageEventBuilder} from a {@link GobblinTrackingEvent}. An inverse function
   * to {@link LineageEventBuilder#build()}
   */
  public static LineageEventBuilder fromEvent(GobblinTrackingEvent event) {
    Map<String, String> metadata = event.getMetadata();
    LineageEventBuilder lineageEvent = new LineageEventBuilder(event.getName());

    String sourcePrefix = getKey(SOURCE, "");
    Map<String, String> sourceDataMap = Maps.newHashMap();
    String destinationPrefix = getKey(DESTINATION, "");
    Map<String, String> destinationDataMap = Maps.newHashMap();

    // Partition the flattened metadata back into source / destination / plain metadata
    metadata.forEach((key, value) -> {
      if (key.startsWith(sourcePrefix)) {
        sourceDataMap.put(key.substring(sourcePrefix.length()), value);
      } else if (key.startsWith(destinationPrefix)) {
        destinationDataMap.put(key.substring(destinationPrefix.length()), value);
      } else {
        lineageEvent.addMetadata(key, value);
      }
    });

    lineageEvent.setSource(DatasetDescriptor.fromDataMap(sourceDataMap));
    lineageEvent.setDestination(DatasetDescriptor.fromDataMap(destinationDataMap));
    return lineageEvent;
  }

  /** Join key parts with "." */
  static String getKey(Object... parts) {
    return Joiner.on(".").join(parts);
  }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.event.lineage; + +/** + * A set of exceptions used by {@link LineageEventBuilder} when lineage information is serialized or deserialized. + */ +public class LineageException extends Exception { + public LineageException(String message) { + super(message); + } + public static class ConflictException extends LineageException { + public ConflictException(String branchId, LineageEventBuilder actual, LineageEventBuilder expect) { + super("Conflict LineageEvent: branchId=" + branchId + ", expected=" + expect.toString() + " actual=" + actual.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java new file mode 100644 index 0000000..dd6c8f2 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java @@ -0,0 +1,207 @@ 
package org.apache.gobblin.metrics.event.lineage;

import java.util.Collection;
import java.util.Map;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.Gson;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;


/**
 * The lineage coordinator in a Gobblin job with single source and multiple destinations
 *
 * <p>
 * In Gobblin, a work unit processes records from only one dataset. It writes output to one or more destinations,
 * depending on the number of branches configured in the job. One destination means an output as another dataset.
 * </p>
 *
 * <p>
 * Lineage info is jointly collected from the source, represented by {@link org.apache.gobblin.source.Source} or
 * {@link org.apache.gobblin.source.extractor.Extractor}, and destination,
 * represented by {@link org.apache.gobblin.writer.DataWriter} or {@link org.apache.gobblin.publisher.DataPublisher}
 * </p>
 *
 * <p>
 * The general flow is:
 * <ol>
 *   <li> source sets its {@link DatasetDescriptor} to each work unit </li>
 *   <li> destination puts its {@link DatasetDescriptor} to the work unit </li>
 *   <li> load and send all lineage events from all states </li>
 *   <li> purge lineage info from all states </li>
 * </ol>
 * </p>
 */
@Slf4j
public final class LineageInfo {
  public static final String BRANCH = "branch";

  private static final Gson GSON = new Gson();
  private static final String NAME_KEY = "name";

  private LineageInfo() {
    // Utility class; no instances
  }

  /**
   * Set source {@link DatasetDescriptor} of a lineage event
   *
   * <p>
   * Only the {@link org.apache.gobblin.source.Source} or its {@link org.apache.gobblin.source.extractor.Extractor}
   * is supposed to set the source for a work unit of a dataset
   * </p>
   *
   * @param state state about a {@link org.apache.gobblin.source.workunit.WorkUnit}
   */
  public static void setSource(DatasetDescriptor source, State state) {
    state.setProp(getKey(NAME_KEY), source.getName());
    state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(source));
  }

  /**
   * Put a {@link DatasetDescriptor} of a destination dataset to a state
   *
   * <p>
   * Only the {@link org.apache.gobblin.writer.DataWriter} or {@link org.apache.gobblin.publisher.DataPublisher}
   * is supposed to put the destination dataset information. Since different branches may concurrently put,
   * the method is implemented to be threadsafe
   * </p>
   */
  public static void putDestination(DatasetDescriptor destination, int branchId, State state) {
    if (!hasLineageInfo(state)) {
      log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination));
      return;
    }

    // Synchronize on the state itself, not on the name property value: locking on an
    // arbitrary String is fragile because interned/equal Strings may be shared by
    // unrelated states, and the lock object must be the same for all writers anyway.
    synchronized (state) {
      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(destination));
    }
  }

  /**
   * Load all lineage information from {@link State}s of a dataset
   *
   * <p>
   * For a dataset, the same branch across different {@link State}s must be the same, as
   * the same branch means the same destination
   * </p>
   *
   * @param states All states which belong to the same dataset
   * @return A collection of {@link LineageEventBuilder}s put in the state
   * @throws LineageException.ConflictException if two states have conflict lineage info
   */
  public static Collection<LineageEventBuilder> load(Collection<? extends State> states)
      throws LineageException {
    Preconditions.checkArgument(states != null && !states.isEmpty());
    final Map<String, LineageEventBuilder> resultEvents = Maps.newHashMap();
    for (State state : states) {
      Map<String, LineageEventBuilder> branchedEvents = load(state);
      for (Map.Entry<String, LineageEventBuilder> entry : branchedEvents.entrySet()) {
        String branch = entry.getKey();
        LineageEventBuilder event = entry.getValue();
        LineageEventBuilder resultEvent = resultEvents.get(branch);
        if (resultEvent == null) {
          resultEvents.put(branch, event);
        } else if (!resultEvent.equals(event)) {
          // Same branch must carry identical lineage across all states of the dataset
          throw new LineageException.ConflictException(branch, event, resultEvent);
        }
      }
    }
    return resultEvents.values();
  }

  /**
   * Load all lineage info from a {@link State}
   *
   * @return A map from branch to its lineage info. If there is no destination info, return an empty map
   */
  static Map<String, LineageEventBuilder> load(State state) {
    Map<String, LineageEventBuilder> events = Maps.newHashMap();
    if (!hasLineageInfo(state)) {
      // Without source info any branch entries are meaningless; avoid NPE on a null source
      return events;
    }

    String name = state.getProp(getKey(NAME_KEY));
    DatasetDescriptor source = GSON.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)), DatasetDescriptor.class);

    String branchedPrefix = getKey(BRANCH, "");
    for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
      String key = entry.getKey().toString();
      if (!key.startsWith(branchedPrefix)) {
        continue;
      }

      // Expected key layout after the prefix: "<branchId>.<field>"
      String[] parts = key.substring(branchedPrefix.length()).split("\\.");
      if (parts.length != 2) {
        // An assert would be silently skipped without -ea; fail loudly on a malformed key
        throw new RuntimeException("Malformed lineage key: " + key);
      }
      String branchId = parts[0];
      LineageEventBuilder event = events.get(branchId);
      if (event == null) {
        event = new LineageEventBuilder(name);
        // Copy the source so per-branch mutations don't leak across events
        event.setSource(new DatasetDescriptor(source));
        events.put(branchId, event);
      }
      switch (parts[1]) {
        case LineageEventBuilder.DESTINATION:
          DatasetDescriptor destination = GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class);
          destination.addMetadata(BRANCH, branchId);
          event.setDestination(destination);
          break;
        default:
          throw new RuntimeException("Unsupported lineage key: " + key);
      }
    }

    return events;
  }

  /**
   * Remove all lineage related properties from a state
   */
  public static void purgeLineageInfo(State state) {
    state.removePropsWithPrefix(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
  }

  /**
   * Check if the given state has lineage info
   */
  public static boolean hasLineageInfo(State state) {
    return state.contains(getKey(NAME_KEY));
  }

  /**
   * Get the full lineage event name from a state
   */
  public static String getFullEventName(State state) {
    return Joiner.on('.').join(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE, state.getProp(getKey(NAME_KEY)));
  }

  /**
   * Prefix all keys with {@link LineageEventBuilder#LIENAGE_EVENT_NAMESPACE}
   */
  private static String getKey(Object... objects) {
    Object[] args = new Object[objects.length + 1];
    args[0] = LineageEventBuilder.LIENAGE_EVENT_NAMESPACE;
    System.arraycopy(objects, 0, args, 1, objects.length);
    return LineageEventBuilder.getKey(args);
  }
}
