This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f2283c0 [GOBBLIN-1339] Add cluster name to dataset descriptor
f2283c0 is described below
commit f2283c0f8fbff0a5305aee54884359b2cba5ac65
Author: aprokofiev <[email protected]>
AuthorDate: Tue Jan 12 12:20:19 2021 -0800
[GOBBLIN-1339] Add cluster name to dataset descriptor
[GOBBLIN-1339] Add cluster name to dataset
descriptor
We use dataset descriptors to track lineage.
Previously, it
only included the platform name (hive, hdfs) and
path of the
dataset. As a result, we could not differentiate
the data copy
between multiple production clusters, as the
dataset descriptors
were the same for them. We add an optional cluster
name to
address that.
This change will be used for the data copy audit
system.
Hive and file-based copy code is updated to
include cluster names.
Use the full storage system URL instead of just a
well-known name
Closes #3178 from aplex/dataset-cluster
---
.../apache/gobblin/dataset/DatasetDescriptor.java | 87 ++++++++++------
.../org/apache/gobblin/dataset/DescriptorTest.java | 25 +++++
.../gobblin/publisher/BaseDataPublisher.java | 43 ++++----
.../gobblin/source/PartitionedFileSourceBase.java | 25 +++--
.../extractor/filebased/FileBasedSource.java | 26 +++--
.../org/apache/gobblin/writer/FsDataWriter.java | 29 +++---
.../gobblin/publisher/BaseDataPublisherTest.java | 22 ++---
.../extractor/filebased/FileBasedSourceTest.java | 18 ++--
.../gobblin/data/management/copy/CopyableFile.java | 29 +++---
.../management/copy/hive/HiveCopyEntityHelper.java | 110 ++++++++++++---------
.../management/copy/hive/HivePartitionFileSet.java | 24 ++---
.../copy/hive/HiveCopyEntityHelperTest.java | 26 +++--
.../source/extractor/extract/jdbc/MysqlSource.java | 23 +++--
13 files changed, 267 insertions(+), 220 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
index d2c260b..2470226 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
@@ -17,11 +17,12 @@
package org.apache.gobblin.dataset;
-import java.util.Map;
-
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
import lombok.Getter;
@@ -31,6 +32,7 @@ import lombok.Getter;
public class DatasetDescriptor extends Descriptor {
private static final String PLATFORM_KEY = "platform";
private static final String NAME_KEY = "name";
+ private static final String STORAGE_URL = "storageUrl";
/**
* which platform the dataset is stored, for example: local, hdfs, oracle,
mysql, kafka
@@ -39,12 +41,35 @@ public class DatasetDescriptor extends Descriptor {
private final String platform;
/**
+ * URL of system that stores the dataset. It does not include the dataset
name.
+ *
+ * Examples: hdfs://storage.corp.com, https://api.service.test,
mysql://mysql-db.test:3306, thrift://hive-server:4567
+ *
+ * Using URI instead of URL class to allow for schemas that are not known by
URL class
+ * (https://stackoverflow.com/q/2406518/258737)
+ */
+ @Getter
+ @Nullable
+ private final URI storageUrl;
+
+ /**
* metadata about the dataset
*/
private final Map<String, String> metadata = Maps.newHashMap();
+ /**
+ * @deprecated use {@link #DatasetDescriptor(String, URI, String)} to
provide storage system url.
+ */
+ @Deprecated
public DatasetDescriptor(String platform, String name) {
super(name);
+ this.storageUrl = null;
+ this.platform = platform;
+ }
+
+ public DatasetDescriptor(String platform, URI storageUrl, String name) {
+ super(name);
+ this.storageUrl = storageUrl;
this.platform = platform;
}
@@ -55,13 +80,35 @@ public class DatasetDescriptor extends Descriptor {
public DatasetDescriptor(DatasetDescriptor copy) {
super(copy.getName());
platform = copy.getPlatform();
+ storageUrl = copy.getStorageUrl();
metadata.putAll(copy.getMetadata());
}
+ /**
+ * Deserialize a {@link DatasetDescriptor} from a string map
+ *
+ * @deprecated use {@link Descriptor#deserialize(String)}
+ */
+ @Deprecated
+ public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
+ String storageUrlString = dataMap.getOrDefault(STORAGE_URL, null);
+ URI storageUrl = null;
+ if (storageUrlString != null) {
+ storageUrl = URI.create(storageUrlString);
+ }
+
+ DatasetDescriptor descriptor =
+ new DatasetDescriptor(dataMap.get(PLATFORM_KEY), storageUrl,
dataMap.get(NAME_KEY));
+ dataMap.forEach((key, value) -> {
+ if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY) &&
!key.equals(STORAGE_URL)) {
+ descriptor.addMetadata(key, value);
+ }
+ });
+ return descriptor;
+ }
+
public ImmutableMap<String, String> getMetadata() {
- return ImmutableMap.<String, String>builder()
- .putAll(metadata)
- .build();
+ return ImmutableMap.<String, String>builder().putAll(metadata).build();
}
@Override
@@ -83,6 +130,9 @@ public class DatasetDescriptor extends Descriptor {
Map<String, String> map = Maps.newHashMap();
map.put(PLATFORM_KEY, platform);
map.put(NAME_KEY, getName());
+ if (getStorageUrl() != null) {
+ map.put(STORAGE_URL, getStorageUrl().toString());
+ }
map.putAll(metadata);
return map;
}
@@ -95,32 +145,13 @@ public class DatasetDescriptor extends Descriptor {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
DatasetDescriptor that = (DatasetDescriptor) o;
- return platform.equals(that.platform) && getName().equals(that.getName())
&& metadata.equals(that.metadata);
+ return platform.equals(that.platform) && Objects.equals(storageUrl,
that.storageUrl)
+ && metadata.equals(that.metadata);
}
@Override
public int hashCode() {
- int result = platform.hashCode();
- result = 31 * result + getName().hashCode();
- result = 31 * result + metadata.hashCode();
- return result;
- }
-
- /**
- * Deserialize a {@link DatasetDescriptor} from a string map
- *
- * @deprecated use {@link Descriptor#deserialize(String)}
- */
- @Deprecated
- public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
- DatasetDescriptor descriptor = new
DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY));
- dataMap.forEach((key, value) -> {
- if (!key.equals(PLATFORM_KEY) && !key.equals(NAME_KEY)) {
- descriptor.addMetadata(key, value);
- }
- });
- return descriptor;
+ return Objects.hash(platform, storageUrl, metadata);
}
}
diff --git
a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
index d7b0409..d06a838 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.dataset;
+import java.net.URI;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -27,6 +28,7 @@ public class DescriptorTest {
public void testDatasetDescriptor() {
DatasetDescriptor dataset = new DatasetDescriptor("hdfs",
"/data/tracking/PageViewEvent");
dataset.addMetadata("fsUri", "hdfs://test.com:2018");
+ Assert.assertNull(dataset.getStorageUrl());
DatasetDescriptor copy = dataset.copy();
Assert.assertEquals(copy.getName(), dataset.getName());
@@ -34,6 +36,29 @@ public class DescriptorTest {
Assert.assertEquals(copy.getMetadata(), dataset.getMetadata());
Assert.assertEquals(dataset, copy);
Assert.assertEquals(dataset.hashCode(), copy.hashCode());
+
+ //noinspection deprecation
+ Assert.assertEquals(dataset,
DatasetDescriptor.fromDataMap(copy.toDataMap()));
+ }
+
+ @Test
+ public void testDatasetDescriptorWithCluster() {
+ DatasetDescriptor dataset =
+ new DatasetDescriptor("hdfs", URI.create("hdfs://hadoop.test"),
"/data/tracking/PageViewEvent");
+ dataset.addMetadata("fsUri", "hdfs://test.com:2018");
+
+
Assert.assertEquals(dataset.getStorageUrl().toString(),"hdfs://hadoop.test");
+
+ DatasetDescriptor copy = dataset.copy();
+ Assert.assertEquals(copy.getName(), dataset.getName());
+ Assert.assertEquals(copy.getPlatform(), dataset.getPlatform());
+ Assert.assertEquals(copy.getMetadata(), dataset.getMetadata());
+ Assert.assertEquals(copy.getStorageUrl(), dataset.getStorageUrl());
+ Assert.assertEquals(dataset, copy);
+ Assert.assertEquals(dataset.hashCode(), copy.hashCode());
+
+ //noinspection deprecation
+ Assert.assertEquals(dataset,
DatasetDescriptor.fromDataMap(copy.toDataMap()));
}
@Test
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index f105031..000ae19 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -17,6 +17,16 @@
package org.apache.gobblin.publisher;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -30,29 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
-
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
@@ -74,6 +62,15 @@ import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.PartitionIdentifier;
import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.gobblin.util.retry.RetryerFactory.*;
@@ -325,7 +322,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state,
int branchId) {
Path publisherOutputDir = getPublisherOutputDir(state, branchId);
FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
- DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(),
publisherOutputDir.toString());
+ DatasetDescriptor destination = new DatasetDescriptor(fs.getScheme(),
fs.getUri(), publisherOutputDir.toString());
destination.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
destination.addMetadata(DatasetConstants.BRANCH, String.valueOf(branchId));
return destination;
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index 83da81d..ece7db0 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -17,24 +17,14 @@
package org.apache.gobblin.source;
+import com.google.common.base.Throwables;
import java.io.IOException;
+import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.Duration;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Throwables;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -53,6 +43,13 @@ import
org.apache.gobblin.source.workunit.MultiWorkUnitWeightedQueue;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.DatePartitionType;
import org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -246,7 +243,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA,
DATA> extends FileBasedS
String platform =
state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM,
DatasetConstants.PLATFORM_HDFS);
Path dataDir = new
Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
- DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform,
dataset);
+ URI fileSystemUrl =
+ URI.create(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI,
ConfigurationKeys.LOCAL_FS_URI));
+ DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform,
fileSystemUrl, dataset);
String partitionName =
workUnit.getProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_NAME);
PartitionDescriptor descriptor = new PartitionDescriptor(partitionName,
datasetDescriptor);
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 973a7b0..3bc6bfc 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -17,28 +17,20 @@
package org.apache.gobblin.source.extractor.filebased;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -51,6 +43,10 @@ import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.Extract.TableType;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
/**
@@ -251,7 +247,9 @@ public abstract class FileBasedSource<S, D> extends
AbstractSource<S, D> {
String platform =
state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM,
DatasetConstants.PLATFORM_HDFS);
Path dataDir = new
Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
- DatasetDescriptor source = new DatasetDescriptor(platform, dataset);
+ URI fileSystemUrl =
+ URI.create(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI,
ConfigurationKeys.LOCAL_FS_URI));
+ DatasetDescriptor source = new DatasetDescriptor(platform, fileSystemUrl,
dataset);
lineageInfo.get().setSource(source, workUnit);
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index 96d6a69..ad67041 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -17,30 +17,18 @@
package org.apache.gobblin.writer;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
-
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
import org.apache.gobblin.codec.StreamCodec;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.dataset.PartitionDescriptor;
@@ -51,6 +39,14 @@ import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -168,7 +164,8 @@ public abstract class FsDataWriter<D> implements
DataWriter<D>, FinalState, Meta
public Descriptor getDataDescriptor() {
// Dataset is resulted from WriterUtils.getWriterOutputDir(properties,
this.numBranches, this.branchId)
// The writer dataset might not be same as the published dataset
- DatasetDescriptor datasetDescriptor = new
DatasetDescriptor(fs.getScheme(), outputFile.getParent().toString());
+ DatasetDescriptor datasetDescriptor =
+ new DatasetDescriptor(fs.getScheme(), fs.getUri(),
outputFile.getParent().toString());
if (partitionKey == null) {
return datasetDescriptor;
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
index 469b72f..c8d52c0 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
@@ -16,12 +16,19 @@
*/
package org.apache.gobblin.publisher;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
+import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,19 +38,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-import com.typesafe.config.ConfigFactory;
-
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
@@ -65,6 +61,8 @@ import org.apache.gobblin.util.io.GsonInterfaceAdapter;
import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.PartitionIdentifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
/**
@@ -615,7 +613,7 @@ public class BaseDataPublisherTest {
// Find the partition lineage and assert
for (int i = 0; i < numBranches; i++) {
String outputPath =
String.format("/data/output/branch%d/namespace/table", i);
- DatasetDescriptor destinationDataset = new DatasetDescriptor("file",
outputPath);
+ DatasetDescriptor destinationDataset = new DatasetDescriptor("file",
URI.create("file:///"), outputPath);
destinationDataset.addMetadata("fsUri", "file:///");
destinationDataset.addMetadata("branch", "" + i);
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index af752b8..3c931b4 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -17,6 +17,12 @@
package org.apache.gobblin.source.extractor.filebased;
+import com.google.common.collect.Sets;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
@@ -37,7 +43,6 @@ import
org.apache.gobblin.source.extractor.hadoop.AvroFileSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
-
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
@@ -45,13 +50,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import com.typesafe.config.ConfigFactory;
-
@Test
public class FileBasedSourceTest {
@@ -126,7 +124,7 @@ public class FileBasedSourceTest {
// Avro file based source
AvroFileSource fileSource = new AvroFileSource();
List<WorkUnit> workUnits = fileSource.getWorkunits(sourceState);
- DatasetDescriptor datasetDescriptor = new DatasetDescriptor("hdfs",
dataset);
+ DatasetDescriptor datasetDescriptor = new DatasetDescriptor("hdfs",
URI.create("file:///"), dataset);
for (WorkUnit workUnit : workUnits) {
Assert.assertEquals(workUnit.getProp(SOURCE_LINEAGE_KEY),
Descriptor.toJson(datasetDescriptor));
}
@@ -136,7 +134,7 @@ public class FileBasedSourceTest {
sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM,
DatasetConstants.PLATFORM_FILE);
DatePartitionedJsonFileSource partitionedFileSource = new
DatePartitionedJsonFileSource();
workUnits = partitionedFileSource.getWorkunits(sourceState);
- datasetDescriptor = new DatasetDescriptor("file", dataset);
+ datasetDescriptor = new DatasetDescriptor("file", URI.create("file:///"),
dataset);
Set<String> partitions = Sets.newHashSet("2017-12", "2018-01");
for (WorkUnit workUnit : workUnits) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 24c9e3a..dbe9920 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,34 +17,30 @@
package org.apache.gobblin.data.management.copy;
-import org.apache.gobblin.data.management.partition.File;
-import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.dataset.Descriptor;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.guid.Guid;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
-
+import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
+import org.apache.gobblin.data.management.partition.File;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.guid.Guid;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
/**
* Abstraction for a file to copy from {@link #origin} to {@link
#destination}. {@link CopyableFile}s should be
@@ -139,13 +135,14 @@ public class CopyableFile extends CopyEntity implements
File {
Path fullSourcePath =
Path.getPathWithoutSchemeAndAuthority(origin.getPath());
String sourceDatasetName = isDir ? fullSourcePath.toString() :
fullSourcePath.getParent().toString();
- DatasetDescriptor sourceDataset = new
DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
+ DatasetDescriptor sourceDataset = new
DatasetDescriptor(originFs.getScheme(), originFs.getUri(), sourceDatasetName);
sourceDataset.addMetadata(DatasetConstants.FS_URI,
originFs.getUri().toString());
sourceData = sourceDataset;
Path fullDestinationPath =
Path.getPathWithoutSchemeAndAuthority(destination);
String destinationDatasetName = isDir ? fullDestinationPath.toString() :
fullDestinationPath.getParent().toString();
- DatasetDescriptor destinationDataset = new
DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
+ DatasetDescriptor destinationDataset = new
DatasetDescriptor(targetFs.getScheme(), targetFs.getUri(),
+ destinationDatasetName);
destinationDataset.addMetadata(DatasetConstants.FS_URI,
targetFs.getUri().toString());
destinationData = destinationDataset;
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 444d193..ad8d167 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -17,35 +17,7 @@
package org.apache.gobblin.data.management.copy.hive;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.gobblin.hive.HiveConstants;
-import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InvalidInputException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.thrift.TException;
-
import com.google.common.annotations.VisibleForTesting;
-import com.typesafe.config.ConfigFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -56,12 +28,25 @@ import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import com.google.gson.Gson;
import com.typesafe.config.Config;
-
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Singular;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableFile;
@@ -69,6 +54,9 @@ import
org.apache.gobblin.data.management.copy.OwnerAndPermission;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import
org.apache.gobblin.data.management.copy.hive.avro.HiveAvroCopyEntityHelper;
import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.hive.HiveConstants;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.hive.HiveRegProps;
import org.apache.gobblin.hive.HiveRegisterStep;
@@ -79,17 +67,24 @@ import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.spec.SimpleHiveSpec;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.request_allocation.PushDownRequestor;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Singular;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
/**
@@ -107,6 +102,9 @@ public class HiveCopyEntityHelper {
HiveDatasetFinder.HIVE_DATASET_PREFIX +
".unmanaged.data.conflict.policy";
public static final String DEFAULT_UNMANAGED_DATA_POLICY =
UnmanagedDataPolicy.ABORT.name();
+ public static final String SOURCE_METASTORE_URI_KEY =
+ HiveDatasetFinder.HIVE_DATASET_PREFIX + ".copy.source.metastore.uri";
+
/** Target metastore URI */
public static final String TARGET_METASTORE_URI_KEY =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".copy.target.metastore.uri";
@@ -177,7 +175,8 @@ public class HiveCopyEntityHelper {
private final HiveRegProps hiveRegProps;
private Optional<Table> existingTargetTable;
private final Table targetTable;
- private final Optional<String> targetURI;
+ private final Optional<String> sourceMetastoreURI;
+ private final Optional<String> targetMetastoreURI;
private final ExistingEntityPolicy existingEntityPolicy;
private final UnmanagedDataPolicy unmanagedDataPolicy;
private final Optional<String> partitionFilter;
@@ -270,8 +269,11 @@ public class HiveCopyEntityHelper {
this.targetPathHelper = new HiveTargetPathHelper(this.dataset);
this.enforceFileSizeMatch = configuration.isEnforceFileLengthMatch();
this.hiveRegProps = new HiveRegProps(new
State(this.dataset.getProperties()));
- this.targetURI =
Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_METASTORE_URI_KEY));
- this.targetClientPool =
HiveMetastoreClientPool.get(this.dataset.getProperties(), this.targetURI);
+ this.sourceMetastoreURI =
+
Optional.fromNullable(this.dataset.getProperties().getProperty(HiveDatasetFinder.HIVE_METASTORE_URI_KEY));
+ this.targetMetastoreURI =
+
Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_METASTORE_URI_KEY));
+ this.targetClientPool =
HiveMetastoreClientPool.get(this.dataset.getProperties(),
this.targetMetastoreURI);
this.targetDatabase =
Optional.fromNullable(this.dataset.getProperties().getProperty(TARGET_DATABASE_KEY))
.or(this.dataset.table.getDbName());
this.existingEntityPolicy =
ExistingEntityPolicy.valueOf(this.dataset.getProperties()
@@ -360,7 +362,7 @@ public class HiveCopyEntityHelper {
.withTable(HiveMetaStoreUtils.getHiveTable(this.targetTable.getTTable())).build();
CommitStep tableRegistrationStep =
- new HiveRegisterStep(this.targetURI, tableHiveSpec,
this.hiveRegProps);
+ new HiveRegisterStep(this.targetMetastoreURI, tableHiveSpec,
this.hiveRegProps);
this.tableRegistrationStep = Optional.of(tableRegistrationStep);
if (this.existingTargetTable.isPresent() &&
this.existingTargetTable.get().isPartitioned()) {
@@ -533,7 +535,7 @@ public class HiveCopyEntityHelper {
}
PartitionDeregisterStep deregister =
- new PartitionDeregisterStep(table.getTTable(),
partition.getTPartition(), this.targetURI, this.hiveRegProps);
+ new PartitionDeregisterStep(table.getTTable(),
partition.getTPartition(), this.targetMetastoreURI, this.hiveRegProps);
copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String>
newHashMap(), deregister, stepPriority++));
return stepPriority;
}
@@ -571,7 +573,7 @@ public class HiveCopyEntityHelper {
}
TableDeregisterStep deregister =
- new TableDeregisterStep(table.getTTable(), this.getTargetURI(),
this.getHiveRegProps());
+ new TableDeregisterStep(table.getTTable(),
this.getTargetMetastoreURI(), this.getHiveRegProps());
copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String>
newHashMap(), deregister, stepPriority++));
return stepPriority;
}
@@ -817,14 +819,28 @@ public class HiveCopyEntityHelper {
DatasetDescriptor getSourceDataset() {
String sourceTable = dataset.getTable().getDbName() + "." +
dataset.getTable().getTableName();
- DatasetDescriptor sourceDataset = new
DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+
+ URI hiveMetastoreURI = null;
+ if (sourceMetastoreURI.isPresent()) {
+ hiveMetastoreURI = URI.create(sourceMetastoreURI.get());
+ }
+
+ DatasetDescriptor sourceDataset =
+ new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE,
hiveMetastoreURI, sourceTable);
sourceDataset.addMetadata(DatasetConstants.FS_URI,
dataset.getFs().getUri().toString());
return sourceDataset;
}
DatasetDescriptor getDestinationDataset() {
String destinationTable = this.getTargetDatabase() + "." +
this.getTargetTable();
- DatasetDescriptor destinationDataset = new
DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destinationTable);
+
+ URI hiveMetastoreURI = null;
+ if (targetMetastoreURI.isPresent()) {
+ hiveMetastoreURI = URI.create(targetMetastoreURI.get());
+ }
+
+ DatasetDescriptor destinationDataset =
+ new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE,
hiveMetastoreURI, destinationTable);
destinationDataset.addMetadata(DatasetConstants.FS_URI,
this.getTargetFs().getUri().toString());
return destinationDataset;
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index efa4e4c..98f30da 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -17,20 +17,16 @@
package org.apache.gobblin.data.management.copy.hive;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
-
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -45,9 +41,9 @@ import org.apache.gobblin.hive.spec.SimpleHiveSpec;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.MultiTimingEvent;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
/**
@@ -124,7 +120,7 @@ public class HivePartitionFileSet extends HiveFileSet {
HiveSpec partitionHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
.withTable(HiveMetaStoreUtils.getHiveTable(hiveCopyEntityHelper.getTargetTable().getTTable()))
.withPartition(Optional.of(HiveMetaStoreUtils.getHivePartition(targetPartition.getTPartition()))).build();
- HiveRegisterStep register = new
HiveRegisterStep(hiveCopyEntityHelper.getTargetURI(), partitionHiveSpec,
+ HiveRegisterStep register = new
HiveRegisterStep(hiveCopyEntityHelper.getTargetMetastoreURI(),
partitionHiveSpec,
hiveCopyEntityHelper.getHiveRegProps());
copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String>
newHashMap(), register, stepPriority++));
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index 30b5e17..fc35bc1 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.data.management.copy.hive;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -24,7 +27,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-
+import lombok.AllArgsConstructor;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import
org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper.DeregisterFileDeleteMethod;
+import org.apache.gobblin.hive.HiveRegProps;
+import org.apache.gobblin.metrics.event.MultiTimingEvent;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -37,19 +46,6 @@ import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import lombok.AllArgsConstructor;
-
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.data.management.copy.CopyEntity;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
-import
org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper.DeregisterFileDeleteMethod;
-import org.apache.gobblin.hive.HiveRegProps;
-import org.apache.gobblin.metrics.event.MultiTimingEvent;
-
public class HiveCopyEntityHelperTest {
@@ -313,7 +309,7 @@ public class HiveCopyEntityHelperTest {
HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
Mockito.when(helper.getDeleteMethod()).thenReturn(DeregisterFileDeleteMethod.NO_DELETE);
- Mockito.when(helper.getTargetURI()).thenReturn(Optional.of("/targetURI"));
+
Mockito.when(helper.getTargetMetastoreURI()).thenReturn(Optional.of("/targetURI"));
Mockito.when(helper.getHiveRegProps()).thenReturn(new HiveRegProps(new
State()));
Mockito.when(helper.getDataset()).thenReturn(dataset);
diff --git
a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 0ed291e..ced9be8 100644
---
a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -17,24 +17,22 @@
package org.apache.gobblin.source.extractor.extract.jdbc;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import java.io.IOException;
+import java.net.URI;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
import org.apache.gobblin.source.jdbc.MysqlExtractor;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -61,9 +59,10 @@ public class MysqlSource extends QueryBasedSource<JsonArray,
JsonElement> {
String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
String database =
sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
- String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" +
database.trim();
- DatasetDescriptor source =
- new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "."
+ entity.getSourceEntityName());
+ String serverUrl = "mysql://" + host.trim() + ":" + port;
+ String connectionUrl = "jdbc:" + serverUrl + "/" + database.trim();
+ DatasetDescriptor source = new
DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, URI.create(serverUrl),
+ database + "." + entity.getSourceEntityName());
source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
if (lineageInfo.isPresent()) {
lineageInfo.get().setSource(source, workUnit);