[GOBBLIN-587] Implement partition level lineage for fs based destination Closes #2453 from zxcware/pd
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef59a151 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef59a151 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef59a151 Branch: refs/heads/master Commit: ef59a1517575f41671b0ec4ffa6ac53b3648e30c Parents: e74c8b7 Author: zhchen <[email protected]> Authored: Fri Sep 14 12:31:02 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Fri Sep 14 12:31:02 2018 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/dataset/Descriptor.java | 71 +++---- .../gobblin/dataset/PartitionDescriptor.java | 28 +++ .../gobblin/util/io/GsonInterfaceAdapter.java | 195 +++++++++++++++++++ .../org/apache/gobblin/writer/DataWriter.java | 23 ++- .../apache/gobblin/dataset/DescriptorTest.java | 13 +- .../util/io/GsonInterfaceAdapterTest.java | 46 +++++ .../org/apache/gobblin/util/test/BaseClass.java | 37 ++++ .../apache/gobblin/util/test/ExtendedClass.java | 33 ++++ .../org/apache/gobblin/util/test/TestClass.java | 69 +++++++ .../writer/InstrumentedDataWriterDecorator.java | 6 + .../gobblin/publisher/BaseDataPublisher.java | 31 ++- .../publisher/TimePartitionedDataPublisher.java | 21 +- .../writer/CloseOnFlushWriterWrapper.java | 6 + .../org/apache/gobblin/writer/FsDataWriter.java | 17 ++ .../gobblin/writer/PartitionedDataWriter.java | 72 ++++++- .../publisher/BaseDataPublisherTest.java | 86 +++++++- .../gobblin/writer/PartitionedWriterTest.java | 18 ++ .../test/TestPartitionAwareWriterBuilder.java | 9 + .../dataset/ConvertibleHiveDatasetTest.java | 25 ++- .../apache/gobblin/metrics/MetricContext.java | 2 +- .../event/lineage/LineageEventBuilder.java | 18 +- .../metrics/event/lineage/LineageInfo.java | 68 ++++--- .../metrics/event/lineage/LineageEventTest.java | 27 +-- .../gobblin/azkaban/AzkabanJobLauncher.java | 8 +- 
.../salesforce/SalesforceSourceTest.java | 5 +- .../gobblin/util/io/GsonInterfaceAdapter.java | 195 ------------------- .../util/io/GsonInterfaceAdapterTest.java | 46 ----- .../org/apache/gobblin/util/test/BaseClass.java | 37 ---- .../apache/gobblin/util/test/ExtendedClass.java | 33 ---- .../org/apache/gobblin/util/test/TestClass.java | 69 ------- 30 files changed, 810 insertions(+), 504 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java index 2e3daa5..3e30ab8 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java @@ -18,13 +18,18 @@ package org.apache.gobblin.dataset; import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; import com.google.gson.Gson; -import com.google.gson.JsonObject; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.apache.gobblin.util.io.GsonInterfaceAdapter; + /** * A descriptor is a simplified representation of a resource, which could be a dataset, dataset partition, file, etc. @@ -32,11 +37,6 @@ import lombok.RequiredArgsConstructor; * primary keys, version, etc * * <p> - * The class provides {@link #serialize(Descriptor)} and {@link #deserialize(String)} util methods pair to send - * a descriptor object over the wire - * </p> - * - * <p> * When the original object has complicated inner structure and there is a requirement to send it over the wire, * it's a time to define a corresponding {@link Descriptor} becomes. 
In this case, the {@link Descriptor} can just * have minimal information enough to construct the original object on the other side of the wire @@ -52,7 +52,10 @@ import lombok.RequiredArgsConstructor; public class Descriptor { /** Use gson for ser/de */ - private static final Gson GSON = new Gson(); + public static final Gson GSON = + new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create(); + /** Type token for ser/de descriptor list */ + private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<Descriptor>>(){}.getType(); /** Name of the resource */ @Getter @@ -68,41 +71,41 @@ public class Descriptor { } /** - * A helper class for ser/de of a {@link Descriptor} + * Serialize any {@link Descriptor} object as json string + * + * <p> + * Note: it can serialize subclasses + * </p> + */ + public static String toJson(Descriptor descriptor) { + return GSON.toJson(descriptor); + } + + /** + * Deserialize the json string to the a {@link Descriptor} object + */ + public static Descriptor fromJson(String json) { + return fromJson(json, Descriptor.class); + } + + /** + * Deserialize the json string to the specified {@link Descriptor} object */ - @RequiredArgsConstructor - private static class Wrap { - /** The actual class name of the {@link Descriptor} */ - private final String clazz; - /** A json representation of the {@link Descriptor}*/ - private final JsonObject data; + public static <T extends Descriptor> T fromJson(String json, Class<T> clazz) { + return GSON.fromJson(json, clazz); } /** - * Serialize any {@link Descriptor} object to a string + * Serialize a list of descriptors as json string */ - public static String serialize(Descriptor descriptor) { - if (descriptor == null) { - return GSON.toJson(null); - } - JsonObject data = GSON.toJsonTree(descriptor).getAsJsonObject(); - return GSON.toJson(new Wrap(descriptor.getClass().getName(), data)); + public static String toJson(List<Descriptor> descriptors) { + 
return GSON.toJson(descriptors, DESCRIPTOR_LIST_TYPE); } /** - * Deserialize a string, which results from {@link #serialize(Descriptor)}, into the original - * {@link Descriptor} object + * Deserialize the string, resulted from {@link #toJson(List)}, to a list of descriptors */ - public static Descriptor deserialize(String serialized) { - Wrap wrap = GSON.fromJson(serialized, Wrap.class); - if (wrap == null) { - return null; - } - - try { - return GSON.fromJson(wrap.data, (Type) Class.forName(wrap.clazz)); - } catch (ClassNotFoundException e) { - return null; - } + public static List<Descriptor> fromJsonList(String jsonList) { + return GSON.fromJson(jsonList, DESCRIPTOR_LIST_TYPE); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java index f0b4dcf..2a5370e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java @@ -17,6 +17,12 @@ package org.apache.gobblin.dataset; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +import com.google.gson.reflect.TypeToken; + import lombok.Getter; @@ -24,6 +30,10 @@ import lombok.Getter; * A {@link Descriptor} to identifies a partition of a dataset */ public class PartitionDescriptor extends Descriptor { + + /** Type token for ser/de partition descriptor list */ + private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<PartitionDescriptor>>(){}.getType(); + @Getter private final DatasetDescriptor dataset; @@ -37,6 +47,10 @@ public class PartitionDescriptor extends Descriptor { return new PartitionDescriptor(getName(), dataset); 
} + public PartitionDescriptor copyWithNewDataset(DatasetDescriptor dataset) { + return new PartitionDescriptor(getName(), dataset); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -56,4 +70,18 @@ public class PartitionDescriptor extends Descriptor { result = 31 * result + getName().hashCode(); return result; } + + /** + * Serialize a list of partition descriptors as json string + */ + public static String toPartitionJsonList(List<PartitionDescriptor> descriptors) { + return Descriptor.GSON.toJson(descriptors, DESCRIPTOR_LIST_TYPE); + } + + /** + * Deserialize the string, resulted from {@link #toPartitionJsonList(List)}, to a list of partition descriptors + */ + public static List<PartitionDescriptor> fromPartitionJsonList(String jsonList) { + return Descriptor.GSON.fromJson(jsonList, DESCRIPTOR_LIST_TYPE); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java b/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java new file mode 100644 index 0000000..5973aa9 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.io; + +import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; + +import java.io.IOException; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.util.Collection; +import java.util.Map; + +import org.apache.commons.lang3.ClassUtils; + +import com.google.common.base.Optional; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.internal.Streams; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; + + +/** + * A {@link Gson} interface adapter that makes it possible to serialize and deserialize polymorphic objects. + * + * <p> + * This adapter will capture all instances of {@link #baseClass} and write them as + * {"object-type":"class.name", "object-data":"data"}, allowing for correct serialization and deserialization of + * polymorphic objects. The following types will not be captured by the adapter (i.e. they will be written by the + * default GSON writer): + * - Primitives and boxed primitives + * - Arrays + * - Collections + * - Maps + * Additionally, generic classes (e.g. class MyClass<T>) cannot be correctly decoded. 
+ * </p> + * + * <p> + * To use: + * <pre> + * {@code + * MyClass object = new MyClass(); + * Gson gson = GsonInterfaceAdapter.getGson(MyBaseClass.class); + * String json = gson.toJson(object); + * Myclass object2 = gson.fromJson(json, MyClass.class); + * } + * </pre> + * </p> + * + * <p> + * Note: a useful case is GsonInterfaceAdapter.getGson(Object.class), which will correctly serialize / deserialize + * all types except for java generics. + * </p> + * + * @param <T> The interface or abstract type to be serialized and deserialized with {@link Gson}. + */ +@RequiredArgsConstructor +public class GsonInterfaceAdapter implements TypeAdapterFactory { + + protected static final String OBJECT_TYPE = "object-type"; + protected static final String OBJECT_DATA = "object-data"; + + private final Class<?> baseClass; + + @Override + public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) { + if (ClassUtils.isPrimitiveOrWrapper(type.getRawType()) || type.getType() instanceof GenericArrayType + || CharSequence.class.isAssignableFrom(type.getRawType()) + || (type.getType() instanceof ParameterizedType && (Collection.class.isAssignableFrom(type.getRawType()) + || Map.class.isAssignableFrom(type.getRawType())))) { + // delegate primitives, arrays, collections, and maps + return null; + } + if (!this.baseClass.isAssignableFrom(type.getRawType())) { + // delegate anything not assignable from base class + return null; + } + TypeAdapter<R> adapter = new InterfaceAdapter<>(gson, this, type); + return adapter; + } + + @AllArgsConstructor + private static class InterfaceAdapter<R> extends TypeAdapter<R> { + + private final Gson gson; + private final TypeAdapterFactory factory; + private final TypeToken<R> typeToken; + + @Override + public void write(JsonWriter out, R value) throws IOException { + if (Optional.class.isAssignableFrom(this.typeToken.getRawType())) { + Optional opt = (Optional) value; + if (opt != null && opt.isPresent()) { + Object actualValue = opt.get(); + 
writeObject(actualValue, out); + } else { + out.beginObject(); + out.endObject(); + } + } else { + writeObject(value, out); + } + } + + @Override + public R read(JsonReader in) throws IOException { + JsonElement element = Streams.parse(in); + if (element.isJsonNull()) { + return readNull(); + } + JsonObject jsonObject = element.getAsJsonObject(); + + if (this.typeToken.getRawType() == Optional.class) { + if (jsonObject.has(OBJECT_TYPE)) { + return (R) Optional.of(readValue(jsonObject, null)); + } else if (jsonObject.entrySet().isEmpty()) { + return (R) Optional.absent(); + } else { + throw new IOException("No class found for Optional value."); + } + } + return this.readValue(jsonObject, this.typeToken); + } + + private <S> S readNull() { + if (this.typeToken.getRawType() == Optional.class) { + return (S) Optional.absent(); + } + return null; + } + + private <S> void writeObject(S value, JsonWriter out) throws IOException { + if (value != null) { + JsonObject jsonObject = new JsonObject(); + jsonObject.add(OBJECT_TYPE, new JsonPrimitive(value.getClass().getName())); + TypeAdapter<S> delegate = + (TypeAdapter<S>) this.gson.getDelegateAdapter(this.factory, TypeToken.get(value.getClass())); + jsonObject.add(OBJECT_DATA, delegate.toJsonTree(value)); + Streams.write(jsonObject, out); + } else { + out.nullValue(); + } + } + + private <S> S readValue(JsonObject jsonObject, TypeToken<S> defaultTypeToken) throws IOException { + try { + TypeToken<S> actualTypeToken; + if (jsonObject.isJsonNull()) { + return null; + } else if (jsonObject.has(OBJECT_TYPE)) { + String className = jsonObject.get(OBJECT_TYPE).getAsString(); + Class<S> klazz = (Class<S>) Class.forName(className); + actualTypeToken = TypeToken.get(klazz); + } else if (defaultTypeToken != null) { + actualTypeToken = defaultTypeToken; + } else { + throw new IOException("Could not determine TypeToken."); + } + TypeAdapter<S> delegate = this.gson.getDelegateAdapter(this.factory, actualTypeToken); + S value = 
delegate.fromJsonTree(jsonObject.get(OBJECT_DATA)); + return value; + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + } + + } + + public static <T> Gson getGson(Class<T> clazz) { + Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(clazz)).create(); + return gson; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java index 18d11d6..400dcdf 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java @@ -21,13 +21,18 @@ import java.io.Closeable; import java.io.Flushable; import java.io.IOException; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.records.FlushControlMessageHandler; import org.apache.gobblin.stream.RecordEnvelope; /** - * An interface for data writers. 
+ * An interface for data writers + * + * <p> + * Generally, one work unit has a dedicated {@link DataWriter} instance, which processes only one dataset + * </p> * * @param <D> data record type * @@ -77,6 +82,22 @@ public interface DataWriter<D> extends Closeable, Flushable { throws IOException; /** + * The method should return a {@link Descriptor} that represents what the writer is writing + * + * <p> + * Note that, this information might be useless and discarded by a + * {@link org.apache.gobblin.publisher.DataPublisher}, which determines the final form of dataset or partition + * </p> + * + * @return a {@link org.apache.gobblin.dataset.DatasetDescriptor} if it writes files of a dataset or + * a {@link org.apache.gobblin.dataset.PartitionDescriptor} if it writes files of a dataset partition or + * {@code null} if it is useless + */ + default Descriptor getDataDescriptor() { + return null; + } + + /** * Write the input {@link RecordEnvelope}. By default, just call {@link #write(Object)}. 
*/ default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java index 6ac875b..d7b0409 100644 --- a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java +++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java @@ -38,20 +38,15 @@ public class DescriptorTest { @Test public void testPartitionDescriptor() { - // Test serialization - String partitionJson = "{\"clazz\":\"org.apache.gobblin.dataset.PartitionDescriptor\",\"data\":{\"dataset\":{\"platform\":\"hdfs\",\"metadata\":{},\"name\":\"/data/tracking/PageViewEvent\"},\"name\":\"hourly/2018/08/14/18\"}}"; - DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent"); String partitionName = "hourly/2018/08/14/18"; PartitionDescriptor partition = new PartitionDescriptor(partitionName, dataset); - Assert.assertEquals(Descriptor.serialize(partition), partitionJson); - System.out.println(partitionJson); - Descriptor partition2 = Descriptor.deserialize(partitionJson); + // Test copy with new dataset + DatasetDescriptor dataset2 = new DatasetDescriptor("hive", "/data/tracking/PageViewEvent"); + Descriptor partition2 = partition.copyWithNewDataset(dataset2); Assert.assertEquals(partition2.getName(), partition.getName()); - Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), partition.getDataset()); - Assert.assertEquals(partition, partition2); - Assert.assertEquals(partition.hashCode(), partition2.hashCode()); + Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), dataset2); // Test copy PartitionDescriptor partition3 = 
partition.copy(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java new file mode 100644 index 0000000..17f1ac0 --- /dev/null +++ b/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.io; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.google.gson.Gson; + +import org.apache.gobblin.util.test.BaseClass; +import org.apache.gobblin.util.test.TestClass; + + +public class GsonInterfaceAdapterTest { + + @Test(groups = {"gobblin.util.io"}) + public void test() { + Gson gson = GsonInterfaceAdapter.getGson(Object.class); + + TestClass test = new TestClass(); + test.absent = Optional.absent(); + Assert.assertNotEquals(test, new TestClass()); + + String ser = gson.toJson(test); + BaseClass deser = gson.fromJson(ser, BaseClass.class); + Assert.assertEquals(test, deser); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java new file mode 100644 index 0000000..4d1d7c7 --- /dev/null +++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.test; + +import lombok.EqualsAndHashCode; + +import java.util.Random; + + +/** + * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}. + */ +@EqualsAndHashCode +public class BaseClass { + + public BaseClass() { + this.field = Integer.toString(new Random().nextInt()); + } + + private String field; + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java new file mode 100644 index 0000000..32d43cc --- /dev/null +++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.test; + +import lombok.EqualsAndHashCode; + +import java.util.Random; + + +/** + * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}. + */ +@EqualsAndHashCode(callSuper = true) +public class ExtendedClass extends BaseClass { + + private final int otherField = new Random().nextInt(); + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java new file mode 100644 index 0000000..fd1947c --- /dev/null +++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.test; + +import lombok.EqualsAndHashCode; + +import java.util.List; +import java.util.Map; +import java.util.Random; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + + +/** + * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}. + */ +@EqualsAndHashCode(callSuper = true) +public class TestClass extends BaseClass { + + private static final Random random = new Random(); + + private final int intValue = random.nextInt(); + private final long longValue = random.nextLong(); + private final double doubleValue = random.nextLong(); + private final Map<String, Integer> map = createRandomMap(); + private final List<String> list = createRandomList(); + private final Optional<String> present = Optional.of(Integer.toString(random.nextInt())); + // Set manually to absent + public Optional<String> absent = Optional.of("a"); + private final Optional<BaseClass> optionalObject = Optional.of(new BaseClass()); + private final BaseClass polymorphic = new ExtendedClass(); + private final Optional<? 
extends BaseClass> polymorphicOptional = Optional.of(new ExtendedClass()); + + private static Map<String, Integer> createRandomMap() { + Map<String, Integer> map = Maps.newHashMap(); + int size = random.nextInt(5); + for (int i = 0; i < size; i++) { + map.put("value" + random.nextInt(), random.nextInt()); + } + return map; + } + + private static List<String> createRandomList() { + List<String> list = Lists.newArrayList(); + int size = random.nextInt(5); + for (int i = 0; i < size; i++) { + list.add("value" + random.nextInt()); + } + return list; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java index e90f895..efc349f 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java @@ -24,6 +24,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.records.ControlMessageHandler; @@ -127,6 +128,11 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa } @Override + public Descriptor getDataDescriptor() { + return this.embeddedWriter.getDataDescriptor(); + } + + @Override public Object getDecoratedObject() { return this.embeddedWriter; } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java index 2ddcd76..56b326e 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java @@ -20,6 +20,7 @@ package org.apache.gobblin.publisher; import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -59,19 +60,20 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.metadata.MetadataMerger; import org.apache.gobblin.metadata.types.StaticStringMetadataMerger; import org.apache.gobblin.metrics.event.lineage.LineageInfo; -import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.ParallelRunner; -import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.WriterUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.writer.FsDataWriter; import org.apache.gobblin.writer.FsWriterMetrics; import org.apache.gobblin.writer.PartitionIdentifier; +import org.apache.gobblin.writer.PartitionedDataWriter; import static org.apache.gobblin.util.retry.RetryerFactory.*; @@ -294,12 +296,31 @@ 
public class BaseDataPublisher extends SingleTaskDataPublisher { } private void addLineageInfo(WorkUnitState state, int branchId) { - DatasetDescriptor destination = createDestinationDescriptor(state, branchId); - if (this.lineageInfo.isPresent()) { - this.lineageInfo.get().putDestination(destination, branchId, state); + if (!this.lineageInfo.isPresent()) { + LOG.info("Will not add lineage info"); + return; + } + + // Final dataset descriptor + DatasetDescriptor datasetDescriptor = createDestinationDescriptor(state, branchId); + + List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, branchId); + List<Descriptor> descriptors = new ArrayList<>(); + if (partitions.size() == 0) { + // Report as dataset level lineage + descriptors.add(datasetDescriptor); + } else { + // Report as partition level lineage + for (PartitionDescriptor partition : partitions) { + descriptors.add(partition.copyWithNewDataset(datasetDescriptor)); + } } + this.lineageInfo.get().putDestination(descriptors, branchId, state); } + /** + * Create destination dataset descriptor + */ protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { Path publisherOutputDir = getPublisherOutputDir(state, branchId); FileSystem fs = this.publisherFileSystemByBranches.get(branchId); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java index 157552e..e3ca31c 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java @@ -18,13 +18,17 @@ 
package org.apache.gobblin.publisher; import java.io.IOException; +import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import com.google.common.collect.Lists; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.util.FileListUtils; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.util.ParallelRunner; @@ -68,21 +72,4 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher { movePath(parallelRunner, workUnitState, status.getPath(), outputPath, branchId); } } - - @Override - protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { - // Get base descriptor - DatasetDescriptor descriptor = super.createDestinationDescriptor(state, branchId); - - // Decorate with partition prefix - String propName = ForkOperatorUtils - .getPropertyNameForBranch(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, numBranches, branchId); - String timePrefix = state.getProp(propName, ""); - Path pathWithTimePrefix = new Path(descriptor.getName(), timePrefix); - DatasetDescriptor destination = new DatasetDescriptor(descriptor.getPlatform(), pathWithTimePrefix.toString()); - // Add back the metadata - descriptor.getMetadata().forEach(destination::addMetadata); - - return destination; - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java index 66e5a26..5e912bb 100644 --- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.Descriptor; import org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.records.FlushControlMessageHandler; import org.apache.gobblin.stream.ControlMessage; @@ -144,6 +145,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De } @Override + public Descriptor getDataDescriptor() { + return writer.getDataDescriptor(); + } + + @Override public ControlMessageHandler getMessageHandler() { return this.controlMessageHandler; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java index f4e4931..aa228af 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java @@ -39,6 +39,10 @@ import org.apache.gobblin.codec.StreamCodec; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.metadata.types.GlobalMetadata; import org.apache.gobblin.util.FinalState; import 
org.apache.gobblin.util.ForkOperatorUtils; @@ -155,6 +159,19 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta } } + @Override + public Descriptor getDataDescriptor() { + // Dataset is resulted from WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId) + // The writer dataset might not be same as the published dataset + DatasetDescriptor datasetDescriptor = new DatasetDescriptor(fs.getScheme(), outputFile.getParent().toString()); + + if (partitionKey == null) { + return datasetDescriptor; + } + + return new PartitionDescriptor(partitionKey, datasetDescriptor); + } + /** * Create the staging output file and an {@link OutputStream} to write to the file. * http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java index 83dd074..cdfc11b 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java @@ -18,6 +18,8 @@ package org.apache.gobblin.writer; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; @@ -29,9 +31,11 @@ import org.apache.commons.lang3.reflect.ConstructorUtils; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; import com.google.common.io.Closer; import 
lombok.extern.slf4j.Slf4j; @@ -39,6 +43,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator; import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator; import org.apache.gobblin.records.ControlMessageHandler; @@ -66,6 +72,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin private int writerIdSuffix = 0; private final String baseWriterId; + private final State state; + private final int branchId; + private final Optional<WriterPartitioner> partitioner; private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters; private final Optional<PartitionAwareDataWriterBuilder> builder; @@ -78,6 +87,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State state) throws IOException { + this.state = state; + this.branchId = builder.branch; + this.isSpeculativeAttemptSafe = true; this.isWatermarkCapable = true; this.baseWriterId = builder.getWriterId(); @@ -227,7 +239,11 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin @Override public void close() throws IOException { - this.closer.close(); + try { + serializePartitionInfoToState(); + } finally { + this.closer.close(); + } } private DataWriter<D> createPartitionWriter(GenericRecord partition) @@ -352,4 +368,58 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin cloner.close(); } } + + /** + * Get the serialized key to partitions info in {@link #state} + */ + private static String getPartitionsKey(int branchId) { + return 
String.format("writer.%d.partitions", branchId); + } + + /** + * Serialize partitions info to {@link #state} if there are any + */ + private void serializePartitionInfoToState() { + List<PartitionDescriptor> descriptors = new ArrayList<>(); + + for (DataWriter writer : partitionWriters.asMap().values()) { + Descriptor descriptor = writer.getDataDescriptor(); + if (null == descriptor) { + log.warn("Drop partition info as writer {} returns a null PartitionDescriptor", writer.toString()); + continue; + } + + if (!(descriptor instanceof PartitionDescriptor)) { + log.warn("Drop partition info as writer {} does not return a PartitionDescriptor", writer.toString()); + continue; + } + + descriptors.add((PartitionDescriptor)descriptor); + } + + if (descriptors.size() > 0) { + state.setProp(getPartitionsKey(branchId), PartitionDescriptor.toPartitionJsonList(descriptors)); + } else { + log.info("Partitions info not available. Will not serialize partitions"); + } + } + + /** + * Get the partition info of a work unit from the {@code state}. Then the partition info will be removed from the + * {@code state} to avoid persisting useless information + * + * <p> + * In Gobblin, only the {@link PartitionedDataWriter} knows all partitions written for a work unit.
Each partition + * {@link DataWriter} decides the actual form of a dataset partition + * </p> + */ + public static List<PartitionDescriptor> getPartitionInfoAndClean(State state, int branchId) { + String partitionsKey = getPartitionsKey(branchId); + String json = state.getProp(partitionsKey); + if (Strings.isNullOrEmpty(json)) { + return Lists.newArrayList(); + } + state.removeProp(partitionsKey); + return PartitionDescriptor.fromPartitionJsonList(json); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java index d46d6e3..469b72f 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java @@ -21,25 +21,27 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; import 
com.typesafe.config.ConfigFactory; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; @@ -47,16 +49,19 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance; import org.apache.gobblin.broker.iface.SharedResourcesBroker; -import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.metadata.MetadataMerger; import org.apache.gobblin.metadata.types.GlobalMetadata; +import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.ForkOperatorUtils; +import org.apache.gobblin.util.io.GsonInterfaceAdapter; import org.apache.gobblin.writer.FsDataWriter; import org.apache.gobblin.writer.FsWriterMetrics; import org.apache.gobblin.writer.PartitionIdentifier; @@ -66,6 +71,10 @@ import org.apache.gobblin.writer.PartitionIdentifier; * Tests for BaseDataPublisher */ public class BaseDataPublisherTest { + private static final Type PARTITION_LIST_TYPE = new TypeToken<ArrayList<PartitionDescriptor>>(){}.getType(); + private static final Gson GSON = + new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create(); + /** * Test DATA_PUBLISHER_METADATA_STR: a user should be able to put an arbitrary metadata string in job configuration * and have that written out. 
@@ -532,6 +541,9 @@ public class BaseDataPublisherTest { } } + /** + * Test lineage info is set on publishing single task + */ @Test public void testPublishSingleTask() throws IOException { @@ -545,6 +557,9 @@ public class BaseDataPublisherTest { Assert.assertFalse(state.contains("gobblin.event.lineage.branch.1.destination")); } + /** + * Test lineage info is set on publishing multiple tasks + */ @Test public void testPublishMultiTasks() throws IOException { @@ -562,6 +577,69 @@ public class BaseDataPublisherTest { Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.1.destination")); } + /** + * Test partition level lineages are set + */ + @Test + public void testPublishedPartitionsLineage() + throws IOException { + int numBranches = 2; + int numPartitionsPerBranch = 2; + + WorkUnitState state = buildTaskState(numBranches); + LineageInfo lineageInfo = LineageInfo.getLineageInfo(state.getTaskBroker()).get(); + DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic"); + lineageInfo.setSource(source, state); + BaseDataPublisher publisher = new BaseDataPublisher(state); + + // Set up writer partition descriptors + DatasetDescriptor datasetAtWriter = new DatasetDescriptor("dummy", "dummy"); + for (int i = 0; i < numBranches; i++) { + List<PartitionDescriptor> partitions = new ArrayList<>(); + for (int j = 0; j < numPartitionsPerBranch; j++) { + // Dummy dataset descriptor will be discarded by publisher + partitions.add(new PartitionDescriptor("partition" + i + j, datasetAtWriter)); + } + String partitionsKey = "writer." 
+ i + ".partitions"; + state.setProp(partitionsKey, GSON.toJson(partitions, PARTITION_LIST_TYPE)); + } + + publisher.publish(ImmutableList.of(state)); + + Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination")); + Assert.assertTrue(state.contains("gobblin.event.lineage.branch.1.destination")); + + Collection<LineageEventBuilder> events = LineageInfo.load(ImmutableList.of(state)); + Assert.assertTrue(events.size() == 4); + + // Find the partition lineage and assert + for (int i = 0; i < numBranches; i++) { + String outputPath = String.format("/data/output/branch%d/namespace/table", i); + DatasetDescriptor destinationDataset = new DatasetDescriptor("file", outputPath); + destinationDataset.addMetadata("fsUri", "file:///"); + destinationDataset.addMetadata("branch", "" + i); + + for (int j = 0; j < numPartitionsPerBranch; j++) { + LineageEventBuilder event = find(events, "partition" + i + j); + Assert.assertTrue(null != event); + Assert.assertEquals(event.getSource(), source); + Assert.assertEquals(event.getDestination(), + // Dataset written by the writer is discarded + new PartitionDescriptor("partition" + i + j, destinationDataset)); + } + } + } + + private static LineageEventBuilder find(Collection<LineageEventBuilder> events, String partitionName) { + for (LineageEventBuilder event : events) { + if (event.getDestination().getName().equals(partitionName)) { + return event; + } + } + + return null; + } + public static class TestAdditionMerger implements MetadataMerger<String> { private int sum = 0; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java index 0dc9846..20984ef 100644 --- 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java @@ -19,14 +19,18 @@ package org.apache.gobblin.writer; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.gobblin.ack.BasicAckableForTesting; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.stream.FlushControlMessage; import org.testng.Assert; import org.testng.annotations.Test; +import org.testng.util.Strings; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -107,6 +111,10 @@ public class PartitionedWriterTest { action = builder.actions.poll(); Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.CLEANUP); + // Before close, partitions info is not serialized + String partitionsKey = "writer.0.partitions"; + Assert.assertTrue(state.getProp(partitionsKey) == null); + writer.close(); Assert.assertEquals(builder.actions.size(), 2); action = builder.actions.poll(); @@ -114,6 +122,16 @@ public class PartitionedWriterTest { action = builder.actions.poll(); Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.CLOSE); + // After close, partitions info is available + Assert.assertFalse(Strings.isNullOrEmpty(state.getProp(partitionsKey))); + List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, 0); + Assert.assertTrue(state.getProp(partitionsKey) == null); + Assert.assertEquals(partitions.size(), 2); + + DatasetDescriptor dataset = new DatasetDescriptor("testPlatform", "testDataset"); + Assert.assertEquals(partitions.get(0), new PartitionDescriptor("a", dataset)); + Assert.assertEquals(partitions.get(1), new PartitionDescriptor("1", 
dataset)); + writer.commit(); Assert.assertEquals(builder.actions.size(), 2); action = builder.actions.poll(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java index 8b1e0ca..8da42d7 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java @@ -25,6 +25,9 @@ import org.apache.avro.Schema; import com.google.common.collect.Queues; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.dataset.PartitionDescriptor; import org.apache.gobblin.writer.DataWriter; import org.apache.gobblin.writer.PartitionAwareDataWriterBuilder; @@ -118,6 +121,12 @@ public class TestPartitionAwareWriterBuilder extends PartitionAwareDataWriterBui public boolean isSpeculativeAttemptSafe() { return true; } + + @Override + public Descriptor getDataDescriptor() { + DatasetDescriptor dataset = new DatasetDescriptor("testPlatform", "testDataset"); + return new PartitionDescriptor(this.partition, dataset); + } } @Data http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java index eddd852..bfa76c5 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java @@ -18,10 +18,13 @@ package org.apache.gobblin.data.management.conversion.hive.dataset; import com.google.common.base.Optional; import java.io.InputStream; +import java.lang.reflect.Type; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Properties; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; @@ -53,18 +56,28 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset.ConversionConfig; import org.apache.gobblin.hive.HiveMetastoreClientPool; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.io.GsonInterfaceAdapter; import static org.mockito.Mockito.when; @Test(groups = { "gobblin.data.management.conversion" }) public class ConvertibleHiveDatasetTest { + + /** Lineage info ser/de */ + private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<Descriptor>>(){}.getType(); + private static final Gson GSON = + new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create(); + + /** * Test if lineage information is properly set in the workunit for convertible hive 
datasets */ @@ -73,7 +86,6 @@ public class ConvertibleHiveDatasetTest { String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf"; Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro"); // Set datasetResolverFactory to convert Hive Lineage event to Hdfs Lineage event - Gson GSON = new Gson(); ConvertibleHiveDataset testConvertibleDataset = createTestConvertibleDataset(config); HiveWorkUnit workUnit = new HiveWorkUnit(testConvertibleDataset); workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory", @@ -104,7 +116,7 @@ public class ConvertibleHiveDatasetTest { // Assert that source is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.source")); DatasetDescriptor sourceDD = - (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.source")); + GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class); Assert.assertEquals(sourceDD.getPlatform(), "file"); Assert.assertEquals(sourceDD.getName(), "/tmp/test"); Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1.tb1"); @@ -112,7 +124,7 @@ public class ConvertibleHiveDatasetTest { // Assert that first dest is correct for lineage event Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination")); DatasetDescriptor destDD1 = - (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.1.destination")); + (DatasetDescriptor) firstDescriptor(props, "gobblin.event.lineage.branch.1.destination"); Assert.assertEquals(destDD1.getPlatform(), "file"); Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final"); Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), @@ -121,13 +133,18 @@ public class ConvertibleHiveDatasetTest { // Assert that second dest is correct for lineage event 
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination")); DatasetDescriptor destDD2 = - (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.2.destination")); + (DatasetDescriptor) firstDescriptor(props, "gobblin.event.lineage.branch.2.destination"); Assert.assertEquals(destDD2.getPlatform(), "file"); Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final"); Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1_flattenedOrcDb.tb1_flattenedOrc"); } + private Descriptor firstDescriptor(Properties prop, String destinationKey) { + List<Descriptor> descriptors = GSON.fromJson(prop.getProperty(destinationKey), DESCRIPTOR_LIST_TYPE); + return descriptors.get(0); + } + @Test public void testFlattenedOrcConfig() throws Exception { String testConfFilePath = "convertibleHiveDatasetTest/flattenedOrc.conf"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java index 46f8ab1..c712c4d 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java @@ -733,7 +733,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext, */ public MetricContext buildStrict() throws NameConflictException { if(this.parent == null) { - this.parent = RootMetricContext.get(); + hasParent(RootMetricContext.get()); } return new MetricContext(this.name, this.parent, this.tags, false); } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java index c920cc3..63d7237 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java @@ -17,16 +17,10 @@ package org.apache.gobblin.metrics.event.lineage; - import java.util.Map; import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.dataset.DatasetDescriptor; -import org.apache.gobblin.dataset.Descriptor; -import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.event.GobblinEventBuilder; - import com.google.common.base.Joiner; import com.google.common.collect.Maps; import com.google.gson.Gson; @@ -35,6 +29,10 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.dataset.Descriptor; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; + /** * The builder builds a specific {@link GobblinTrackingEvent} whose metadata has {@value GobblinEventBuilder#EVENT_TYPE} @@ -65,8 +63,8 @@ public final class LineageEventBuilder extends GobblinEventBuilder { @Override public GobblinTrackingEvent build() { Map<String, String> dataMap = Maps.newHashMap(metadata); - dataMap.put(SOURCE, Descriptor.serialize(source)); - dataMap.put(DESTINATION, Descriptor.serialize(destination)); + dataMap.put(SOURCE, 
Descriptor.toJson(source)); + dataMap.put(DESTINATION, Descriptor.toJson(destination)); return new GobblinTrackingEvent(0L, namespace, name, dataMap); } @@ -126,10 +124,10 @@ public final class LineageEventBuilder extends GobblinEventBuilder { metadata.forEach((key, value) -> { switch (key) { case SOURCE: - lineageEvent.setSource(Descriptor.deserialize(value)); + lineageEvent.setSource(Descriptor.fromJson(value)); break; case DESTINATION: - lineageEvent.setDestination(Descriptor.deserialize(value)); + lineageEvent.setDestination(Descriptor.fromJson(value)); break; default: lineageEvent.addMetadata(key, value); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java index 0311df7..3e65105 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java @@ -17,7 +17,10 @@ package org.apache.gobblin.metrics.event.lineage; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -25,9 +28,9 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.gson.Gson; import com.typesafe.config.Config; import 
com.typesafe.config.ConfigFactory; @@ -80,7 +83,6 @@ public final class LineageInfo { private static final String DATASET_RESOLVER_CONFIG_NAMESPACE = "datasetResolver"; private static final String BRANCH = "branch"; - private static final Gson GSON = new Gson(); private static final String NAME_KEY = "name"; private static final Config FALLBACK = @@ -112,7 +114,7 @@ public final class LineageInfo { } state.setProp(getKey(NAME_KEY), descriptor.getName()); - state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.serialize(descriptor)); + state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.toJson(descriptor)); } /** @@ -123,20 +125,41 @@ public final class LineageInfo { * is supposed to put the destination dataset information. Since different branches may concurrently put, * the method is implemented to be threadsafe * </p> + * + * @deprecated Use {@link #putDestination(List, int, State)} */ + @Deprecated public void putDestination(Descriptor destination, int branchId, State state) { + putDestination(Lists.newArrayList(destination), branchId, state); + } + + /** + * Put data {@link Descriptor}s of a destination dataset to a state + * + * @param descriptors It can be a single item list which just has the dataset descriptor or a list + * of dataset partition descriptors + */ + public void putDestination(List<Descriptor> descriptors, int branchId, State state) { + if (!hasLineageInfo(state)) { - log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination)); + log.warn("State has no lineage info but branch " + branchId + " puts {} descriptors", descriptors.size()); return; } - log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId)); + + log.info(String.format("Put destination %s for branch %d", Descriptor.toJson(descriptors), branchId)); + synchronized (state.getProp(getKey(NAME_KEY))) { - Descriptor descriptor = resolver.resolve(destination, state); - if 
(descriptor == null) { - return; + List<Descriptor> resolvedDescriptors = new ArrayList<>(); + for (Descriptor descriptor : descriptors) { + Descriptor resolvedDescriptor = resolver.resolve(descriptor, state); + if (resolvedDescriptor == null) { + continue; + } + resolvedDescriptors.add(resolvedDescriptor); } - state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), Descriptor.serialize(descriptor)); + state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), + Descriptor.toJson(resolvedDescriptors)); } } @@ -150,8 +173,8 @@ public final class LineageInfo { Preconditions.checkArgument(states != null && !states.isEmpty()); Set<LineageEventBuilder> allEvents = Sets.newHashSet(); for (State state : states) { - Map<String, LineageEventBuilder> branchedEvents = load(state); - allEvents.addAll(branchedEvents.values()); + Map<String, Set<LineageEventBuilder>> branchedEvents = load(state); + branchedEvents.values().forEach(allEvents::addAll); } return allEvents; } @@ -161,12 +184,12 @@ public final class LineageInfo { * * @return A map from branch to its lineage info. 
If there is no destination info, return an empty map */ - static Map<String, LineageEventBuilder> load(State state) { + static Map<String, Set<LineageEventBuilder>> load(State state) { String name = state.getProp(getKey(NAME_KEY)); - Descriptor source = Descriptor.deserialize(state.getProp(getKey(LineageEventBuilder.SOURCE))); + Descriptor source = Descriptor.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE))); String branchedPrefix = getKey(BRANCH, ""); - Map<String, LineageEventBuilder> events = Maps.newHashMap(); + Map<String, Set<LineageEventBuilder>> events = Maps.newHashMap(); if (source == null) { return events; } @@ -180,16 +203,17 @@ public final class LineageInfo { String[] parts = key.substring(branchedPrefix.length()).split("\\."); assert parts.length == 2; String branchId = parts[0]; - LineageEventBuilder event = events.get(branchId); - if (event == null) { - event = new LineageEventBuilder(name); - event.setSource(source.copy()); - events.put(parts[0], event); - } + Set<LineageEventBuilder> branchEvents = events.computeIfAbsent(branchId, k -> new HashSet<>()); + switch (parts[1]) { case LineageEventBuilder.DESTINATION: - Descriptor destination = Descriptor.deserialize(entry.getValue().toString()); - event.setDestination(destination); + List<Descriptor> descriptors = Descriptor.fromJsonList(entry.getValue().toString()); + for (Descriptor descriptor : descriptors) { + LineageEventBuilder event = new LineageEventBuilder(name); + event.setSource(source); + event.setDestination(descriptor); + branchEvents.add(event); + } break; default: throw new RuntimeException("Unsupported lineage key: " + key); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java index 8ca6d5e..5fd7952 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java @@ -20,6 +20,7 @@ package org.apache.gobblin.metrics.event.lineage; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; @@ -65,9 +66,9 @@ public class LineageEventTest { destination01.addMetadata(branch, "1"); lineageInfo.putDestination(destination01, 1, state0); - Map<String, LineageEventBuilder> events = LineageInfo.load(state0); - verify(events.get("0"), topic, source, destination00); - verify(events.get("1"), topic, source, destination01); + Map<String, Set<LineageEventBuilder>> events = LineageInfo.load(state0); + verify(first(events.get("0")), topic, source, destination00); + verify(first(events.get("1")), topic, source, destination01); State state1 = new State(); lineageInfo.setSource(source, state1); @@ -78,8 +79,8 @@ public class LineageEventTest { // Test only full fledged lineage events are loaded Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); Assert.assertTrue(eventsList.size() == 2); - Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); - Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1")); + Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0"))); + Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1"))); // There are 3 full fledged lineage 
events DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2"); @@ -87,8 +88,8 @@ public class LineageEventTest { lineageInfo.putDestination(destination12, 2, state1); eventsList = LineageInfo.load(states); Assert.assertTrue(eventsList.size() == 3); - Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); - Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1")); + Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0"))); + Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1"))); verify(getLineageEvent(eventsList, 2, mysql), topic, source, destination12); @@ -100,8 +101,8 @@ public class LineageEventTest { lineageInfo.putDestination(destination11, 1, state1); eventsList = LineageInfo.load(states); Assert.assertTrue(eventsList.size() == 4); - Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); - Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1")); + Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0"))); + Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1"))); // Either branch 0 or 2 of state 1 is selected LineageEventBuilder event12 = getLineageEvent(eventsList, 0, mysql); if (event12 == null) { @@ -127,8 +128,8 @@ public class LineageEventTest { PartitionDescriptor destination = new PartitionDescriptor(partitionName, destinationDataset); lineageInfo.putDestination(destination, 0, state); - Map<String, LineageEventBuilder> events = LineageInfo.load(state); - LineageEventBuilder event = events.get("0"); + Map<String, Set<LineageEventBuilder>> events = LineageInfo.load(state); + LineageEventBuilder event = first(events.get("0")); verify(event, topic, source, destination); // Verify gobblin tracking event @@ -170,4 +171,8 @@ public class LineageEventTest { Assert.assertTrue(event.getSource().equals(source)); 
Assert.assertTrue(event.getDestination().equals(destination)); } + + private <T> T first(Collection<T> collection) { + return collection.iterator().next(); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index c78d098..3a28b18 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -130,6 +130,11 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch HadoopUtils.addGobblinSite(); + // Configure root metric context + List<Tag<?>> tags = Lists.newArrayList(); + tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags())); + RootMetricContext.get(tags); + if (props.containsKey(GOBBLIN_LOG_LEVEL_KEY)) { Level logLevel = Level.toLevel(props.getProperty(GOBBLIN_LOG_LEVEL_KEY), Level.INFO); Logger.getLogger("org.apache.gobblin").setLevel(logLevel); @@ -198,9 +203,6 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch jobProps = ConfigUtils.configToProperties(resolvedJob); } - List<Tag<?>> tags = Lists.newArrayList(); - tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags())); - RootMetricContext.get(tags); GobblinMetrics.addCustomTagsToProperties(jobProps, tags); // If the job launcher type is not specified in the job configuration, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java ---------------------------------------------------------------------- diff 
--git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java index 0b0e06d..95931a6 100644 --- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java +++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java @@ -49,8 +49,9 @@ public class SalesforceSourceTest { List<WorkUnit> workUnits = source.generateWorkUnits(sourceEntity, sourceState, 20140213000000L); Assert.assertEquals(workUnits.size(), 1); - DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", "contacts"); - Assert.assertEquals(Descriptor.serialize(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source")); + String expected = "{\"object-type\":\"org.apache.gobblin.dataset.DatasetDescriptor\"," + + "\"object-data\":{\"platform\":\"salesforce\",\"metadata\":{},\"name\":\"contacts\"}}"; + Assert.assertEquals(expected, workUnits.get(0).getProp("gobblin.event.lineage.source")); Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), "contacts"); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java deleted file mode 100644 index 5973aa9..0000000 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.util.io; - -import lombok.AllArgsConstructor; -import lombok.RequiredArgsConstructor; - -import java.io.IOException; -import java.lang.reflect.GenericArrayType; -import java.lang.reflect.ParameterizedType; -import java.util.Collection; -import java.util.Map; - -import org.apache.commons.lang3.ClassUtils; - -import com.google.common.base.Optional; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import com.google.gson.TypeAdapter; -import com.google.gson.TypeAdapterFactory; -import com.google.gson.internal.Streams; -import com.google.gson.reflect.TypeToken; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; - - -/** - * A {@link Gson} interface adapter that makes it possible to serialize and deserialize polymorphic objects. - * - * <p> - * This adapter will capture all instances of {@link #baseClass} and write them as - * {"object-type":"class.name", "object-data":"data"}, allowing for correct serialization and deserialization of - * polymorphic objects. The following types will not be captured by the adapter (i.e. they will be written by the - * default GSON writer): - * - Primitives and boxed primitives - * - Arrays - * - Collections - * - Maps - * Additionally, generic classes (e.g. 
class MyClass<T>) cannot be correctly decoded. - * </p> - * - * <p> - * To use: - * <pre> - * {@code - * MyClass object = new MyClass(); - * Gson gson = GsonInterfaceAdapter.getGson(MyBaseClass.class); - * String json = gson.toJson(object); - * Myclass object2 = gson.fromJson(json, MyClass.class); - * } - * </pre> - * </p> - * - * <p> - * Note: a useful case is GsonInterfaceAdapter.getGson(Object.class), which will correctly serialize / deserialize - * all types except for java generics. - * </p> - * - * @param <T> The interface or abstract type to be serialized and deserialized with {@link Gson}. - */ -@RequiredArgsConstructor -public class GsonInterfaceAdapter implements TypeAdapterFactory { - - protected static final String OBJECT_TYPE = "object-type"; - protected static final String OBJECT_DATA = "object-data"; - - private final Class<?> baseClass; - - @Override - public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) { - if (ClassUtils.isPrimitiveOrWrapper(type.getRawType()) || type.getType() instanceof GenericArrayType - || CharSequence.class.isAssignableFrom(type.getRawType()) - || (type.getType() instanceof ParameterizedType && (Collection.class.isAssignableFrom(type.getRawType()) - || Map.class.isAssignableFrom(type.getRawType())))) { - // delegate primitives, arrays, collections, and maps - return null; - } - if (!this.baseClass.isAssignableFrom(type.getRawType())) { - // delegate anything not assignable from base class - return null; - } - TypeAdapter<R> adapter = new InterfaceAdapter<>(gson, this, type); - return adapter; - } - - @AllArgsConstructor - private static class InterfaceAdapter<R> extends TypeAdapter<R> { - - private final Gson gson; - private final TypeAdapterFactory factory; - private final TypeToken<R> typeToken; - - @Override - public void write(JsonWriter out, R value) throws IOException { - if (Optional.class.isAssignableFrom(this.typeToken.getRawType())) { - Optional opt = (Optional) value; - if (opt != null && opt.isPresent()) 
{ - Object actualValue = opt.get(); - writeObject(actualValue, out); - } else { - out.beginObject(); - out.endObject(); - } - } else { - writeObject(value, out); - } - } - - @Override - public R read(JsonReader in) throws IOException { - JsonElement element = Streams.parse(in); - if (element.isJsonNull()) { - return readNull(); - } - JsonObject jsonObject = element.getAsJsonObject(); - - if (this.typeToken.getRawType() == Optional.class) { - if (jsonObject.has(OBJECT_TYPE)) { - return (R) Optional.of(readValue(jsonObject, null)); - } else if (jsonObject.entrySet().isEmpty()) { - return (R) Optional.absent(); - } else { - throw new IOException("No class found for Optional value."); - } - } - return this.readValue(jsonObject, this.typeToken); - } - - private <S> S readNull() { - if (this.typeToken.getRawType() == Optional.class) { - return (S) Optional.absent(); - } - return null; - } - - private <S> void writeObject(S value, JsonWriter out) throws IOException { - if (value != null) { - JsonObject jsonObject = new JsonObject(); - jsonObject.add(OBJECT_TYPE, new JsonPrimitive(value.getClass().getName())); - TypeAdapter<S> delegate = - (TypeAdapter<S>) this.gson.getDelegateAdapter(this.factory, TypeToken.get(value.getClass())); - jsonObject.add(OBJECT_DATA, delegate.toJsonTree(value)); - Streams.write(jsonObject, out); - } else { - out.nullValue(); - } - } - - private <S> S readValue(JsonObject jsonObject, TypeToken<S> defaultTypeToken) throws IOException { - try { - TypeToken<S> actualTypeToken; - if (jsonObject.isJsonNull()) { - return null; - } else if (jsonObject.has(OBJECT_TYPE)) { - String className = jsonObject.get(OBJECT_TYPE).getAsString(); - Class<S> klazz = (Class<S>) Class.forName(className); - actualTypeToken = TypeToken.get(klazz); - } else if (defaultTypeToken != null) { - actualTypeToken = defaultTypeToken; - } else { - throw new IOException("Could not determine TypeToken."); - } - TypeAdapter<S> delegate = this.gson.getDelegateAdapter(this.factory, 
actualTypeToken); - S value = delegate.fromJsonTree(jsonObject.get(OBJECT_DATA)); - return value; - } catch (ClassNotFoundException cnfe) { - throw new IOException(cnfe); - } - } - - } - - public static <T> Gson getGson(Class<T> clazz) { - Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(clazz)).create(); - return gson; - } -}
