[GOBBLIN-395] Add lineage for copying config-based datasets. Closes #2269 from zxcware/c2.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/161bef09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/161bef09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/161bef09 Branch: refs/heads/0.12.0 Commit: 161bef09dd5cbbbb65f9f6965008c57b632fb075 Parents: c35f76e Author: zhchen <[email protected]> Authored: Tue Jan 30 17:26:03 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Jan 30 17:26:03 2018 -0800 ---------------------------------------------------------------------- .../data/management/copy/CopyableFile.java | 25 +++++++++ .../copy/RecursiveCopyableDataset.java | 21 +------- .../copy/replication/ConfigBasedDataset.java | 11 ++-- .../data/management/copy/CopyableFileTest.java | 53 +++++++++++++++++++- .../gobblin/metrics/reporter/EventReporter.java | 2 +- 5 files changed, 84 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index 843a7e3..d2547b4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy; import org.apache.gobblin.data.management.partition.File; import org.apache.gobblin.data.management.copy.PreserveAttributes.Option; +import org.apache.gobblin.dataset.DatasetConstants; import 
org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.guid.Guid; @@ -116,6 +117,30 @@ public class CopyableFile extends CopyEntity implements File { } /** + * Set file system based source and destination dataset for this {@link CopyableFile} + * + * @param originFs {@link FileSystem} where this {@link CopyableFile} origins + * @param targetFs {@link FileSystem} where this {@link CopyableFile} is copied to + */ + public void setFsDatasets(FileSystem originFs, FileSystem targetFs) { + /* + * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder + * if itself is not a folder + */ + boolean isDir = origin.isDirectory(); + + Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath()); + String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString(); + sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName); + sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString()); + + Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination); + String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString(); + destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName); + destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString()); + } + + /** * Get a {@link CopyableFile.Builder}. * * @param originFs {@link FileSystem} where original file exists. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 252dafa..2d1f740 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -20,9 +20,7 @@ package org.apache.gobblin.data.management.copy; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.data.management.copy.entities.PrePublishStep; import org.apache.gobblin.data.management.dataset.DatasetUtils; -import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.FileSystemDataset; -import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.commit.DeleteFileCommitStep; @@ -148,24 +146,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath, configuration)).build(); - - /* - * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder - * if itself is not a folder - */ - boolean isDir = file.isDirectory(); - - Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(file.getPath()); - String sourceDataset = isDir ? 
fullSourcePath.toString() : fullSourcePath.getParent().toString(); - DatasetDescriptor source = new DatasetDescriptor(this.fs.getScheme(), sourceDataset); - source.addMetadata(DatasetConstants.FS_URI, this.fs.getUri().toString()); - copyableFile.setSourceDataset(source); - - String destinationDataset = isDir ? thisTargetPath.toString() : thisTargetPath.getParent().toString(); - DatasetDescriptor destination = new DatasetDescriptor(targetFs.getScheme(), destinationDataset); - destination.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString()); - copyableFile.setDestinationDataset(destination); - + copyableFile.setFsDatasets(this.fs, targetFs); copyableFiles.add(copyableFile); } copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java index 293034b..cc893a6 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java @@ -187,12 +187,11 @@ public class ConfigBasedDataset implements CopyableDataset { if (copyToFileMap.containsKey(newPath)) { deletedPaths.add(newPath); } - - copyableFiles.add( - CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), - copyConfiguration) - .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()) - .build()); + CopyableFile copyableFile = 
CopyableFile + .fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration) + .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build(); + copyableFile.setFsDatasets(copyFromFs, copyToFs); + copyableFiles.add(copyableFile); } // clean up already checked paths http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java index 30ba0af..986efeb 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java @@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import java.util.Properties; @@ -36,8 +37,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.util.PathUtils; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + public class CopyableFileTest { @Test @@ -86,6 +92,51 @@ public class CopyableFileTest { } + @Test + public void testSetFsDatasets() throws URISyntaxException { + FileSystem originFs = mock(FileSystem.class); + String originFsUri = "hdfs://source.company.biz:2000"; + String originPath = "/data/databases/source/profile"; + when(originFs.getUri()).thenReturn(new URI(originFsUri)); + 
when(originFs.getScheme()).thenReturn("hdfs"); + + FileSystem targetFs = mock(FileSystem.class); + String targetFsUri = "file:///"; + String destinationPath = "/data/databases/destination/profile"; + when(targetFs.getUri()).thenReturn(new URI(targetFsUri)); + when(targetFs.getScheme()).thenReturn("file"); + + // Test when source file is not a directory + FileStatus origin = new FileStatus(0l, false, 0, 0l, 0l, new Path(originPath)); + CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null, + PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), ""); + copyableFile.setFsDatasets(originFs, targetFs); + DatasetDescriptor source = copyableFile.getSourceDataset(); + Assert.assertEquals(source.getName(), "/data/databases/source"); + Assert.assertEquals(source.getPlatform(), "hdfs"); + Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri); + DatasetDescriptor destination = copyableFile.getDestinationDataset(); + Assert.assertEquals(destination.getName(), "/data/databases/destination"); + Assert.assertEquals(destination.getPlatform(), "file"); + Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri); + + // Test when source file is a directory + originPath = originFsUri + originPath; + destinationPath = targetFsUri + destinationPath; + origin = new FileStatus(0l, true, 0, 0l, 0l, new Path(originPath)); + copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null, + PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), ""); + copyableFile.setFsDatasets(originFs, targetFs); + source = copyableFile.getSourceDataset(); + Assert.assertEquals(source.getName(), "/data/databases/source/profile"); + Assert.assertEquals(source.getPlatform(), "hdfs"); + Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri); + destination = copyableFile.getDestinationDataset(); + Assert.assertEquals(destination.getName(), 
"/data/databases/destination/profile"); + Assert.assertEquals(destination.getPlatform(), "file"); + Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri); + } + @Test public void testCopyableFileBuilderMinimumConfiguration() @@ -200,7 +251,7 @@ public class CopyableFileTest { FileStatus fileStatus = new FileStatus(1, false, 0, 0, 0, 0, FsPermission.getDefault(), "owner", "group", path); - FileSystem fs = Mockito.mock(FileSystem.class); + FileSystem fs = mock(FileSystem.class); Mockito.doReturn(fileStatus).when(fs).getFileStatus(path); Mockito.doReturn(path).when(fs).makeQualified(path); Mockito.doReturn(new URI("hdfs://uri")).when(fs).getUri(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java index 498ad58..a733d6a 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java @@ -125,7 +125,7 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab } try { if (!this.reportingQueue.offer(sanitizeEvent(event), 10, TimeUnit.SECONDS)) { - log.error("Enqueuing of event %s at reporter with class %s timed out. Sending of events is probably stuck.", + log.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.", event, this.getClass().getCanonicalName()); } } catch (InterruptedException ie) {
