Repository: incubator-gobblin Updated Branches: refs/heads/master 78da23b11 -> 749b5bd6a
[GOBBLIN-564] Implement partition descriptor Closes #2427 from zxcware/dd2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/749b5bd6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/749b5bd6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/749b5bd6 Branch: refs/heads/master Commit: 749b5bd6a1a8a2091bdffab809c1b13d0701b3f3 Parents: 78da23b Author: zhchen <[email protected]> Authored: Thu Aug 30 11:44:00 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Aug 30 11:44:00 2018 -0700 ---------------------------------------------------------------------- .../gobblin/dataset/DatasetDescriptor.java | 40 ++++--- .../apache/gobblin/dataset/DatasetResolver.java | 21 +++- .../gobblin/dataset/DatasetResolverFactory.java | 7 +- .../org/apache/gobblin/dataset/Descriptor.java | 108 +++++++++++++++++++ .../gobblin/dataset/DescriptorResolver.java | 35 ++++++ .../dataset/DescriptorResolverFactory.java | 31 ++++++ .../gobblin/dataset/NoopDatasetResolver.java | 5 + .../gobblin/dataset/PartitionDescriptor.java | 59 ++++++++++ .../gobblin/dataset/DatasetResolverTest.java | 66 ++++++++++++ .../apache/gobblin/dataset/DescriptorTest.java | 61 +++++++++++ .../dataset/ConvertibleHiveDatasetTest.java | 7 +- .../event/lineage/LineageEventBuilder.java | 35 +++--- .../metrics/event/lineage/LineageInfo.java | 30 ++++-- .../metrics/event/lineage/LineageEventTest.java | 39 ++++++- .../salesforce/SalesforceSourceTest.java | 4 +- 15 files changed, 494 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java index 5b41862..d2c260b 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetDescriptor.java @@ -17,21 +17,18 @@ package org.apache.gobblin.dataset; - import java.util.Map; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import lombok.Getter; -import lombok.RequiredArgsConstructor; /** - * A {@link DatasetDescriptor} identifies and provides metadata to describe a dataset + * A {@link Descriptor} identifies and provides metadata to describe a dataset */ -@RequiredArgsConstructor -public final class DatasetDescriptor { +public class DatasetDescriptor extends Descriptor { private static final String PLATFORM_KEY = "platform"; private static final String NAME_KEY = "name"; @@ -40,20 +37,24 @@ public final class DatasetDescriptor { */ @Getter private final String platform; - /** - * name of the dataset - */ - @Getter - private final String name; /** * metadata about the dataset */ private final Map<String, String> metadata = Maps.newHashMap(); + public DatasetDescriptor(String platform, String name) { + super(name); + this.platform = platform; + } + + /** + * @deprecated use {@link #copy()} + */ + @Deprecated public DatasetDescriptor(DatasetDescriptor copy) { + super(copy.getName()); platform = copy.getPlatform(); - name = copy.getName(); metadata.putAll(copy.getMetadata()); } @@ -63,17 +64,25 @@ public final class DatasetDescriptor { .build(); } + @Override + public DatasetDescriptor copy() { + return new DatasetDescriptor(this); + } + public void addMetadata(String key, String value) { metadata.put(key, value); } /** * Serialize to a string map + * + * @deprecated use {@link Descriptor#serialize(Descriptor)} */ + @Deprecated public Map<String, String> toDataMap() { Map<String, String> map = Maps.newHashMap(); map.put(PLATFORM_KEY, platform); - 
map.put(NAME_KEY, name); + map.put(NAME_KEY, getName()); map.putAll(metadata); return map; } @@ -88,20 +97,23 @@ public final class DatasetDescriptor { } DatasetDescriptor that = (DatasetDescriptor) o; - return platform.equals(that.platform) && name.equals(that.name) && metadata.equals(that.metadata); + return platform.equals(that.platform) && getName().equals(that.getName()) && metadata.equals(that.metadata); } @Override public int hashCode() { int result = platform.hashCode(); - result = 31 * result + name.hashCode(); + result = 31 * result + getName().hashCode(); result = 31 * result + metadata.hashCode(); return result; } /** * Deserialize a {@link DatasetDescriptor} from a string map + * + * @deprecated use {@link Descriptor#deserialize(String)} */ + @Deprecated public static DatasetDescriptor fromDataMap(Map<String, String> dataMap) { DatasetDescriptor descriptor = new DatasetDescriptor(dataMap.get(PLATFORM_KEY), dataMap.get(NAME_KEY)); dataMap.forEach((key, value) -> { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java index 0e28169..7f38cb4 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolver.java @@ -22,8 +22,11 @@ import org.apache.gobblin.configuration.State; /** * A {@link DatasetResolver} resolves job specific dataset + * + * @deprecated use the more general {@link DescriptorResolver} */ -public interface DatasetResolver { +@Deprecated +public interface DatasetResolver extends DescriptorResolver { /** * Given raw Gobblin dataset, resolve job specific dataset * @@ -32,4 +35,20 @@ public interface DatasetResolver { * @return 
resolved dataset for the job */ DatasetDescriptor resolve(DatasetDescriptor raw, State state); + + @Override + default Descriptor resolve(Descriptor raw, State state) { + DatasetDescriptor rawDataset; + + if (raw instanceof DatasetDescriptor) { + rawDataset = (DatasetDescriptor) raw; + } else if (raw instanceof PartitionDescriptor) { + rawDataset = ((PartitionDescriptor) raw).getDataset(); + } else { + // type not supported + return null; + } + + return resolve(rawDataset, state); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java index ac1cfe9..95d7793 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java @@ -19,10 +19,15 @@ package org.apache.gobblin.dataset; import com.typesafe.config.Config; + /** * A factory that creates an instance of {@link DatasetResolver} + * + * @deprecated use {@link DescriptorResolverFactory} as {@link DatasetResolver} is deprecated + * with {@link DescriptorResolver} */ -public interface DatasetResolverFactory { +@Deprecated +public interface DatasetResolverFactory extends DescriptorResolverFactory { /** * Create a {@link DatasetResolver} instance */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java new file mode 100644 index 0000000..2e3daa5 --- 
/dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +/** + * A descriptor is a simplified representation of a resource, which could be a dataset, dataset partition, file, etc. + * It is a digest or abstract, which contains identification properties of the actual resource object, such as ID, name, + * primary keys, version, etc. + * + * <p> + * The class provides {@link #serialize(Descriptor)} and {@link #deserialize(String)} util methods pair to send + * a descriptor object over the wire + * </p> + + * + * <p> + * When the original object has complicated inner structure and there is a requirement to send it over the wire, + * it is time to define a corresponding {@link Descriptor}. 
In this case, the {@link Descriptor} can just + have minimal information enough to construct the original object on the other side of the wire + * </p> + * + * <p> + * When the cost to instantiate the complete original object is high, for example, network calls are required, but + * the use cases are limited to the identification properties, defining a corresponding {@link Descriptor} becomes + handy + * </p> + */ +@RequiredArgsConstructor +public class Descriptor { + + /** Use gson for ser/de */ + private static final Gson GSON = new Gson(); + + /** Name of the resource */ + @Getter + private final String name; + + @Override + public String toString() { + return GSON.toJson(this); + } + + public Descriptor copy() { + return new Descriptor(name); + } + + /** + * A helper class for ser/de of a {@link Descriptor} + */ + @RequiredArgsConstructor + private static class Wrap { + /** The actual class name of the {@link Descriptor} */ + private final String clazz; + /** A json representation of the {@link Descriptor}*/ + private final JsonObject data; + } + + /** + * Serialize any {@link Descriptor} object to a string + */ + public static String serialize(Descriptor descriptor) { + if (descriptor == null) { + return GSON.toJson(null); + } + JsonObject data = GSON.toJsonTree(descriptor).getAsJsonObject(); + return GSON.toJson(new Wrap(descriptor.getClass().getName(), data)); + } + + /** + * Deserialize a string, which results from {@link #serialize(Descriptor)}, into the original + * {@link Descriptor} object + */ + public static Descriptor deserialize(String serialized) { + Wrap wrap = GSON.fromJson(serialized, Wrap.class); + if (wrap == null) { + return null; + } + + try { + return GSON.fromJson(wrap.data, (Type) Class.forName(wrap.clazz)); + } catch (ClassNotFoundException e) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java 
---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java new file mode 100644 index 0000000..3951d65 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolver.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset; + +import org.apache.gobblin.configuration.State; + + +/** + * A resolver transforms an existing {@link Descriptor} to a new one + */ +public interface DescriptorResolver { + /** + * Given raw Gobblin descriptor, resolve a job specific descriptor + * + * @param raw the original descriptor + * @param state configuration that helps resolve job specific descriptor + * @return resolved descriptor for the job or {@code null} if failed to resolve + */ + Descriptor resolve(Descriptor raw, State state); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java new file mode 100644 index 0000000..ade308d --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DescriptorResolverFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset; + +import com.typesafe.config.Config; + + +/** + * Factory to create a {@link DescriptorResolver} instance + */ +public interface DescriptorResolverFactory { + /** + * @param config configurations only about {@link DescriptorResolver} + */ + DescriptorResolver createResolver(Config config); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java index e678d84..a163b5f 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java @@ -33,4 +33,9 @@ public class NoopDatasetResolver implements DatasetResolver { public DatasetDescriptor resolve(DatasetDescriptor raw, State state) { return raw; } + + @Override + public Descriptor resolve(Descriptor raw, State state) { + return raw; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java new file mode 100644 index 0000000..f0b4dcf --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import lombok.Getter; + + +/** + * A {@link Descriptor} that identifies a partition of a dataset + */ +public class PartitionDescriptor extends Descriptor { + @Getter + private final DatasetDescriptor dataset; + + public PartitionDescriptor(String name, DatasetDescriptor dataset) { + super(name); + this.dataset = dataset; + } + + @Override + public PartitionDescriptor copy() { + return new PartitionDescriptor(getName(), dataset); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PartitionDescriptor that = (PartitionDescriptor) o; + return dataset.equals(that.dataset) && getName().equals(that.getName()); + } + + @Override + public int hashCode() { + int result = dataset.hashCode(); + result = 31 * result + getName().hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java new file mode 100644 index 0000000..29c1405 --- /dev/null +++ 
b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DatasetResolverTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.State; + + +public class DatasetResolverTest { + + @Test + public void testAsDescriptorResolver() { + DescriptorResolver resolver = new TestDatasetResolver(); + State state = new State(); + + // Test dataset descriptor + DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent"); + Descriptor descriptor = resolver.resolve(dataset, state); + Assert.assertTrue(descriptor.getClass().isAssignableFrom(DatasetDescriptor.class)); + Assert.assertEquals(descriptor.getName(), TestDatasetResolver.DATASET_NAME); + + // Test partition descriptor + String partitionName = "hourly/2018/08/14/18"; + PartitionDescriptor partition = new PartitionDescriptor(partitionName, dataset); + descriptor = resolver.resolve(partition, state); + Assert.assertTrue(descriptor.getClass().isAssignableFrom(DatasetDescriptor.class)); + Assert.assertEquals(descriptor.getName(), TestDatasetResolver.DATASET_NAME); + + // Test unsupported 
descriptor + Assert.assertEquals(resolver.resolve(new MockDescriptor("test"), state), null); + } + + private static class TestDatasetResolver implements DatasetResolver { + static final String DATASET_NAME = "TEST"; + + @Override + public DatasetDescriptor resolve(DatasetDescriptor raw, State state) { + DatasetDescriptor descriptor = new DatasetDescriptor(raw.getPlatform(), DATASET_NAME); + raw.getMetadata().forEach(descriptor::addMetadata); + return descriptor; + } + } + + private static class MockDescriptor extends Descriptor { + public MockDescriptor(String name) { + super(name); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java new file mode 100644 index 0000000..6ac875b --- /dev/null +++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class DescriptorTest { + + @Test + public void testDatasetDescriptor() { + DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent"); + dataset.addMetadata("fsUri", "hdfs://test.com:2018"); + + DatasetDescriptor copy = dataset.copy(); + Assert.assertEquals(copy.getName(), dataset.getName()); + Assert.assertEquals(copy.getPlatform(), dataset.getPlatform()); + Assert.assertEquals(copy.getMetadata(), dataset.getMetadata()); + Assert.assertEquals(dataset, copy); + Assert.assertEquals(dataset.hashCode(), copy.hashCode()); + } + + @Test + public void testPartitionDescriptor() { + // Test serialization + String partitionJson = "{\"clazz\":\"org.apache.gobblin.dataset.PartitionDescriptor\",\"data\":{\"dataset\":{\"platform\":\"hdfs\",\"metadata\":{},\"name\":\"/data/tracking/PageViewEvent\"},\"name\":\"hourly/2018/08/14/18\"}}"; + + DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent"); + String partitionName = "hourly/2018/08/14/18"; + PartitionDescriptor partition = new PartitionDescriptor(partitionName, dataset); + Assert.assertEquals(Descriptor.serialize(partition), partitionJson); + System.out.println(partitionJson); + + Descriptor partition2 = Descriptor.deserialize(partitionJson); + Assert.assertEquals(partition2.getName(), partition.getName()); + Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), partition.getDataset()); + Assert.assertEquals(partition, partition2); + Assert.assertEquals(partition.hashCode(), partition2.hashCode()); + + // Test copy + PartitionDescriptor partition3 = partition.copy(); + Assert.assertEquals(partition3.getDataset(), dataset); + Assert.assertEquals(partition3.getName(), partitionName); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java index 0048416..eddd852 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java @@ -35,6 +35,7 @@ import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSo import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver; import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory; import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; @@ -103,7 +104,7 @@ public class ConvertibleHiveDatasetTest { // Assert that source is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.source")); DatasetDescriptor sourceDD = - GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class); + (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.source")); Assert.assertEquals(sourceDD.getPlatform(), "file"); Assert.assertEquals(sourceDD.getName(), "/tmp/test"); Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), 
"db1.tb1"); @@ -111,7 +112,7 @@ public class ConvertibleHiveDatasetTest { // Assert that first dest is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination")); DatasetDescriptor destDD1 = - GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), DatasetDescriptor.class); + (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.1.destination")); Assert.assertEquals(destDD1.getPlatform(), "file"); Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final"); Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), @@ -120,7 +121,7 @@ public class ConvertibleHiveDatasetTest { // Assert that second dest is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination")); DatasetDescriptor destDD2 = - GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), DatasetDescriptor.class); + (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.2.destination")); Assert.assertEquals(destDD2.getPlatform(), "file"); Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final"); Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java index febf0ed..c920cc3 100644 --- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.event.GobblinEventBuilder; @@ -52,9 +53,9 @@ public final class LineageEventBuilder extends GobblinEventBuilder { private static final Gson GSON = new Gson(); @Getter @Setter - private DatasetDescriptor source; + private Descriptor source; @Getter @Setter - private DatasetDescriptor destination; + private Descriptor destination; public LineageEventBuilder(String name) { super(name, LIENAGE_EVENT_NAMESPACE); @@ -63,9 +64,10 @@ public final class LineageEventBuilder extends GobblinEventBuilder { @Override public GobblinTrackingEvent build() { - source.toDataMap().forEach((key, value) -> metadata.put(getKey(SOURCE, key), value)); - destination.toDataMap().forEach((key, value) -> metadata.put(getKey(DESTINATION, key), value)); - return new GobblinTrackingEvent(0L, namespace, name, metadata); + Map<String, String> dataMap = Maps.newHashMap(metadata); + dataMap.put(SOURCE, Descriptor.serialize(source)); + dataMap.put(DESTINATION, Descriptor.serialize(destination)); + return new GobblinTrackingEvent(0L, namespace, name, dataMap); } @Override @@ -121,23 +123,20 @@ public final class LineageEventBuilder extends GobblinEventBuilder { Map<String, String> metadata = event.getMetadata(); LineageEventBuilder lineageEvent = new LineageEventBuilder(event.getName()); - String sourcePrefix = getKey(SOURCE, ""); - Map<String, String> sourceDataMap = Maps.newHashMap(); - String destinationPrefix = getKey(DESTINATION, ""); - Map<String, String> destinationDataMap = 
Maps.newHashMap(); - metadata.forEach((key, value) -> { - if (key.startsWith(sourcePrefix)) { - sourceDataMap.put(key.substring(sourcePrefix.length()), value); - } else if (key.startsWith(destinationPrefix)) { - destinationDataMap.put(key.substring(destinationPrefix.length()), value); - } else { - lineageEvent.addMetadata(key, value); + switch (key) { + case SOURCE: + lineageEvent.setSource(Descriptor.deserialize(value)); + break; + case DESTINATION: + lineageEvent.setDestination(Descriptor.deserialize(value)); + break; + default: + lineageEvent.addMetadata(key, value); + break; } }); - lineageEvent.setSource(DatasetDescriptor.fromDataMap(sourceDataMap)); - lineageEvent.setDestination(DatasetDescriptor.fromDataMap(destinationDataMap)); return lineageEvent; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java index 2cf8250..0311df7 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java @@ -42,6 +42,8 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.dataset.DatasetResolver; import org.apache.gobblin.dataset.DatasetResolverFactory; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.DescriptorResolver; import org.apache.gobblin.dataset.NoopDatasetResolver; import org.apache.gobblin.metrics.broker.LineageInfoFactory; import 
org.apache.gobblin.util.ConfigUtils; @@ -86,7 +88,7 @@ public final class LineageInfo { .put(DATASET_RESOLVER_FACTORY, NoopDatasetResolver.FACTORY) .build()); - private final DatasetResolver resolver; + private final DescriptorResolver resolver; public LineageInfo(Config config) { resolver = getResolver(config.withFallback(FALLBACK)); @@ -103,14 +105,14 @@ public final class LineageInfo { * @param state state about a {@link org.apache.gobblin.source.workunit.WorkUnit} * */ - public void setSource(DatasetDescriptor source, State state) { - DatasetDescriptor descriptor = resolver.resolve(source, state); + public void setSource(Descriptor source, State state) { + Descriptor descriptor = resolver.resolve(source, state); if (descriptor == null) { return; } state.setProp(getKey(NAME_KEY), descriptor.getName()); - state.setProp(getKey(LineageEventBuilder.SOURCE), GSON.toJson(descriptor)); + state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.serialize(descriptor)); } /** @@ -122,19 +124,19 @@ public final class LineageInfo { * the method is implemented to be threadsafe * </p> */ - public void putDestination(DatasetDescriptor destination, int branchId, State state) { + public void putDestination(Descriptor destination, int branchId, State state) { if (!hasLineageInfo(state)) { log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination)); return; } log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId)); synchronized (state.getProp(getKey(NAME_KEY))) { - DatasetDescriptor descriptor = resolver.resolve(destination, state); + Descriptor descriptor = resolver.resolve(destination, state); if (descriptor == null) { return; } - state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), GSON.toJson(descriptor)); + state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), Descriptor.serialize(descriptor)); } } @@ -161,10 +163,14 @@ public final class 
LineageInfo { */ static Map<String, LineageEventBuilder> load(State state) { String name = state.getProp(getKey(NAME_KEY)); - DatasetDescriptor source = GSON.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)), DatasetDescriptor.class); + Descriptor source = Descriptor.deserialize(state.getProp(getKey(LineageEventBuilder.SOURCE))); String branchedPrefix = getKey(BRANCH, ""); Map<String, LineageEventBuilder> events = Maps.newHashMap(); + if (source == null) { + return events; + } + for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) { String key = entry.getKey().toString(); if (!key.startsWith(branchedPrefix)) { @@ -177,12 +183,12 @@ public final class LineageInfo { LineageEventBuilder event = events.get(branchId); if (event == null) { event = new LineageEventBuilder(name); - event.setSource(new DatasetDescriptor(source)); + event.setSource(source.copy()); events.put(parts[0], event); } switch (parts[1]) { case LineageEventBuilder.DESTINATION: - DatasetDescriptor destination = GSON.fromJson(entry.getValue().toString(), DatasetDescriptor.class); + Descriptor destination = Descriptor.deserialize(entry.getValue().toString()); event.setDestination(destination); break; default: @@ -237,8 +243,10 @@ public final class LineageInfo { * Get the configured {@link DatasetResolver} from {@link State} */ public static DatasetResolver getResolver(Config config) { + // ConfigException.Missing will throw if DATASET_RESOLVER_FACTORY is absent String resolverFactory = config.getString(DATASET_RESOLVER_FACTORY); - if (resolverFactory.equals(NoopDatasetResolver.FACTORY)) { + + if (resolverFactory.toUpperCase().equals(NoopDatasetResolver.FACTORY)) { return NoopDatasetResolver.INSTANCE; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java index 4352e7f..8ca6d5e 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java @@ -29,6 +29,9 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; +import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.testng.Assert; @@ -42,6 +45,7 @@ import com.typesafe.config.ConfigFactory; * Test for loading linage events from state */ public class LineageEventTest { + @Test public void testEvent() { final String topic = "testTopic"; @@ -54,7 +58,7 @@ public class LineageEventTest { LineageInfo lineageInfo = getLineageInfo(); DatasetDescriptor source = new DatasetDescriptor(kafka, topic); lineageInfo.setSource(source, state0); - DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, "/data/dbchanges"); + DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, "/data/tracking"); destination00.addMetadata(branch, "0"); lineageInfo.putDestination(destination00, 0, state0); DatasetDescriptor destination01 = new DatasetDescriptor(mysql, "kafka.testTopic"); @@ -107,10 +111,37 @@ public class LineageEventTest { verify(getLineageEvent(eventsList, 1, "hive"), topic, source, destination11); } + @Test + public void testEventForPartitionedDataset() { + final String topic = "testTopic"; + final String kafka = 
"kafka"; + final String hdfs = "hdfs"; + final String path = "/data/tracking/PageViewEvent"; + final String partitionName = "hourly/2018/08/15/15"; + + State state = new State(); + LineageInfo lineageInfo = getLineageInfo(); + DatasetDescriptor source = new DatasetDescriptor(kafka, topic); + lineageInfo.setSource(source, state); + DatasetDescriptor destinationDataset = new DatasetDescriptor(hdfs, path); + PartitionDescriptor destination = new PartitionDescriptor(partitionName, destinationDataset); + lineageInfo.putDestination(destination, 0, state); + + Map<String, LineageEventBuilder> events = LineageInfo.load(state); + LineageEventBuilder event = events.get("0"); + verify(event, topic, source, destination); + + // Verify gobblin tracking event + GobblinTrackingEvent trackingEvent = event.build(); + Assert.assertEquals(LineageEventBuilder.isLineageEvent(trackingEvent), true); + Assert.assertEquals(LineageEventBuilder.fromEvent(trackingEvent), event); + } + private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId, String destinationPlatform) { for (LineageEventBuilder event : events) { - if (event.getDestination().getPlatform().equals(destinationPlatform) && - event.getDestination().getMetadata().get(DatasetConstants.BRANCH).equals(String.valueOf(branchId))) { + DatasetDescriptor descriptor = (DatasetDescriptor) event.getDestination(); + if (descriptor.getPlatform().equals(destinationPlatform) && + descriptor.getMetadata().get(DatasetConstants.BRANCH).equals(String.valueOf(branchId))) { return event; } } @@ -132,7 +163,7 @@ public class LineageEventTest { return obj2; } - private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination) { + private void verify(LineageEventBuilder event, String name, Descriptor source, Descriptor destination) { Assert.assertEquals(event.getName(), name); Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE); 
Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/749b5bd6/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java index 5e9fbba..0b0e06d 100644 --- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java +++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.extract.QueryBasedSource; import org.apache.gobblin.source.extractor.partition.Partitioner; @@ -49,8 +50,7 @@ public class SalesforceSourceTest { Assert.assertEquals(workUnits.size(), 1); DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", "contacts"); - Gson gson = new Gson(); - Assert.assertEquals(gson.toJson(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source")); + Assert.assertEquals(Descriptor.serialize(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source")); Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), "contacts"); }
