Repository: incubator-gobblin Updated Branches: refs/heads/master 5eee52d93 -> 29d403aec
[GOBBLIN-576] Send partition level lineage in hive distcp Closes #2442 from zxcware/pd Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/29d403ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/29d403ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/29d403ae Branch: refs/heads/master Commit: 29d403aec044756ad711b4bce5a7395d01168439 Parents: 5eee52d Author: zhchen <[email protected]> Authored: Tue Sep 11 09:14:46 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Sep 11 09:14:46 2018 -0700 ---------------------------------------------------------------------- .../gobblin/data/management/copy/CopySource.java | 6 +++--- .../data/management/copy/CopyableFile.java | 15 +++++++++------ .../copy/hive/HiveCopyEntityHelper.java | 19 +++++++++---------- .../copy/hive/HivePartitionFileSet.java | 15 ++++++++++++++- .../copy/hive/UnpartitionedTableFileSet.java | 3 ++- .../copy/publisher/CopyDataPublisher.java | 2 +- .../data/management/copy/CopyableFileTest.java | 13 +++++++++---- 7 files changed, 47 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index 4cf0b64..9ae9b45 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -398,9 +398,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected * to be set by the same logic */ - if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null - && copyableFile.getDestinationDataset() != null) { - lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit); + if (lineageInfo.isPresent() && copyableFile.getSourceData() != null + && copyableFile.getDestinationData() != null) { + lineageInfo.get().setSource(copyableFile.getSourceData(), workUnit); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index d2547b4..9ad918c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -21,6 +21,7 @@ import org.apache.gobblin.data.management.partition.File; import org.apache.gobblin.data.management.copy.PreserveAttributes.Option; import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.guid.Guid; @@ -54,16 +55,16 @@ import com.google.common.collect.Lists; @EqualsAndHashCode(callSuper = true) public class CopyableFile extends CopyEntity implements File { /** - * The source dataset the file belongs to. For now, since it's only used before copying, set it to be + * The source data the file belongs to. For now, since it's only used before copying, set it to be * transient so that it won't be serialized, avoid unnecessary data transfer */ - private transient DatasetDescriptor sourceDataset; + private transient Descriptor sourceData; /** {@link FileStatus} of the existing origin file. */ private FileStatus origin; - /** The destination dataset the file will be copied to */ - private DatasetDescriptor destinationDataset; + /** The destination data the file will be copied to */ + private Descriptor destinationData; /** Complete destination {@link Path} of the file. */ private Path destination; @@ -131,13 +132,15 @@ public class CopyableFile extends CopyEntity implements File { Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath()); String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString(); - sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName); + DatasetDescriptor sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName); sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString()); + sourceData = sourceDataset; Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination); String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString(); - destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName); + DatasetDescriptor destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName); destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString()); + destinationData = destinationDataset; } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java index 2b1a142..9553959 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java @@ -765,18 +765,17 @@ public class HiveCopyEntityHelper { return this.targetFs; } - /** - * Set the source and destination datasets of a copyable file - */ - void setCopyableFileDatasets(CopyableFile copyableFile) { + DatasetDescriptor getSourceDataset() { String sourceTable = dataset.getTable().getDbName() + "." + dataset.getTable().getTableName(); - DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable); - source.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString()); - copyableFile.setSourceDataset(source); + DatasetDescriptor sourceDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable); + sourceDataset.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString()); + return sourceDataset; + } + DatasetDescriptor getDestinationDataset() { String destinationTable = this.getTargetDatabase() + "." + this.getTargetTable(); - DatasetDescriptor destination = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable); - destination.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString()); - copyableFile.setDestinationDataset(destination); + DatasetDescriptor destinationDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable); + destinationDataset.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString()); + return destinationDataset; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java index 2c3817e..93ae40f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java @@ -35,6 +35,8 @@ import org.apache.gobblin.data.management.copy.CopyEntity; import org.apache.gobblin.data.management.copy.CopyableFile; import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import org.apache.gobblin.data.management.copy.entities.PrePublishStep; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.hive.HiveRegisterStep; import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; import org.apache.gobblin.hive.spec.HiveSpec; @@ -153,7 +155,18 @@ public class HivePartitionFileSet extends HiveFileSet { CopyableFile fileEntity = builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString()) .build(); - this.hiveCopyEntityHelper.setCopyableFileDatasets(fileEntity); + + DatasetDescriptor sourceDataset = this.hiveCopyEntityHelper.getSourceDataset(); + PartitionDescriptor source = new PartitionDescriptor(partition.getName(), sourceDataset); + fileEntity.setSourceData(source); + + DatasetDescriptor destinationDataset = this.hiveCopyEntityHelper.getDestinationDataset(); + Partition destinationPartition = + this.existingTargetPartition.isPresent() ? this.existingTargetPartition.get() : partition; + PartitionDescriptor destination = + new PartitionDescriptor(destinationPartition.getName(), destinationDataset); + fileEntity.setDestinationData(destination); + copyEntities.add(fileEntity); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java index 756b4dd..89ebe7e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java @@ -123,7 +123,8 @@ public class UnpartitionedTableFileSet extends HiveFileSet { Optional.<Partition> absent())) { CopyableFile fileEntity = builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build(); - this.helper.setCopyableFileDatasets(fileEntity); + fileEntity.setSourceData(this.helper.getSourceDataset()); + fileEntity.setDestinationData(this.helper.getDestinationDataset()); copyEntities.add(fileEntity); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index 79b2b6a..ec7b1a0 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -226,7 +226,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath()); } if (lineageInfo.isPresent()) { - lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, wus); + lineageInfo.get().putDestination(copyableFile.getDestinationData(), 0, wus); } } if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29d403ae/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java index 986efeb..65425fa 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java @@ -38,6 +38,7 @@ import com.google.common.collect.Maps; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.util.PathUtils; import static org.mockito.Mockito.mock; @@ -56,6 +57,10 @@ public class CopyableFileTest { "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps .<String, String>newHashMap(), ""); + DatasetDescriptor dataset = new DatasetDescriptor("hive", "db.table"); + PartitionDescriptor descriptor = new PartitionDescriptor("datepartition=2018/09/05", dataset); + copyableFile.setDestinationData(descriptor); + String s = CopyEntity.serialize(copyableFile); CopyEntity de = CopyEntity.deserialize(s); @@ -111,11 +116,11 @@ public class CopyableFileTest { CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null, PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), ""); copyableFile.setFsDatasets(originFs, targetFs); - DatasetDescriptor source = copyableFile.getSourceDataset(); + DatasetDescriptor source = (DatasetDescriptor) copyableFile.getSourceData(); Assert.assertEquals(source.getName(), "/data/databases/source"); Assert.assertEquals(source.getPlatform(), "hdfs"); Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri); - DatasetDescriptor destination = copyableFile.getDestinationDataset(); + DatasetDescriptor destination = (DatasetDescriptor) copyableFile.getDestinationData(); Assert.assertEquals(destination.getName(), "/data/databases/destination"); Assert.assertEquals(destination.getPlatform(), "file"); Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri); @@ -127,11 +132,11 @@ public class CopyableFileTest { copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null, PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), ""); copyableFile.setFsDatasets(originFs, targetFs); - source = copyableFile.getSourceDataset(); + source = (DatasetDescriptor) copyableFile.getSourceData(); Assert.assertEquals(source.getName(), "/data/databases/source/profile"); Assert.assertEquals(source.getPlatform(), "hdfs"); Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri); - destination = copyableFile.getDestinationDataset(); + destination = (DatasetDescriptor) copyableFile.getDestinationData(); Assert.assertEquals(destination.getName(), "/data/databases/destination/profile"); Assert.assertEquals(destination.getPlatform(), "file"); Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
