Repository: incubator-gobblin Updated Branches: refs/heads/master 3a28721d8 -> 9fd80690d
[GOBBLIN-319] Add DatasetResolver to transform raw Gobblin dataset to application specific dataset Closes #2171 from zxcware/lineage Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9fd80690 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9fd80690 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9fd80690 Branch: refs/heads/master Commit: 9fd80690db540c373321a2d434b64d8706d80d65 Parents: 3a28721 Author: zhchen <[email protected]> Authored: Thu Nov 30 15:26:16 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Nov 30 15:26:29 2017 -0800 ---------------------------------------------------------------------- .../gobblin/dataset/DatasetConstants.java | 5 + .../apache/gobblin/dataset/DatasetResolver.java | 35 +++++++ .../gobblin/dataset/DatasetResolverFactory.java | 31 ++++++ .../gobblin/dataset/NoopDatasetResolver.java | 35 +++++++ .../gobblin/publisher/BaseDataPublisher.java | 28 ++++-- .../publisher/BaseDataPublisherTest.java | 52 ++++++++++ .../data/management/copy/CopySource.java | 17 +++- .../management/copy/CopyableDatasetBase.java | 7 -- .../data/management/copy/CopyableFile.java | 7 +- .../copy/RecursiveCopyableDataset.java | 27 ++++-- .../copy/hive/HiveCopyEntityHelper.java | 15 ++- .../data/management/copy/hive/HiveDataset.java | 6 -- .../copy/hive/HivePartitionFileSet.java | 2 +- .../copy/hive/UnpartitionedTableFileSet.java | 2 +- .../copy/publisher/CopyDataPublisher.java | 2 +- .../metrics/event/lineage/LineageException.java | 32 ------- .../metrics/event/lineage/LineageInfo.java | 73 +++++++++------ .../metrics/event/lineage/LineageEventTest.java | 99 +++++++++++--------- .../gobblin/runtime/SafeDatasetCommit.java | 17 ++-- 19 files changed, 335 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java index 73999dc..35bb09e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java @@ -19,10 +19,15 @@ package org.apache.gobblin.dataset; public class DatasetConstants { /** Platforms */ + public static final String PLATFORM_FILE = "file"; + public static final String PLATFORM_HDFS = "hdfs"; public static final String PLATFORM_KAFKA = "kafka"; public static final String PLATFORM_HIVE = "hive"; public static final String PLATFORM_MYSQL = "mysql"; + /** Common metadata */ + public static final String BRANCH = "branch"; + /** File system metadata */ public static final String FS_URI = "fsUri"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java new file mode 100644 index 0000000..0e28169 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import org.apache.gobblin.configuration.State; + + +/** + * A {@link DatasetResolver} resolves job specific dataset + */ +public interface DatasetResolver { + /** + * Given raw Gobblin dataset, resolve job specific dataset + * + * @param raw a dataset in terms of Gobblin + * @param state configuration that helps resolve job specific dataset + * @return resolved dataset for the job + */ + DatasetDescriptor resolve(DatasetDescriptor raw, State state); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java new file mode 100644 index 0000000..eb1b887 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import org.apache.gobblin.configuration.State; + + +/** + * A factory that creates an instance of {@link DatasetResolver} + */ +public interface DatasetResolverFactory { + String NAMESPACE = "DatasetResolverFactory"; + String CLASS = NAMESPACE + "." + "class"; + + DatasetResolver createResolver(State state); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java new file mode 100644 index 0000000..c54011a --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import org.apache.gobblin.configuration.State; + + +/** + * The default {@link DatasetResolver} that directly uses Gobblin raw dataset as job dataset + */ +public class NoopDatasetResolver implements DatasetResolver { + public static final NoopDatasetResolver INSTANCE = new NoopDatasetResolver(); + + private NoopDatasetResolver() {} + + @Override + public DatasetDescriptor resolve(DatasetDescriptor raw, State state) { + return raw; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java index 0097c15..89bab2b 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java @@ -61,9 +61,11 @@ import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.metadata.MetadataMerger; import org.apache.gobblin.metadata.types.StaticStringMetadataMerger; import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.ParallelRunner; +import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.WriterUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.writer.FsDataWriter; @@ -280,6 +282,20 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { } } + private void addLineageInfo(WorkUnitState state, int branchId) { + DatasetDescriptor destination = createDestinationDescriptor(state, branchId); + LineageInfo.putDestination(destination, branchId, state); + } + + protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { + Path publisherOutputDir = getPublisherOutputDir(state, branchId); + FileSystem fs = this.publisherFileSystemByBranches.get(branchId); + DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString()); + destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); + destination.addMetadata(DatasetConstants.BRANCH, String.valueOf(branchId)); + return destination; + } + @Override public void publishData(WorkUnitState state) throws IOException { @@ -296,6 +312,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { private void publishSingleTaskData(WorkUnitState state, int branchId) throws IOException { publishData(state, branchId, true, new HashSet<Path>()); + addLineageInfo(state, branchId); } @Override @@ -329,16 +346,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { private void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved) throws IOException { publishData(state, branchId, false, writerOutputPathsMoved); - DatasetDescriptor destination = createDestinationDescriptor(state, branchId); - LineageInfo.putDestination(destination, branchId, state); - } - - protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { - Path publisherOutputDir = getPublisherOutputDir(state, branchId); - FileSystem fs = this.publisherFileSystemByBranches.get(branchId); - DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString()); - destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); - return destination; + addLineageInfo(state, branchId); } protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java index 09bc0c8..159786b 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java @@ -28,10 +28,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.testng.Assert; import org.testng.annotations.Test; @@ -41,8 +44,10 @@ import com.google.common.io.Files; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.metadata.MetadataMerger; import org.apache.gobblin.metadata.types.GlobalMetadata; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.writer.FsDataWriter; import org.apache.gobblin.writer.FsWriterMetrics; @@ -519,6 +524,34 @@ public class BaseDataPublisherTest { } } + @Test + public void testPublishSingleTask() + throws IOException { + WorkUnitState state = buildTaskState(1); + DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic"); + LineageInfo.setSource(source, state); + BaseDataPublisher publisher = new BaseDataPublisher(state); + publisher.publishData(state); + Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination")); + Assert.assertFalse(state.contains("gobblin.event.lineage.branch.1.destination")); + } + + @Test + public void testPublishMultiTasks() + throws IOException { + WorkUnitState state1 = buildTaskState(2); + WorkUnitState state2 = buildTaskState(2); + DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic"); + LineageInfo.setSource(source, state1); + LineageInfo.setSource(source, state2); + BaseDataPublisher publisher = new BaseDataPublisher(state1); + publisher.publishData(ImmutableList.of(state1, state2)); + Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.0.destination")); + Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.1.destination")); + Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.0.destination")); + Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.1.destination")); + } + public static class TestAdditionMerger implements MetadataMerger<String> { private int sum = 0; @@ -588,4 +621,23 @@ public class BaseDataPublisherTest { return state; } + + private WorkUnitState buildTaskState(int numBranches) { + WorkUnitState state = new WorkUnitState(); + + state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace"); + state.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table"); + state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "namespace_table"); + state.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, numBranches); + state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/data/output"); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "/data/working"); + if (numBranches > 1) { + for (int i = 0; i < numBranches; i++) { + state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + i, "/data/output" + "/branch" + i); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR + "." + i, "/data/working" + "/branch" + i); + } + } + + return state; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index 8ca05e3..a74d425 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -65,7 +65,6 @@ import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.event.EventSubmitter; -import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.metrics.event.sla.SlaEventKeys; import org.apache.gobblin.source.extractor.Extractor; @@ -300,7 +299,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity); computeAndSetWorkUnitGuid(workUnit); workUnitsForPartition.add(workUnit); - addLineageInfo(copyEntity, copyableDataset, workUnit); + addLineageInfo(copyEntity, workUnit); } this.workUnitList.putAll(this.fileSet, workUnitsForPartition); @@ -313,9 +312,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { } } - private void addLineageInfo(CopyEntity copyEntity, CopyableDatasetBase copyableDataset, WorkUnit workUnit) { - if (copyEntity instanceof CopyableFile && copyableDataset.getDatasetDescriptor() != null) { - LineageInfo.setSource(copyableDataset.getDatasetDescriptor(), workUnit); + private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) { + if (copyEntity instanceof CopyableFile) { + CopyableFile copyableFile = (CopyableFile) copyEntity; + /* + * In Gobblin Distcp, the source and target path info of a CopyableFile are determined by its dataset found by + * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected + * to be set by the same logic + */ + if (copyableFile.getSourceDataset() != null && copyableFile.getDestinationDataset() != null) { + LineageInfo.setSource(copyableFile.getSourceDataset(), workUnit); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java index 6c71edc..c27b839 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java @@ -18,7 +18,6 @@ package org.apache.gobblin.data.management.copy; import org.apache.gobblin.dataset.Dataset; -import org.apache.gobblin.dataset.DatasetDescriptor; /** @@ -26,10 +25,4 @@ import org.apache.gobblin.dataset.DatasetDescriptor; * Concrete classes must implement a subinterface of this interface ({@link CopyableDataset} or {@link IterableCopyableDataset}). */ public interface CopyableDatasetBase extends Dataset { - /** - * Get the descriptor which identifies and provides metadata of the dataset - */ - default DatasetDescriptor getDatasetDescriptor() { - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index 04e5e34..843a7e3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -52,12 +52,17 @@ import com.google.common.collect.Lists; @NoArgsConstructor(access = AccessLevel.PROTECTED) @EqualsAndHashCode(callSuper = true) public class CopyableFile extends CopyEntity implements File { + /** + * The source dataset the file belongs to. For now, since it's only used before copying, set it to be + * transient so that it won't be serialized, avoid unnecessary data transfer + */ + private transient DatasetDescriptor sourceDataset; /** {@link FileStatus} of the existing origin file. */ private FileStatus origin; /** The destination dataset the file will be copied to */ - private DatasetDescriptor destDataset; + private DatasetDescriptor destinationDataset; /** Complete destination {@link Path} of the file. */ private Path destination; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 35108df..138debe 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -20,6 +20,7 @@ package org.apache.gobblin.data.management.copy; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.data.management.copy.entities.PrePublishStep; import org.apache.gobblin.data.management.dataset.DatasetUtils; +import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.PathUtils; @@ -43,8 +44,6 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import lombok.Getter; - /** * Implementation of {@link CopyableDataset} that creates a {@link CopyableFile} for every file that is a descendant if @@ -69,9 +68,6 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData private final boolean update; private final boolean delete; - @Getter - private transient final DatasetDescriptor datasetDescriptor; - // Include empty directories in the source for copy private final boolean includeEmptyDirectories; // Delete empty directories in the destination @@ -83,7 +79,6 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath); this.fs = fs; - this.datasetDescriptor = new DatasetDescriptor(fs.getScheme(), rootPath.toString()); this.pathFilter = DatasetUtils.instantiatePathFilter(properties); this.copyableFileFilter = DatasetUtils.instantiateCopyableFileFilter(properties); @@ -136,7 +131,6 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData List<CopyEntity> copyEntities = Lists.newArrayList(); List<CopyableFile> copyableFiles = Lists.newArrayList(); - DatasetDescriptor targetDataset = new DatasetDescriptor(targetFs.getScheme(), targetPath.toString()); for (Path path : toCopy) { FileStatus file = filesInSource.get(path); @@ -147,7 +141,24 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath, configuration)) .build(); - copyableFile.setDestDataset(targetDataset); + + /* + * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder + * if itself is not a folder + */ + boolean isDir = file.isDirectory(); + + Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(file.getPath()); + String sourceDataset = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString(); + DatasetDescriptor source = new DatasetDescriptor(this.fs.getScheme(), sourceDataset); + source.addMetadata(DatasetConstants.FS_URI, this.fs.getUri().toString()); + copyableFile.setSourceDataset(source); + + String destinationDataset = isDir ? thisTargetPath.toString() : thisTargetPath.getParent().toString(); + DatasetDescriptor destination = new DatasetDescriptor(targetFs.getScheme(), destinationDataset); + destination.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString()); + copyableFile.setDestinationDataset(destination); + copyableFiles.add(copyableFile); } copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java index cc7be1e..2580775 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java @@ -763,9 +763,18 @@ public class HiveCopyEntityHelper { return this.targetFs; } - void setCopyableFileDestinationDataset(CopyableFile copyableFile) { - DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, this.getTargetDatabase() + "." + this.getTargetTable()); + /** + * Set the source and destination datasets of a copyable file + */ + void setCopyableFileDatasets(CopyableFile copyableFile) { + String sourceTable = dataset.getTable().getDbName() + "." + dataset.getTable().getTableName(); + DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable); + source.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString()); + copyableFile.setSourceDataset(source); + + String destinationTable = this.getTargetDatabase() + "." + this.getTargetTable(); + DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable); destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString()); - copyableFile.setDestDataset(destination); + copyableFile.setDestinationDataset(destination); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java index 03dba25..26c7d7e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java @@ -57,12 +57,10 @@ import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder.DbAndTable; import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset; import org.apache.gobblin.data.management.partition.FileSet; -import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.hive.HiveMetastoreClientPool; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.AutoReturnableObject; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; @@ -108,8 +106,6 @@ public class HiveDataset implements PrioritizedCopyableDataset { protected final DbAndTable dbAndTable; protected final DbAndTable logicalDbAndTable; - private transient final DatasetDescriptor datasetDescriptor; - public HiveDataset(FileSystem fs, HiveMetastoreClientPool clientPool, Table table, Properties properties) { this(fs, clientPool, table, properties, ConfigFactory.empty()); } @@ -128,8 +124,6 @@ public class HiveDataset implements PrioritizedCopyableDataset { Optional.fromNullable(this.table.getDataLocation()); this.tableIdentifier = this.table.getDbName() + "." + this.table.getTableName(); - this.datasetDescriptor = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, tableIdentifier); - this.datasetDescriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); this.datasetNamePattern = Optional.fromNullable(ConfigUtils.getString(datasetConfig, DATASET_NAME_PATTERN_KEY, null)); this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java index 790c0b4..34b6933 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java @@ -152,7 +152,7 @@ public class HivePartitionFileSet extends HiveFileSet { CopyableFile fileEntity = builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString()) .build(); - this.hiveCopyEntityHelper.setCopyableFileDestinationDataset(fileEntity); + this.hiveCopyEntityHelper.setCopyableFileDatasets(fileEntity); copyEntities.add(fileEntity); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java index 4d82a62..21813fb 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java @@ -122,7 +122,7 @@ public class UnpartitionedTableFileSet extends HiveFileSet { Optional.<Partition> absent())) { CopyableFile fileEntity = builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build(); - this.helper.setCopyableFileDestinationDataset(fileEntity); + this.helper.setCopyableFileDatasets(fileEntity); copyEntities.add(fileEntity); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index 71ebd59..e1ccf65 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -214,7 +214,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) { fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath()); } - LineageInfo.putDestination(copyableFile.getDestDataset(), 0, wus); + LineageInfo.putDestination(copyableFile.getDestinationDataset(), 0, wus); } if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) { datasetOriginTimestamp = copyableFile.getOriginTimestamp(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java deleted file mode 100644 index e7528b3..0000000 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.metrics.event.lineage; - -/** - * A set of exceptions used by {@link LineageEventBuilder} when lineage information is serialized or deserialized. - */ -public class LineageException extends Exception { - public LineageException(String message) { - super(message); - } - public static class ConflictException extends LineageException { - public ConflictException(String branchId, LineageEventBuilder actual, LineageEventBuilder expect) { - super("Conflict LineageEvent: branchId=" + branchId + ", expected=" + expect.toString() + " actual=" + actual.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java index 80b9dec..9a3cc11 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java @@ -19,16 +19,21 @@ package org.apache.gobblin.metrics.event.lineage; import java.util.Collection; import java.util.Map; +import java.util.Set; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.DatasetResolver; +import org.apache.gobblin.dataset.DatasetResolverFactory; +import org.apache.gobblin.dataset.NoopDatasetResolver; /** @@ -57,8 +62,7 @@ import org.apache.gobblin.dataset.DatasetDescriptor; */ @Slf4j public final class LineageInfo { - public static final String BRANCH = "branch"; - + private static final String BRANCH = "branch"; private static final Gson GSON = new Gson(); private static final String NAME_KEY = "name"; @@ -77,8 +81,14 @@ public final class LineageInfo { * */ public static void setSource(DatasetDescriptor source, State state) { - state.setProp(getKey(NAME_KEY), source.getName()); - state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(source)); + DatasetResolver resolver = getResolver(state); + DatasetDescriptor descriptor = resolver.resolve(source, state); + if (descriptor == null) { + return; + } + + state.setProp(getKey(NAME_KEY), descriptor.getName()); + state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(descriptor)); } /** @@ -95,42 +105,32 @@ public final class LineageInfo { log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination)); return; } - + log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId)); synchronized (state.getProp(getKey(NAME_KEY))) { - state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(destination)); + DatasetResolver resolver = getResolver(state); + DatasetDescriptor descriptor = resolver.resolve(destination, state); + if (descriptor == null) { + return; + } + + state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(descriptor)); } } /** * Load all lineage information from {@link State}s of a dataset * - * <p> - * For a dataset, the same branch across different {@link State}s must be the same, as - * the same branch means the same destination - * </p> - * * @param states All states which belong to the same dataset * @return A collection of {@link LineageEventBuilder}s put in the state - * @throws LineageException.ConflictException if two states have conflict lineage info */ - public static Collection<LineageEventBuilder> load(Collection<? extends State> states) - throws LineageException { + public static Collection<LineageEventBuilder> load(Collection<? extends State> states) { Preconditions.checkArgument(states != null && !states.isEmpty()); - final Map<String, LineageEventBuilder> resultEvents = Maps.newHashMap(); + Set<LineageEventBuilder> allEvents = Sets.newHashSet(); for (State state : states) { Map<String, LineageEventBuilder> branchedEvents = load(state); - for (Map.Entry<String, LineageEventBuilder> entry : branchedEvents.entrySet()) { - String branch = entry.getKey(); - LineageEventBuilder event = entry.getValue(); - LineageEventBuilder resultEvent = resultEvents.get(branch); - if (resultEvent == null) { - resultEvents.put(branch, event); - } else if (!resultEvent.equals(event)) { - throw new LineageException.ConflictException(branch, event, resultEvent); - } - } + allEvents.addAll(branchedEvents.values()); } - return resultEvents.values(); + return allEvents; } /** @@ -162,7 +162,6 @@ public final class LineageInfo { switch (parts[1]) { case LineageEventBuilder.DESTINATION: DatasetDescriptor destination = GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class); - destination.addMetadata(BRANCH, branchId); event.setDestination(destination); break; default: @@ -195,6 +194,26 @@ public final class LineageInfo { } /** + * Get the configured {@link DatasetResolver} from {@link State} + */ + public static DatasetResolver getResolver(State state) { + String resolverFactory = state.getProp(DatasetResolverFactory.CLASS); + if (resolverFactory == null) { + return NoopDatasetResolver.INSTANCE; + } + + DatasetResolver resolver = NoopDatasetResolver.INSTANCE; + try { + DatasetResolverFactory factory = (DatasetResolverFactory) Class.forName(resolverFactory).newInstance(); + resolver = factory.createResolver(state); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + log.error(String.format("Fail to create a DatasetResolver with factory class %s", resolverFactory)); + } + + return resolver; + } + + /** * Prefix all keys with {@link LineageEventBuilder#LIENAGE_EVENT_NAMESPACE} */ private static String getKey(Object... objects) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java index 4e711b9..7388de6 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.metrics.event.GobblinEventBuilder; @@ -31,21 +32,31 @@ import org.testng.annotations.Test; import com.google.common.collect.Lists; +/** + * Test for loading linage events from state + */ public class LineageEventTest { @Test public void testEvent() { final String topic = "testTopic"; + final String kafka = "kafka"; + final String hdfs = "hdfs"; + final String mysql = "mysql"; + final String branch = "branch"; + State state0 = new State(); - DatasetDescriptor source = new DatasetDescriptor("kafka", topic); + DatasetDescriptor source = new DatasetDescriptor(kafka, topic); LineageInfo.setSource(source, state0); - DatasetDescriptor destination0 = new DatasetDescriptor("hdfs", "/data/dbchanges"); - LineageInfo.putDestination(destination0, 0, state0); - DatasetDescriptor destination1 = new DatasetDescriptor("mysql", "kafka.testTopic"); - LineageInfo.putDestination(destination1, 1, state0); + DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, "/data/dbchanges"); + destination00.addMetadata(branch, "0"); + LineageInfo.putDestination(destination00, 0, state0); + DatasetDescriptor destination01 = new DatasetDescriptor(mysql, "kafka.testTopic"); + destination01.addMetadata(branch, "1"); + LineageInfo.putDestination(destination01, 1, state0); Map<String, LineageEventBuilder> events = LineageInfo.load(state0); - verify(events.get("0"), topic, source, destination0, 0); - verify(events.get("1"), topic, source, destination1, 1); + verify(events.get("0"), topic, source, destination00); + verify(events.get("1"), topic, source, destination01); State state1 = new State(); LineageInfo.setSource(source, state1); @@ -54,60 +65,56 @@ public class LineageEventTest { states.add(state1); // Test only full fledged lineage events are loaded - try { - Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); - Assert.assertTrue(eventsList.size() == 2); - Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0")); - Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1")); - } catch (LineageException e) { - Assert.fail("Unexpected exception"); - } + Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); + Assert.assertTrue(eventsList.size() == 2); + Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); + Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1")); // There are 3 full fledged lineage events - DatasetDescriptor destination2 = new DatasetDescriptor("mysql", "kafka.testTopic2"); - LineageInfo.putDestination(destination2, 2, state1); - try { - Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); - Assert.assertTrue(eventsList.size() == 3); - Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0")); - Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1")); - verify(getLineageEvent(eventsList, 2), topic, source, destination2, 2); - } catch (LineageException e) { - Assert.fail("Unexpected exception"); + DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2"); + destination12.addMetadata(branch, "2"); + LineageInfo.putDestination(destination12, 2, state1); + eventsList = LineageInfo.load(states); + Assert.assertTrue(eventsList.size() == 3); + Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); + Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1")); + verify(getLineageEvent(eventsList, 2, mysql), topic, source, destination12); + + + // There 5 lineage events put, but only 4 unique lineage events + DatasetDescriptor destination10 = destination12; + LineageInfo.putDestination(destination10, 0, state1); + DatasetDescriptor destination11 = new DatasetDescriptor("hive", "kafka.testTopic1"); + destination11.addMetadata(branch, "1"); + LineageInfo.putDestination(destination11, 1, state1); + eventsList = LineageInfo.load(states); + Assert.assertTrue(eventsList.size() == 4); + Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); + Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1")); + // Either branch 0 or 2 of state 1 is selected + LineageEventBuilder event12 = getLineageEvent(eventsList, 0, mysql); + if (event12 == null) { + event12 = getLineageEvent(eventsList, 2, mysql); } - - // Throw conflict exception when there is a conflict on a branch between 2 states - LineageInfo.putDestination(destination2, 0, state1); - boolean hasLineageException = false; - try { - Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); - } catch (LineageException e) { - Assert.assertTrue(e instanceof LineageException.ConflictException); - hasLineageException = true; - } - Assert.assertTrue(hasLineageException); + verify(event12, topic, source, destination12); + verify(getLineageEvent(eventsList, 1, "hive"), topic, source, destination11); } - private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId) { + private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId, String destinationPlatform) { for (LineageEventBuilder event : events) { - if (event.getDestination().getMetadata().get(LineageInfo.BRANCH).equals(String.valueOf(branchId))) { + if (event.getDestination().getPlatform().equals(destinationPlatform) && + event.getDestination().getMetadata().get(DatasetConstants.BRANCH).equals(String.valueOf(branchId))) { return event; } } return null; } - private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination, int branchId) { + private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination) { Assert.assertEquals(event.getName(), name); Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE); Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE); Assert.assertTrue(event.getSource().equals(source)); - - DatasetDescriptor updatedDestination = new DatasetDescriptor(destination); - updatedDestination.addMetadata(LineageInfo.BRANCH, String.valueOf(branchId)); - Assert.assertTrue(event.getDestination().equals(updatedDestination)); - - // It only has eventType info - Assert.assertTrue(event.getMetadata().size() == 1); + Assert.assertTrue(event.getDestination().equals(destination)); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9fd80690/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 7ff9bb1..43e5c59 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -38,7 +38,6 @@ import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.event.lineage.LineageException; import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.FailureEventBuilder; @@ -210,20 +209,19 @@ final class SafeDatasetCommit implements Callable<Void> { } } if (states.size() == 0) { + log.info("Will not submit lineage events as no state contains lineage info"); return; } try { if (StringUtils.isEmpty(datasetUrn)) { // This dataset may contain different kinds of LineageEvent - for (Collection<TaskState> collection : aggregateByLineageEvent(states)) { - submitLineageEvent(collection); + for (Map.Entry<String, Collection<TaskState>> entry : aggregateByLineageEvent(states).entrySet()) { + submitLineageEvent(entry.getKey(), entry.getValue()); } } else { - submitLineageEvent(states); + submitLineageEvent(datasetUrn, states); } - } catch (LineageException e) { - log.error("Lineage event submission failed due to :" + e.toString()); } finally { // Purge lineage info from all states for (TaskState taskState : allStates) { @@ -232,10 +230,11 @@ final class SafeDatasetCommit implements Callable<Void> { } } - private void submitLineageEvent(Collection<TaskState> states) throws LineageException { + private void submitLineageEvent(String dataset, Collection<TaskState> states) { Collection<LineageEventBuilder> events = LineageInfo.load(states); // Send events events.forEach(event -> event.submit(metricContext)); + log.info(String.format("Submitted %d lineage events for dataset %s", events.size(), dataset)); } /** @@ -425,7 +424,7 @@ final class SafeDatasetCommit implements Callable<Void> { .withDatasetState(datasetState).build()); } - private static Collection<Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) { + private static Map<String, Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) { Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap(); for (TaskState state : states) { String eventName = LineageInfo.getFullEventName(state); @@ -433,6 +432,6 @@ final class SafeDatasetCommit implements Callable<Void> { statesForEvent.add(state); } - return statesByEvents.values(); + return statesByEvents; } }
