This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f2283c0  [GOBBLIN-1339] Add cluster name to dataset descriptor
f2283c0 is described below

commit f2283c0f8fbff0a5305aee54884359b2cba5ac65
Author: aprokofiev <[email protected]>
AuthorDate: Tue Jan 12 12:20:19 2021 -0800

    [GOBBLIN-1339] Add cluster name to dataset descriptor

    We use dataset descriptors to track lineage. Previously, a descriptor
    included only the platform name (hive, hdfs) and the path of the
    dataset. As a result, we could not differentiate data copies between
    multiple production clusters, because the dataset descriptors were
    identical for all of them. We add an optional cluster name to address
    that.
    
    This change will be used by a data copy audit system.
    
    Hive and file-based copy code is updated to include cluster names.
    
    Use the full storage system URL instead of just a well-known name.
    
    Closes #3178 from aplex/dataset-cluster
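
    As a rough sketch of the API change described above (the URIs and path
    are illustrative, not from the commit; assumes java.net.URI and
    org.apache.gobblin.dataset.DatasetDescriptor are imported):

        // Before: platform + name only -- identical on every cluster
        DatasetDescriptor before = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent");

        // After: an optional storage URL identifies the cluster holding this copy
        DatasetDescriptor clusterA =
            new DatasetDescriptor("hdfs", URI.create("hdfs://cluster-a.corp.com"), "/data/tracking/PageViewEvent");
        DatasetDescriptor clusterB =
            new DatasetDescriptor("hdfs", URI.create("hdfs://cluster-b.corp.com"), "/data/tracking/PageViewEvent");
        // clusterA.equals(clusterB) is false: equals() and hashCode() now cover the storage URL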
---
 .../apache/gobblin/dataset/DatasetDescriptor.java  |  87 ++++++++++------
 .../org/apache/gobblin/dataset/DescriptorTest.java |  25 +++++
 .../gobblin/publisher/BaseDataPublisher.java       |  43 ++++----
 .../gobblin/source/PartitionedFileSourceBase.java  |  25 +++--
 .../extractor/filebased/FileBasedSource.java       |  26 +++--
 .../org/apache/gobblin/writer/FsDataWriter.java    |  29 +++---
 .../gobblin/publisher/BaseDataPublisherTest.java   |  22 ++---
 .../extractor/filebased/FileBasedSourceTest.java   |  18 ++--
 .../gobblin/data/management/copy/CopyableFile.java |  29 +++---
 .../management/copy/hive/HiveCopyEntityHelper.java | 110 ++++++++++++---------
 .../management/copy/hive/HivePartitionFileSet.java |  24 ++---
 .../copy/hive/HiveCopyEntityHelperTest.java        |  26 +++--
 .../source/extractor/extract/jdbc/MysqlSource.java |  23 +++--
 13 files changed, 267 insertions(+), 220 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
index d2c260b..2470226 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
@@ -17,11 +17,12 @@
 
 package org.apache.gobblin.dataset;
 
-import java.util.Map;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
 import lombok.Getter;
 
 
@@ -31,6 +32,7 @@ import lombok.Getter;
 public class DatasetDescriptor extends Descriptor {
   private static final String PLATFORM_KEY = "platform";
   private static final String NAME_KEY = "name";
+  private static final String STORAGE_URL = "storageUrl";
 
   /**
   * which platform the dataset is stored, for example: local, hdfs, oracle, mysql, kafka
@@ -39,12 +41,35 @@ public class DatasetDescriptor extends Descriptor {
   private final String platform;
 
   /**
+   * URL of system that stores the dataset. It does not include the dataset name.
+   *
+   * Examples: hdfs://storage.corp.com, https://api.service.test, mysql://mysql-db.test:3306, thrift://hive-server:4567
+   *
+   * Using URI instead of URL class to allow for schemas that are not known by URL class
+   * (https://stackoverflow.com/q/2406518/258737)
+   */
+  @Getter
+  @Nullable
+  private final URI storageUrl;
+
+  /**
    * metadata about the dataset
    */
   private final Map<String, String> metadata = Maps.newHashMap();
 
+  /**
+   * @deprecated use {@link #DatasetDescriptor(String, URI, String)} to provide storage system url.
+   */
+  @Deprecated
   public DatasetDescriptor(String platform, String name) {
     super(name);
+    this.storageUrl = null;
+    this.platform = platform;
+  }
+
+  public DatasetDescriptor(String platform, URI storageUrl, String name) {
+    super(name);
+    this.storageUrl = storageUrl;
     this.platform = platform;
   }
 
@@ -55,13 +80,35 @@ public class DatasetDescriptor extends Descriptor {
   public DatasetDescriptor(DatasetDescriptor copy) {
     super(copy.getName());
     platform = copy.getPlatform();
+    storageUrl = copy.getStorageUrl();
     metadata.putAll(copy.getMetadata());
   }
 
+  /**
+   * Deserialize a {@link DatasetDescriptor} from a string map
+   *
+   * @deprecated use {@link Descriptor#deserialize(String)}
+   */
+  @Deprecated
+  public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
+    String storageUrlString = dataMap.getOrDefault(STORAGE_URL, null);
+    URI storageUrl = null;
+    if (storageUrlString != null) {
+      storageUrl = URI.create(storageUrlString);
+    }
+
+    DatasetDescriptor descriptor =
+        new DatasetDescriptor(dataMap.get(PLATFORM_KEY), storageUrl, dataMap.get(NAME_KEY));
+    dataMap.forEach((key, value) -> {
+      if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY) && !key.equals(STORAGE_URL)) {
+        descriptor.addMetadata(key, value);
+      }
+    });
+    return descriptor;
+  }
+
   public ImmutableMap<String, String> getMetadata() {
-    return ImmutableMap.<String, String>builder()
-        .putAll(metadata)
-        .build();
+    return ImmutableMap.<String, String>builder().putAll(metadata).build();
   }
 
   @Override
@@ -83,6 +130,9 @@ public class DatasetDescriptor extends Descriptor {
     Map<String, String> map = Maps.newHashMap();
     map.put(PLATFORM_KEY, platform);
     map.put(NAME_KEY, getName());
+    if (getStorageUrl() != null) {
+      map.put(STORAGE_URL, getStorageUrl().toString());
+    }
     map.putAll(metadata);
     return map;
   }
@@ -95,32 +145,13 @@ public class DatasetDescriptor extends Descriptor {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     DatasetDescriptor that = (DatasetDescriptor) o;
-    return platform.equals(that.platform) && getName().equals(that.getName()) && metadata.equals(that.metadata);
+    return platform.equals(that.platform) && Objects.equals(storageUrl, that.storageUrl)
+        && metadata.equals(that.metadata);
   }
 
   @Override
   public int hashCode() {
-    int result = platform.hashCode();
-    result = 31 * result + getName().hashCode();
-    result = 31 * result + metadata.hashCode();
-    return result;
-  }
-
-  /**
-   * Deserialize a {@link DatasetDescriptor} from a string map
-   *
-   * @deprecated use {@link Descriptor#deserialize(String)}
-   */
-  @Deprecated
-  public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
-    DatasetDescriptor descriptor = new DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY));
-    dataMap.forEach((key, value) -> {
-      if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY)) {
-        descriptor.addMetadata(key, value);
-      }
-    });
-    return descriptor;
+    return Objects.hash(platform, storageUrl, metadata);
   }
 }
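
A short sketch of the serialization round trip the updated class supports
(values illustrative; fromDataMap stays deprecated in favor of
Descriptor.deserialize, and java.util.Map plus java.net.URI are assumed
imported):

    DatasetDescriptor dataset =
        new DatasetDescriptor("hive", URI.create("thrift://hive-server:4567"), "db.page_views");
    dataset.addMetadata("fsUri", "hdfs://hadoop.test");

    Map<String, String> dataMap = dataset.toDataMap();
    // dataMap: {platform=hive, name=db.page_views,
    //           storageUrl=thrift://hive-server:4567, fsUri=hdfs://hadoop.test}

    DatasetDescriptor restored = DatasetDescriptor.fromDataMap(dataMap);
    // restored.equals(dataset): the storage URL survives the round trip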
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
index d7b0409..d06a838 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.dataset;
 
+import java.net.URI;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -27,6 +28,7 @@ public class DescriptorTest {
   public void testDatasetDescriptor() {
     DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent");
     dataset.addMetadata("fsUri", "hdfs://test.com:2018");
+    Assert.assertNull(dataset.getStorageUrl());
 
     DatasetDescriptor copy = dataset.copy();
     Assert.assertEquals(copy.getName(), dataset.getName());
@@ -34,6 +36,29 @@ public class DescriptorTest {
     Assert.assertEquals(copy.getMetadata(), dataset.getMetadata());
     Assert.assertEquals(dataset, copy);
     Assert.assertEquals(dataset.hashCode(), copy.hashCode());
+
+    //noinspection deprecation
+    Assert.assertEquals(dataset, DatasetDescriptor.fromDataMap(copy.toDataMap()));
+  }
+
+  @Test
+  public void testDatasetDescriptorWithCluster() {
+    DatasetDescriptor dataset =
+        new DatasetDescriptor("hdfs", URI.create("hdfs://hadoop.test"), 
"/data/tracking/PageViewEvent");
+    dataset.addMetadata("fsUri", "hdfs://test.com:2018");
+
+    Assert.assertEquals(dataset.getStorageUrl().toString(),"hdfs://hadoop.test");
+
+    DatasetDescriptor copy = dataset.copy();
+    Assert.assertEquals(copy.getName(), dataset.getName());
+    Assert.assertEquals(copy.getPlatform(), dataset.getPlatform());
+    Assert.assertEquals(copy.getMetadata(), dataset.getMetadata());
+    Assert.assertEquals(copy.getStorageUrl(), dataset.getStorageUrl());
+    Assert.assertEquals(dataset, copy);
+    Assert.assertEquals(dataset.hashCode(), copy.hashCode());
+
+    //noinspection deprecation
+    Assert.assertEquals(dataset, DatasetDescriptor.fromDataMap(copy.toDataMap()));
   }
 
   @Test
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index f105031..000ae19 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -17,6 +17,16 @@
 
 package org.apache.gobblin.publisher;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -30,29 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
-
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
@@ -74,6 +62,15 @@ import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
 import org.apache.gobblin.writer.PartitionIdentifier;
 import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.*;
 
@@ -325,7 +322,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
     Path publisherOutputDir = getPublisherOutputDir(state, branchId);
     FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
-    DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), publisherOutputDir.toString());
+    DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(), fs.getUri(), publisherOutputDir.toString());
     destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
     destination.addMetadata(DatasetConstants.BRANCH, String.valueOf(branchId));
     return destination;
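
For an HDFS branch this now yields one descriptor per cluster; a minimal
sketch, assuming a reachable namenode at a hypothetical URI:

    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode.corp.com:8020"), new Configuration());
    DatasetDescriptor destination =
        new DatasetDescriptor(fs.getScheme(), fs.getUri(), "/data/output/branch0/namespace/table");
    destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
    // platform "hdfs", storageUrl hdfs://namenode.corp.com:8020, plus fsUri/branch metadata as before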
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index 83da81d..ece7db0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -17,24 +17,14 @@
 
 package org.apache.gobblin.source;
 
+import com.google.common.base.Throwables;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.Duration;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Throwables;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -53,6 +43,13 @@ import org.apache.gobblin.source.workunit.MultiWorkUnitWeightedQueue;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.DatePartitionType;
 import org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -246,7 +243,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
     String platform = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM, DatasetConstants.PLATFORM_HDFS);
     Path dataDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
     String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
-    DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform, dataset);
+    URI fileSystemUrl =
+        URI.create(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI));
+    DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform, fileSystemUrl, dataset);

     String partitionName = workUnit.getProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_NAME);
     PartitionDescriptor descriptor = new PartitionDescriptor(partitionName, datasetDescriptor);
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 973a7b0..3bc6bfc 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -17,28 +17,20 @@
 
 package org.apache.gobblin.source.extractor.filebased;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -51,6 +43,10 @@ import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.Extract.TableType;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 
 /**
@@ -251,7 +247,9 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
     String platform = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM, DatasetConstants.PLATFORM_HDFS);
     Path dataDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
     String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
-    DatasetDescriptor source = new DatasetDescriptor(platform, dataset);
+    URI fileSystemUrl =
+        URI.create(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI));
+    DatasetDescriptor source = new DatasetDescriptor(platform, fileSystemUrl, dataset);
     lineageInfo.get().setSource(source, workUnit);
   }
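
Both file-based sources resolve the lineage URL from the job configuration,
falling back to the local filesystem; a sketch of the lookup (the configured
value is hypothetical):

    State state = new State();
    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "hdfs://warehouse.test:8020");

    URI fileSystemUrl = URI.create(
        state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI));
    // -> hdfs://warehouse.test:8020; with the property unset it falls back to
    // LOCAL_FS_URI ("file:///"), which is what the updated tests below expect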
 
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index 96d6a69..ad67041 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -17,30 +17,18 @@
 
 package org.apache.gobblin.writer;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
-
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
 import org.apache.gobblin.codec.StreamCodec;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.dataset.PartitionDescriptor;
@@ -51,6 +39,14 @@ import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.WriterUtils;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -168,7 +164,8 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
   public Descriptor getDataDescriptor() {
     // Dataset is resulted from WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId)
     // The writer dataset might not be same as the published dataset
-    DatasetDescriptor datasetDescriptor = new DatasetDescriptor(fs.getScheme(), outputFile.getParent().toString());
+    DatasetDescriptor datasetDescriptor =
+        new DatasetDescriptor(fs.getScheme(), fs.getUri(), outputFile.getParent().toString());
 
     if (partitionKey == null) {
       return datasetDescriptor;
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
index 469b72f..c8d52c0 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
@@ -16,12 +16,19 @@
  */
 package org.apache.gobblin.publisher;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.ConfigFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Type;
+import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,19 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-import com.typesafe.config.ConfigFactory;
-
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
@@ -65,6 +61,8 @@ import org.apache.gobblin.util.io.GsonInterfaceAdapter;
 import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
 import org.apache.gobblin.writer.PartitionIdentifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 /**
@@ -615,7 +613,7 @@ public class BaseDataPublisherTest {
     // Find the partition lineage and assert
     for (int i = 0; i < numBranches; i++) {
       String outputPath = String.format("/data/output/branch%d/namespace/table", i);
-      DatasetDescriptor destinationDataset = new DatasetDescriptor("file", outputPath);
+      DatasetDescriptor destinationDataset = new DatasetDescriptor("file", URI.create("file:///"), outputPath);
       destinationDataset.addMetadata("fsUri", "file:///");
       destinationDataset.addMetadata("branch", "" + i);
 
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index af752b8..3c931b4 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.source.extractor.filebased;
 
+import com.google.common.collect.Sets;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
@@ -37,7 +43,6 @@ import org.apache.gobblin.source.extractor.hadoop.AvroFileSource;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
-
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
@@ -45,13 +50,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import com.typesafe.config.ConfigFactory;
-
 
 @Test
 public class FileBasedSourceTest {
@@ -126,7 +124,7 @@ public class FileBasedSourceTest {
     // Avro file based source
     AvroFileSource fileSource = new AvroFileSource();
     List<WorkUnit> workUnits = fileSource.getWorkunits(sourceState);
-    DatasetDescriptor datasetDescriptor = new DatasetDescriptor("hdfs", dataset);
+    DatasetDescriptor datasetDescriptor = new DatasetDescriptor("hdfs", URI.create("file:///"), dataset);
     for (WorkUnit workUnit : workUnits) {
       Assert.assertEquals(workUnit.getProp(SOURCE_LINEAGE_KEY), Descriptor.toJson(datasetDescriptor));
     }
@@ -136,7 +134,7 @@ public class FileBasedSourceTest {
     sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM, DatasetConstants.PLATFORM_FILE);
     DatePartitionedJsonFileSource partitionedFileSource = new DatePartitionedJsonFileSource();
     workUnits = partitionedFileSource.getWorkunits(sourceState);
-    datasetDescriptor = new DatasetDescriptor("file", dataset);
+    datasetDescriptor = new DatasetDescriptor("file", URI.create("file:///"), dataset);
 
     Set<String> partitions = Sets.newHashSet("2017-12", "2018-01");
     for (WorkUnit workUnit : workUnits) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 24c9e3a..dbe9920 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,34 +17,30 @@
 
 package org.apache.gobblin.data.management.copy;
 
-import org.apache.gobblin.data.management.partition.File;
-import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.dataset.Descriptor;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.guid.Guid;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-
 import lombok.AccessLevel;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
-
+import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
+import org.apache.gobblin.data.management.partition.File;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.guid.Guid;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
 
 /**
  * Abstraction for a file to copy from {@link #origin} to {@link #destination}. {@link CopyableFile}s should be
@@ -139,13 +135,14 @@ public class CopyableFile extends CopyEntity implements File {

     Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath());
     String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
-    DatasetDescriptor sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
+    DatasetDescriptor sourceDataset = new DatasetDescriptor(originFs.getScheme(), originFs.getUri(), sourceDatasetName);
     sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString());
     sourceData = sourceDataset;

     Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination);
     String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString();
-    DatasetDescriptor destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
+    DatasetDescriptor destinationDataset = new DatasetDescriptor(targetFs.getScheme(), targetFs.getUri(),
+            destinationDatasetName);
     destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
     destinationData = destinationDataset;
   }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 444d193..ad8d167 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -17,35 +17,7 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.gobblin.hive.HiveConstants;
-import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InvalidInputException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.thrift.TException;
-
 import com.google.common.annotations.VisibleForTesting;
-import com.typesafe.config.ConfigFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -56,12 +28,25 @@ import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 import com.google.gson.Gson;
 import com.typesafe.config.Config;
-
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Singular;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableFile;
@@ -69,6 +54,9 @@ import org.apache.gobblin.data.management.copy.OwnerAndPermission;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.hive.avro.HiveAvroCopyEntityHelper;
 import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.hive.HiveConstants;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.hive.HiveRegProps;
 import org.apache.gobblin.hive.HiveRegisterStep;
@@ -79,17 +67,24 @@ import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.util.request_allocation.PushDownRequestor;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Singular;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
 
 
 /**
@@ -107,6 +102,9 @@ public class HiveCopyEntityHelper {
       HiveDatasetFinder.HIVE_DATASET_PREFIX + ".unmanaged.data.conflict.policy";
   public static final String DEFAULT_UNMANAGED_DATA_POLICY = UnmanagedDataPolicy.ABORT.name();
 
+  public static final String SOURCE_METASTORE_URI_KEY =
+      HiveDatasetFinder.HIVE_DATASET_PREFIX + ".copy.target.metastore.uri";
+
   /** Target metastore URI */
   public static final String TARGET_METASTORE_URI_KEY =
       HiveDatasetFinder.HIVE_DATASET_PREFIX + ".copy.target.metastore.uri";
@@ -177,7 +175,8 @@ public class HiveCopyEntityHelper {
   private final HiveRegProps hiveRegProps;
   private Optional<Table> existingTargetTable;
   private final Table targetTable;
-  private final Optional<String> targetURI;
+  private final Optional<String> sourceMetastoreURI;
+  private final Optional<String> targetMetastoreURI;
   private final ExistingEntityPolicy existingEntityPolicy;
   private final UnmanagedDataPolicy unmanagedDataPolicy;
   private final Optional<String> partitionFilter;
@@ -270,8 +269,11 @@ public class HiveCopyEntityHelper {
       this.targetPathHelper = new HiveTargetPathHelper(this.dataset);
       this.enforceFileSizeMatch = configuration.isEnforceFileLengthMatch();
       this.hiveRegProps = new HiveRegProps(new State(this.dataset.getProperties()));
-      this.targetURI = Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_METASTORE_URI_KEY));
-      this.targetClientPool = HiveMetastoreClientPool.get(this.dataset.getProperties(), this.targetURI);
+      this.sourceMetastoreURI =
+          Optional.fromNullable(this.dataset.getProperties().getProperty(HiveDatasetFinder.HIVE_METASTORE_URI_KEY));
+      this.targetMetastoreURI =
+          Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_METASTORE_URI_KEY));
+      this.targetClientPool = HiveMetastoreClientPool.get(this.dataset.getProperties(), this.targetMetastoreURI);
       this.targetDatabase = Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_DATABASE_KEY))
           .or(this.dataset.table.getDbName());
       this.existingEntityPolicy = ExistingEntityPolicy.valueOf(this.dataset.getProperties()
@@ -360,7 +362,7 @@ public class HiveCopyEntityHelper {
             .withTable(HiveMetaStoreUtils.getHiveTable(this.targetTable.getTTable())).build();
 
         CommitStep tableRegistrationStep =
-            new HiveRegisterStep(this.targetURI, tableHiveSpec, this.hiveRegProps);
+            new HiveRegisterStep(this.targetMetastoreURI, tableHiveSpec, this.hiveRegProps);
         this.tableRegistrationStep = Optional.of(tableRegistrationStep);
 
         if (this.existingTargetTable.isPresent() && this.existingTargetTable.get().isPartitioned()) {
@@ -533,7 +535,7 @@ public class HiveCopyEntityHelper {
     }
 
     PartitionDeregisterStep deregister =
-        new PartitionDeregisterStep(table.getTTable(), partition.getTPartition(), this.targetURI, this.hiveRegProps);
+        new PartitionDeregisterStep(table.getTTable(), partition.getTPartition(), this.targetMetastoreURI, this.hiveRegProps);
     copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String> newHashMap(), deregister, stepPriority++));
     return stepPriority;
   }
@@ -571,7 +573,7 @@ public class HiveCopyEntityHelper {
     }
 
     TableDeregisterStep deregister =
-        new TableDeregisterStep(table.getTTable(), this.getTargetURI(), this.getHiveRegProps());
+        new TableDeregisterStep(table.getTTable(), this.getTargetMetastoreURI(), this.getHiveRegProps());
     copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String> newHashMap(), deregister, stepPriority++));
     return stepPriority;
   }
@@ -817,14 +819,28 @@ public class HiveCopyEntityHelper {
 
   DatasetDescriptor getSourceDataset() {
     String sourceTable = dataset.getTable().getDbName() + "." + dataset.getTable().getTableName();
-    DatasetDescriptor sourceDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+
+    URI hiveMetastoreURI = null;
+    if (sourceMetastoreURI.isPresent()) {
+      hiveMetastoreURI = URI.create(sourceMetastoreURI.get());
+    }
+
+    DatasetDescriptor sourceDataset =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, hiveMetastoreURI, sourceTable);
     sourceDataset.addMetadata(DatasetConstants.FS_URI, dataset.getFs().getUri().toString());
     return sourceDataset;
   }
 
   DatasetDescriptor getDestinationDataset() {
     String destinationTable = this.getTargetDatabase() + "." + this.getTargetTable();
-    DatasetDescriptor destinationDataset = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
+
+    URI hiveMetastoreURI = null;
+    if (targetMetastoreURI.isPresent()) {
+      hiveMetastoreURI = URI.create(targetMetastoreURI.get());
+    }
+
+    DatasetDescriptor destinationDataset =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, hiveMetastoreURI, destinationTable);
     destinationDataset.addMetadata(DatasetConstants.FS_URI, this.getTargetFs().getUri().toString());
     return destinationDataset;
   }
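
For Hive copies the storage URL is the metastore URI when one is configured;
roughly (the URI is hypothetical, and Optional here is Guava's):

    Optional<String> targetMetastoreURI = Optional.of("thrift://target-metastore:9083");

    URI hiveMetastoreURI = targetMetastoreURI.isPresent() ? URI.create(targetMetastoreURI.get()) : null;
    DatasetDescriptor destination =
        new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, hiveMetastoreURI, "target_db.page_views");
    // With no metastore URI configured, storageUrl stays null and the descriptor
    // falls back to the old platform + table-name form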
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index efa4e4c..98f30da 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -17,20 +17,16 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
-
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -45,9 +41,9 @@ import org.apache.gobblin.hive.spec.SimpleHiveSpec;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.MultiTimingEvent;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 
 
 /**
@@ -124,7 +120,7 @@ public class HivePartitionFileSet extends HiveFileSet {
       HiveSpec partitionHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
           .withTable(HiveMetaStoreUtils.getHiveTable(hiveCopyEntityHelper.getTargetTable().getTTable()))
           .withPartition(Optional.of(HiveMetaStoreUtils.getHivePartition(targetPartition.getTPartition()))).build();
-      HiveRegisterStep register = new HiveRegisterStep(hiveCopyEntityHelper.getTargetURI(), partitionHiveSpec,
+      HiveRegisterStep register = new HiveRegisterStep(hiveCopyEntityHelper.getTargetMetastoreURI(), partitionHiveSpec,
           hiveCopyEntityHelper.getHiveRegProps());
       copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String> newHashMap(), register, stepPriority++));
 
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index 30b5e17..fc35bc1 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -24,7 +27,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
+import lombok.AllArgsConstructor;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper.DeregisterFileDeleteMethod;
+import org.apache.gobblin.hive.HiveRegProps;
+import org.apache.gobblin.metrics.event.MultiTimingEvent;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -37,19 +46,6 @@ import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import lombok.AllArgsConstructor;
-
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.data.management.copy.CopyEntity;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
-import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper.DeregisterFileDeleteMethod;
-import org.apache.gobblin.hive.HiveRegProps;
-import org.apache.gobblin.metrics.event.MultiTimingEvent;
-
 
 public class HiveCopyEntityHelperTest {
 
@@ -313,7 +309,7 @@ public class HiveCopyEntityHelperTest {
 
     HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
     Mockito.when(helper.getDeleteMethod()).thenReturn(DeregisterFileDeleteMethod.NO_DELETE);
-    Mockito.when(helper.getTargetURI()).thenReturn(Optional.of("/targetURI"));
+    Mockito.when(helper.getTargetMetastoreURI()).thenReturn(Optional.of("/targetURI"));
     Mockito.when(helper.getHiveRegProps()).thenReturn(new HiveRegProps(new State()));
     Mockito.when(helper.getDataset()).thenReturn(dataset);
 
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 0ed291e..ced9be8 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -17,24 +17,22 @@
 
 package org.apache.gobblin.source.extractor.extract.jdbc;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import java.io.IOException;
+import java.net.URI;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
 import org.apache.gobblin.source.jdbc.MysqlExtractor;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -61,9 +59,10 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
     String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
     String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
     String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
-    String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim();
-    DatasetDescriptor source =
-        new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName());
+    String serverUrl = "mysql://" + host.trim() + ":" + port;
+    String connectionUrl = "jdbc:" + serverUrl + "/" + database.trim();
+    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, URI.create(serverUrl),
+        database + "." + entity.getSourceEntityName());
     source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
     if (lineageInfo.isPresent()) {
       lineageInfo.get().setSource(source, workUnit);
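
The MySQL change splits the server address out of the JDBC URL, so the storage
URL identifies the server while the full connection URL (including the
database) stays in metadata; for example (host and port hypothetical):

    String host = "mysql-db.test", port = "3306", database = "tracking";
    String serverUrl = "mysql://" + host + ":" + port;            // storage URL: mysql://mysql-db.test:3306
    String connectionUrl = "jdbc:" + serverUrl + "/" + database;  // metadata: jdbc:mysql://mysql-db.test:3306/tracking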
