Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 78da23b11 -> 749b5bd6a


[GOBBLIN-564] Implement partition descriptor

Closes #2427 from zxcware/dd2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/749b5bd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/749b5bd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/749b5bd6

Branch: refs/heads/master
Commit: 749b5bd6a1a8a2091bdffab809c1b13d0701b3f3
Parents: 78da23b
Author: zhchen <[email protected]>
Authored: Thu Aug 30 11:44:00 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu Aug 30 11:44:00 2018 -0700

----------------------------------------------------------------------
 .../gobblin/dataset/DatasetDescriptor.java      |  40 ++++---
 .../apache/gobblin/dataset/DatasetResolver.java |  21 +++-
 .../gobblin/dataset/DatasetResolverFactory.java |   7 +-
 .../org/apache/gobblin/dataset/Descriptor.java  | 108 +++++++++++++++++++
 .../gobblin/dataset/DescriptorResolver.java     |  35 ++++++
 .../dataset/DescriptorResolverFactory.java      |  31 ++++++
 .../gobblin/dataset/NoopDatasetResolver.java    |   5 +
 .../gobblin/dataset/PartitionDescriptor.java    |  59 ++++++++++
 .../gobblin/dataset/DatasetResolverTest.java    |  66 ++++++++++++
 .../apache/gobblin/dataset/DescriptorTest.java  |  61 +++++++++++
 .../dataset/ConvertibleHiveDatasetTest.java     |   7 +-
 .../event/lineage/LineageEventBuilder.java      |  35 +++---
 .../metrics/event/lineage/LineageInfo.java      |  30 ++++--
 .../metrics/event/lineage/LineageEventTest.java |  39 ++++++-
 .../salesforce/SalesforceSourceTest.java        |   4 +-
 15 files changed, 494 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
index 5b41862..d2c260b 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java
@@ -17,21 +17,18 @@
 
 package org.apache.gobblin.dataset;
 
-
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 
 
 /**
- * A {@link DatasetDescriptor} identifies and provides metadata to describe a 
dataset
+ * A {@link Descriptor} identifies and provides metadata to describe a dataset
  */
-@RequiredArgsConstructor
-public final class DatasetDescriptor {
+public class DatasetDescriptor extends Descriptor {
   private static final String PLATFORM_KEY = "platform";
   private static final String NAME_KEY = "name";
 
@@ -40,20 +37,24 @@ public final class DatasetDescriptor {
    */
   @Getter
   private final String platform;
-  /**
-   * name of the dataset
-   */
-  @Getter
-  private final String name;
 
   /**
    * metadata about the dataset
    */
   private final Map<String, String> metadata = Maps.newHashMap();
 
+  public DatasetDescriptor(String platform, String name) {
+    super(name);
+    this.platform = platform;
+  }
+
+  /**
+   * @deprecated use {@link #copy()}
+   */
+  @Deprecated
   public DatasetDescriptor(DatasetDescriptor copy) {
+    super(copy.getName());
     platform = copy.getPlatform();
-    name = copy.getName();
     metadata.putAll(copy.getMetadata());
   }
 
@@ -63,17 +64,25 @@ public final class DatasetDescriptor {
         .build();
   }
 
+  @Override
+  public DatasetDescriptor copy() {
+    return new DatasetDescriptor(this);
+  }
+
   public void addMetadata(String key, String value) {
     metadata.put(key, value);
   }
 
   /**
    * Serialize to a string map
+   *
+   * @deprecated use {@link Descriptor#serialize(Descriptor)}
    */
+  @Deprecated
   public Map<String, String> toDataMap() {
     Map<String, String> map = Maps.newHashMap();
     map.put(PLATFORM_KEY, platform);
-    map.put(NAME_KEY, name);
+    map.put(NAME_KEY, getName());
     map.putAll(metadata);
     return map;
   }
@@ -88,20 +97,23 @@ public final class DatasetDescriptor {
     }
 
     DatasetDescriptor that = (DatasetDescriptor) o;
-    return platform.equals(that.platform) && name.equals(that.name) && 
metadata.equals(that.metadata);
+    return platform.equals(that.platform) && getName().equals(that.getName()) 
&& metadata.equals(that.metadata);
   }
 
   @Override
   public int hashCode() {
     int result = platform.hashCode();
-    result = 31 * result + name.hashCode();
+    result = 31 * result + getName().hashCode();
     result = 31 * result + metadata.hashCode();
     return result;
   }
 
   /**
    * Deserialize a {@link DatasetDescriptor} from a string map
+   *
+   * @deprecated use {@link Descriptor#deserialize(String)}
    */
+  @Deprecated
   public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) {
     DatasetDescriptor descriptor = new 
DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY));
     dataMap.forEach((key, value) -> {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
index 0e28169..7f38cb4 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java
@@ -22,8 +22,11 @@ import org.apache.gobblin.configuration.State;
 
 /**
  * A {@link DatasetResolver} resolves job specific dataset
+ *
+ * @deprecated use the more general {@link DescriptorResolver}
  */
-public interface DatasetResolver {
+@Deprecated
+public interface DatasetResolver extends DescriptorResolver {
   /**
    * Given raw Gobblin dataset, resolve job specific dataset
    *
@@ -32,4 +35,20 @@ public interface DatasetResolver {
    * @return resolved dataset for the job
    */
   DatasetDescriptor resolve(DatasetDescriptor raw, State state);
+
+  @Override
+  default Descriptor resolve(Descriptor raw, State state) {
+    DatasetDescriptor rawDataset;
+
+    if (raw instanceof DatasetDescriptor) {
+      rawDataset = (DatasetDescriptor) raw;
+    } else if (raw instanceof PartitionDescriptor) {
+      rawDataset = ((PartitionDescriptor) raw).getDataset();
+    } else {
+      // type not supported
+      return null;
+    }
+
+    return resolve(rawDataset, state);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
index ac1cfe9..95d7793 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
@@ -19,10 +19,15 @@ package org.apache.gobblin.dataset;
 
 import com.typesafe.config.Config;
 
+
 /**
  * A factory that creates an instance of {@link DatasetResolver}
+ *
+ * @deprecated use {@link DescriptorResolverFactory} as {@link 
DatasetResolver} is deprecated
+ * with {@link DescriptorResolver}
  */
-public interface DatasetResolverFactory {
+@Deprecated
+public interface DatasetResolverFactory extends DescriptorResolverFactory {
   /**
    * Create a {@link DatasetResolver} instance
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
new file mode 100644
index 0000000..2e3daa5
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * A descriptor is a simplified representation of a resource, which could be a 
dataset, dataset partition, file, etc.
+ * It is a digest or abstract, which contains identification properties of the 
actual resource object, such as ID, name
+ * primary keys, version, etc
+ *
+ * <p>
+ *   The class provides {@link #serialize(Descriptor)} and {@link 
#deserialize(String)} util methods pair to send
+ *   a descriptor object over the wire
+ * </p>
+ *
+ * <p>
+ *   When the original object has complicated inner structure and there is a 
requirement to send it over the wire,
it is time to define a corresponding {@link Descriptor}. In this 
case, the {@link Descriptor} can just
+ *   have minimal information enough to construct the original object on the 
other side of the wire
+ * </p>
+ *
+ * <p>
+ *   When the cost to instantiate the complete original object is high, for 
example, network calls are required, but
the use cases are limited to the identification properties, defining a 
corresponding {@link Descriptor} becomes
+ *   handy
+ * </p>
+ */
+@RequiredArgsConstructor
+public class Descriptor {
+
+  /** Use gson for ser/de */
+  private static final Gson GSON = new Gson();
+
+  /** Name of the resource */
+  @Getter
+  private final String name;
+
+  @Override
+  public String toString() {
+    return GSON.toJson(this);
+  }
+
+  public Descriptor copy() {
+    return new Descriptor(name);
+  }
+
+  /**
+   * A helper class for ser/de of a {@link Descriptor}
+   */
+  @RequiredArgsConstructor
+  private static class Wrap {
+    /** The actual class name of the {@link Descriptor} */
+    private final String clazz;
+    /** A json representation of the {@link Descriptor}*/
+    private final JsonObject data;
+  }
+
+  /**
+   * Serialize any {@link Descriptor} object to a string
+   */
+  public static String serialize(Descriptor descriptor) {
+    if (descriptor == null) {
+      return GSON.toJson(null);
+    }
+    JsonObject data = GSON.toJsonTree(descriptor).getAsJsonObject();
+    return GSON.toJson(new Wrap(descriptor.getClass().getName(), data));
+  }
+
+  /**
+   * Deserialize a string, which results from {@link #serialize(Descriptor)}, 
into the original
+   * {@link Descriptor} object
+   */
+  public static Descriptor deserialize(String serialized) {
+    Wrap wrap = GSON.fromJson(serialized, Wrap.class);
+    if (wrap == null) {
+      return null;
+    }
+
+    try {
+      return GSON.fromJson(wrap.data, (Type) Class.forName(wrap.clazz));
+    } catch (ClassNotFoundException e) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java
new file mode 100644
index 0000000..3951d65
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A resolver transforms an existing {@link Descriptor} to a new one
+ */
+public interface DescriptorResolver {
+  /**
+   * Given raw Gobblin descriptor, resolve a job specific descriptor
+   *
+   * @param raw the original descriptor
+   * @param state configuration that helps resolve job specific descriptor
+   * @return resolved descriptor for the job or {@code null} if failed to 
resolve
+   */
+  Descriptor resolve(Descriptor raw, State state);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java
new file mode 100644
index 0000000..ade308d
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Factory to create a {@link DescriptorResolver} instance
+ */
+public interface DescriptorResolverFactory {
+  /**
+   * @param config configurations only about {@link DescriptorResolver}
+   */
+  DescriptorResolver createResolver(Config config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
index e678d84..a163b5f 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
@@ -33,4 +33,9 @@ public class NoopDatasetResolver implements DatasetResolver {
   public DatasetDescriptor resolve(DatasetDescriptor raw, State state) {
     return raw;
   }
+
+  @Override
+  public Descriptor resolve(Descriptor raw, State state) {
+    return raw;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
new file mode 100644
index 0000000..f0b4dcf
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link Descriptor} that identifies a partition of a dataset
+ */
+public class PartitionDescriptor extends Descriptor {
+  @Getter
+  private final DatasetDescriptor dataset;
+
+  public PartitionDescriptor(String name, DatasetDescriptor dataset) {
+    super(name);
+    this.dataset = dataset;
+  }
+
+  @Override
+  public PartitionDescriptor copy() {
+    return new PartitionDescriptor(getName(), dataset);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PartitionDescriptor that = (PartitionDescriptor) o;
+    return dataset.equals(that.dataset) && getName().equals(that.getName());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = dataset.hashCode();
+    result = 31 * result + getName().hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java 
b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java
new file mode 100644
index 0000000..29c1405
--- /dev/null
+++ 
b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+
+
+public class DatasetResolverTest {
+
+  @Test
+  public void testAsDescriptorResolver() {
+    DescriptorResolver resolver = new TestDatasetResolver();
+    State state = new State();
+
+    // Test dataset descriptor
+    DatasetDescriptor dataset = new DatasetDescriptor("hdfs", 
"/data/tracking/PageViewEvent");
+    Descriptor descriptor = resolver.resolve(dataset, state);
+    
Assert.assertTrue(descriptor.getClass().isAssignableFrom(DatasetDescriptor.class));
+    Assert.assertEquals(descriptor.getName(), 
TestDatasetResolver.DATASET_NAME);
+
+    // Test partition descriptor
+    String partitionName = "hourly/2018/08/14/18";
+    PartitionDescriptor partition = new PartitionDescriptor(partitionName, 
dataset);
+    descriptor = resolver.resolve(partition, state);
+    
Assert.assertTrue(descriptor.getClass().isAssignableFrom(DatasetDescriptor.class));
+    Assert.assertEquals(descriptor.getName(), 
TestDatasetResolver.DATASET_NAME);
+
+    // Test unsupported descriptor
+    Assert.assertEquals(resolver.resolve(new MockDescriptor("test"), state), 
null);
+  }
+
+  private static class TestDatasetResolver implements DatasetResolver {
+    static final String DATASET_NAME = "TEST";
+
+    @Override
+    public DatasetDescriptor resolve(DatasetDescriptor raw, State state) {
+      DatasetDescriptor descriptor = new DatasetDescriptor(raw.getPlatform(), 
DATASET_NAME);
+      raw.getMetadata().forEach(descriptor::addMetadata);
+      return descriptor;
+    }
+  }
+
+  private static class MockDescriptor extends Descriptor {
+    public MockDescriptor(String name) {
+      super(name);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java 
b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
new file mode 100644
index 0000000..6ac875b
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DescriptorTest {
+
+  @Test
+  public void testDatasetDescriptor() {
+    DatasetDescriptor dataset = new DatasetDescriptor("hdfs", 
"/data/tracking/PageViewEvent");
+    dataset.addMetadata("fsUri", "hdfs://test.com:2018");
+
+    DatasetDescriptor copy = dataset.copy();
+    Assert.assertEquals(copy.getName(), dataset.getName());
+    Assert.assertEquals(copy.getPlatform(), dataset.getPlatform());
+    Assert.assertEquals(copy.getMetadata(), dataset.getMetadata());
+    Assert.assertEquals(dataset, copy);
+    Assert.assertEquals(dataset.hashCode(), copy.hashCode());
+  }
+
+  @Test
+  public void testPartitionDescriptor() {
+    // Test serialization
+    String partitionJson = 
"{\"clazz\":\"org.apache.gobblin.dataset.PartitionDescriptor\",\"data\":{\"dataset\":{\"platform\":\"hdfs\",\"metadata\":{},\"name\":\"/data/tracking/PageViewEvent\"},\"name\":\"hourly/2018/08/14/18\"}}";
+
+    DatasetDescriptor dataset = new DatasetDescriptor("hdfs", 
"/data/tracking/PageViewEvent");
+    String partitionName = "hourly/2018/08/14/18";
+    PartitionDescriptor partition = new PartitionDescriptor(partitionName, 
dataset);
+    Assert.assertEquals(Descriptor.serialize(partition), partitionJson);
+    System.out.println(partitionJson);
+
+    Descriptor partition2 = Descriptor.deserialize(partitionJson);
+    Assert.assertEquals(partition2.getName(), partition.getName());
+    Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), 
partition.getDataset());
+    Assert.assertEquals(partition, partition2);
+    Assert.assertEquals(partition.hashCode(), partition2.hashCode());
+
+    // Test copy
+    PartitionDescriptor partition3 = partition.copy();
+    Assert.assertEquals(partition3.getDataset(), dataset);
+    Assert.assertEquals(partition3.getName(), partitionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index 0048416..eddd852 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -35,6 +35,7 @@ import 
org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSo
 import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
 import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;
 import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory;
 import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
@@ -103,7 +104,7 @@ public class ConvertibleHiveDatasetTest {
     // Assert that source is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.source"));
     DatasetDescriptor sourceDD =
-        GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), 
DatasetDescriptor.class);
+        (DatasetDescriptor) 
Descriptor.deserialize(props.getProperty("gobblin.event.lineage.source"));
     Assert.assertEquals(sourceDD.getPlatform(), "file");
     Assert.assertEquals(sourceDD.getName(), "/tmp/test");
     
Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
 "db1.tb1");
@@ -111,7 +112,7 @@ public class ConvertibleHiveDatasetTest {
     // Assert that first dest is correct for lineage event
     
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination"));
     DatasetDescriptor destDD1 =
-        
GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), 
DatasetDescriptor.class);
+        (DatasetDescriptor) 
Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.1.destination"));
     Assert.assertEquals(destDD1.getPlatform(), "file");
     Assert.assertEquals(destDD1.getName(), 
"/tmp/data_nestedOrc/db1/tb1/final");
     
Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
@@ -120,7 +121,7 @@ public class ConvertibleHiveDatasetTest {
     // Assert that second dest is correct for lineage event
     
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination"));
     DatasetDescriptor destDD2 =
-        
GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), 
DatasetDescriptor.class);
+        (DatasetDescriptor) 
Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.2.destination"));
     Assert.assertEquals(destDD2.getPlatform(), "file");
     Assert.assertEquals(destDD2.getName(), 
"/tmp/data_flattenedOrc/db1/tb1/final");
     
Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
index febf0ed..c920cc3 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 
@@ -52,9 +53,9 @@ public final class LineageEventBuilder extends 
GobblinEventBuilder {
   private static final Gson GSON = new Gson();
 
   @Getter @Setter
-  private DatasetDescriptor source;
+  private Descriptor source;
   @Getter @Setter
-  private DatasetDescriptor destination;
+  private Descriptor destination;
 
   public LineageEventBuilder(String name) {
     super(name, LIENAGE_EVENT_NAMESPACE);
@@ -63,9 +64,10 @@ public final class LineageEventBuilder extends 
GobblinEventBuilder {
 
   @Override
   public GobblinTrackingEvent build() {
-    source.toDataMap().forEach((key, value) -> metadata.put(getKey(SOURCE, 
key), value));
-    destination.toDataMap().forEach((key, value) -> 
metadata.put(getKey(DESTINATION, key), value));
-    return new GobblinTrackingEvent(0L, namespace, name, metadata);
+    Map<String, String> dataMap = Maps.newHashMap(metadata);
+    dataMap.put(SOURCE, Descriptor.serialize(source));
+    dataMap.put(DESTINATION, Descriptor.serialize(destination));
+    return new GobblinTrackingEvent(0L, namespace, name, dataMap);
   }
 
   @Override
@@ -121,23 +123,20 @@ public final class LineageEventBuilder extends 
GobblinEventBuilder {
     Map<String, String> metadata = event.getMetadata();
     LineageEventBuilder lineageEvent = new 
LineageEventBuilder(event.getName());
 
-    String sourcePrefix = getKey(SOURCE, "");
-    Map<String, String> sourceDataMap = Maps.newHashMap();
-    String destinationPrefix = getKey(DESTINATION, "");
-    Map<String, String> destinationDataMap = Maps.newHashMap();
-
     metadata.forEach((key, value) -> {
-      if (key.startsWith(sourcePrefix)) {
-        sourceDataMap.put(key.substring(sourcePrefix.length()), value);
-      } else if (key.startsWith(destinationPrefix)) {
-        destinationDataMap.put(key.substring(destinationPrefix.length()), 
value);
-      } else {
-        lineageEvent.addMetadata(key, value);
+      switch (key) {
+        case SOURCE:
+          lineageEvent.setSource(Descriptor.deserialize(value));
+          break;
+        case DESTINATION:
+          lineageEvent.setDestination(Descriptor.deserialize(value));
+          break;
+        default:
+          lineageEvent.addMetadata(key, value);
+          break;
       }
     });
 
-    lineageEvent.setSource(DatasetDescriptor.fromDataMap(sourceDataMap));
-    
lineageEvent.setDestination(DatasetDescriptor.fromDataMap(destinationDataMap));
     return lineageEvent;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 2cf8250..0311df7 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -42,6 +42,8 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.dataset.DatasetResolver;
 import org.apache.gobblin.dataset.DatasetResolverFactory;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.DescriptorResolver;
 import org.apache.gobblin.dataset.NoopDatasetResolver;
 import org.apache.gobblin.metrics.broker.LineageInfoFactory;
 import org.apache.gobblin.util.ConfigUtils;
@@ -86,7 +88,7 @@ public final class LineageInfo {
           .put(DATASET_RESOLVER_FACTORY, NoopDatasetResolver.FACTORY)
           .build());
 
-  private final DatasetResolver resolver;
+  private final DescriptorResolver resolver;
 
   public LineageInfo(Config config) {
     resolver = getResolver(config.withFallback(FALLBACK));
@@ -103,14 +105,14 @@ public final class LineageInfo {
    * @param state state about a {@link 
org.apache.gobblin.source.workunit.WorkUnit}
    *
    */
-  public void setSource(DatasetDescriptor source, State state) {
-    DatasetDescriptor descriptor = resolver.resolve(source, state);
+  public void setSource(Descriptor source, State state) {
+    Descriptor descriptor = resolver.resolve(source, state);
     if (descriptor == null) {
       return;
     }
 
     state.setProp(getKey(NAME_KEY), descriptor.getName());
-    state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(descriptor));
+    state.setProp(getKey(LineageEventBuilder.SOURCE), 
Descriptor.serialize(descriptor));
   }
 
   /**
@@ -122,19 +124,19 @@ public final class LineageInfo {
    *   the method is implemented to be threadsafe
    * </p>
    */
-  public void putDestination(DatasetDescriptor destination, int branchId, 
State state) {
+  public void putDestination(Descriptor destination, int branchId, State 
state) {
     if (!hasLineageInfo(state)) {
       log.warn("State has no lineage info but branch " + branchId + " puts a 
destination: " + GSON.toJson(destination));
       return;
     }
     log.debug(String.format("Put destination %s for branch %d", 
GSON.toJson(destination), branchId));
     synchronized (state.getProp(getKey(NAME_KEY))) {
-      DatasetDescriptor descriptor = resolver.resolve(destination, state);
+      Descriptor descriptor = resolver.resolve(destination, state);
       if (descriptor == null) {
         return;
       }
 
-      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), 
GSON.toJson(descriptor));
+      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), 
Descriptor.serialize(descriptor));
     }
   }
 
@@ -161,10 +163,14 @@ public final class LineageInfo {
    */
   static Map<String, LineageEventBuilder> load(State state) {
     String name = state.getProp(getKey(NAME_KEY));
-    DatasetDescriptor source = 
GSON.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)), 
DatasetDescriptor.class);
+    Descriptor source = 
Descriptor.deserialize(state.getProp(getKey(LineageEventBuilder.SOURCE)));
 
     String branchedPrefix = getKey(BRANCH, "");
     Map<String, LineageEventBuilder> events = Maps.newHashMap();
+    if (source == null) {
+      return events;
+    }
+
     for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) {
       String key = entry.getKey().toString();
       if (!key.startsWith(branchedPrefix)) {
@@ -177,12 +183,12 @@ public final class LineageInfo {
       LineageEventBuilder event = events.get(branchId);
       if (event == null) {
         event = new LineageEventBuilder(name);
-        event.setSource(new DatasetDescriptor(source));
+        event.setSource(source.copy());
         events.put(parts[0], event);
       }
       switch (parts[1]) {
         case LineageEventBuilder.DESTINATION:
-          DatasetDescriptor destination = 
GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class);
+          Descriptor destination = 
Descriptor.deserialize(entry.getValue().toString());
           event.setDestination(destination);
           break;
         default:
@@ -237,8 +243,10 @@ public final class LineageInfo {
    * Get the configured {@link DatasetResolver} from {@link State}
    */
   public static DatasetResolver getResolver(Config config) {
+    // ConfigException.Missing will throw if DATASET_RESOLVER_FACTORY is absent
     String resolverFactory = config.getString(DATASET_RESOLVER_FACTORY);
-    if (resolverFactory.equals(NoopDatasetResolver.FACTORY)) {
+
+    if (resolverFactory.toUpperCase().equals(NoopDatasetResolver.FACTORY)) {
       return NoopDatasetResolver.INSTANCE;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 4352e7f..8ca6d5e 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -29,6 +29,9 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 
 import org.testng.Assert;
@@ -42,6 +45,7 @@ import com.typesafe.config.ConfigFactory;
  * Test for loading linage events from state
  */
 public class LineageEventTest {
+
   @Test
   public void testEvent() {
     final String topic = "testTopic";
@@ -54,7 +58,7 @@ public class LineageEventTest {
     LineageInfo lineageInfo = getLineageInfo();
     DatasetDescriptor source = new DatasetDescriptor(kafka, topic);
     lineageInfo.setSource(source, state0);
-    DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, 
"/data/dbchanges");
+    DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, 
"/data/tracking");
     destination00.addMetadata(branch, "0");
     lineageInfo.putDestination(destination00, 0, state0);
     DatasetDescriptor destination01 = new DatasetDescriptor(mysql, 
"kafka.testTopic");
@@ -107,10 +111,37 @@ public class LineageEventTest {
     verify(getLineageEvent(eventsList, 1, "hive"), topic, source, 
destination11);
   }
 
+  @Test
+  public void testEventForPartitionedDataset() {
+    final String topic = "testTopic";
+    final String kafka = "kafka";
+    final String hdfs = "hdfs";
+    final String path = "/data/tracking/PageViewEvent";
+    final String partitionName = "hourly/2018/08/15/15";
+
+    State state = new State();
+    LineageInfo lineageInfo = getLineageInfo();
+    DatasetDescriptor source = new DatasetDescriptor(kafka, topic);
+    lineageInfo.setSource(source, state);
+    DatasetDescriptor destinationDataset = new DatasetDescriptor(hdfs, path);
+    PartitionDescriptor destination = new PartitionDescriptor(partitionName, 
destinationDataset);
+    lineageInfo.putDestination(destination, 0, state);
+
+    Map<String, LineageEventBuilder> events = LineageInfo.load(state);
+    LineageEventBuilder event = events.get("0");
+    verify(event, topic, source, destination);
+
+    // Verify gobblin tracking event
+    GobblinTrackingEvent trackingEvent = event.build();
+    Assert.assertEquals(LineageEventBuilder.isLineageEvent(trackingEvent), 
true);
+    Assert.assertEquals(LineageEventBuilder.fromEvent(trackingEvent), event);
+  }
+
   private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> 
events, int branchId, String destinationPlatform) {
     for (LineageEventBuilder event : events) {
-      if (event.getDestination().getPlatform().equals(destinationPlatform) &&
-          
event.getDestination().getMetadata().get(DatasetConstants.BRANCH).equals(String.valueOf(branchId)))
 {
+      DatasetDescriptor descriptor = (DatasetDescriptor) 
event.getDestination();
+      if (descriptor.getPlatform().equals(destinationPlatform) &&
+          
descriptor.getMetadata().get(DatasetConstants.BRANCH).equals(String.valueOf(branchId)))
 {
         return event;
       }
     }
@@ -132,7 +163,7 @@ public class LineageEventTest {
     return obj2;
   }
 
-  private void verify(LineageEventBuilder event, String name, 
DatasetDescriptor source, DatasetDescriptor destination) {
+  private void verify(LineageEventBuilder event, String name, Descriptor 
source, Descriptor destination) {
     Assert.assertEquals(event.getName(), name);
     Assert.assertEquals(event.getNamespace(), 
LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
     
Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), 
LineageEventBuilder.LINEAGE_EVENT_TYPE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
 
b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
index 5e9fbba..0b0e06d 100644
--- 
a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
+++ 
b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
@@ -49,8 +50,7 @@ public class SalesforceSourceTest {
     Assert.assertEquals(workUnits.size(), 1);
 
     DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", 
"contacts");
-    Gson gson = new Gson();
-    Assert.assertEquals(gson.toJson(sourceDataset), 
workUnits.get(0).getProp("gobblin.event.lineage.source"));
+    Assert.assertEquals(Descriptor.serialize(sourceDataset), 
workUnits.get(0).getProp("gobblin.event.lineage.source"));
     
Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), 
"contacts");
   }
 

Reply via email to