Repository: beam Updated Branches: refs/heads/master ae6860db5 -> 83419241a
[BEAM-1466] JSON utils extension Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40bc64c9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40bc64c9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40bc64c9 Branch: refs/heads/master Commit: 40bc64c9b2e4c43ec2cd643123d0954fda3f7cf0 Parents: ae6860d Author: Aviem Zur <[email protected]> Authored: Sat Feb 11 17:06:45 2017 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Mon Feb 27 21:10:41 2017 +0100 ---------------------------------------------------------------------- sdks/java/extensions/jackson/pom.xml | 124 ++++++++++ .../beam/sdk/extensions/jackson/AsJsons.java | 76 ++++++ .../beam/sdk/extensions/jackson/ParseJsons.java | 75 ++++++ .../sdk/extensions/jackson/package-info.java | 22 ++ .../jackson/JacksonTransformsTest.java | 242 +++++++++++++++++++ sdks/java/extensions/pom.xml | 1 + 6 files changed, 540 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/40bc64c9/sdks/java/extensions/jackson/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml new file mode 100644 index 0000000..be5c953 --- /dev/null +++ b/sdks/java/extensions/jackson/pom.xml @@ -0,0 +1,124 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-parent</artifactId> + <version>0.6.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-extensions-json-jackson</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: Jackson</name> + <description> + Jackson extension provides PTransforms for deserializing and generating JSON strings. 
+ </description> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>com.google.guava:guava</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.beam.sdk.extensions.jackson.repackaged.com.google.common</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.thirdparty</pattern> + <shadedPattern>org.apache.beam.sdk.extensions.jackson.repackaged.com.google.thirdparty</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <!-- Dependencies for tests --> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + 
<scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/40bc64c9/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java new file mode 100644 index 0000000..a9c7a9f --- /dev/null +++ b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/AsJsons.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.jackson; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import java.io.IOException; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link PTransform} for serializing objects to JSON {@link String Strings}. 
+ * Transforms a {@code PCollection<InputT>} into a {@link PCollection} of JSON + * {@link String Strings} representing objects in the original {@link PCollection} using Jackson. + */ +public class AsJsons<InputT> extends PTransform<PCollection<InputT>, PCollection<String>> { + private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); + + private final Class<? extends InputT> inputClass; + private ObjectMapper customMapper; + + /** + * Creates an {@link AsJsons} {@link PTransform} that will transform a {@code PCollection<InputT>} + * into a {@link PCollection} of JSON {@link String Strings} representing those objects using a + * Jackson {@link ObjectMapper}. + */ + public static <InputT> AsJsons<InputT> of(Class<? extends InputT> inputClass) { + return new AsJsons<>(inputClass); + } + + private AsJsons(Class<? extends InputT> inputClass) { + this.inputClass = inputClass; + } + + /** + * Returns a copy of this transform using the given {@link ObjectMapper} instead of the default. + */ + public AsJsons<InputT> withMapper(ObjectMapper mapper) { + AsJsons<InputT> newTransform = new AsJsons<>(inputClass); + newTransform.customMapper = mapper; + return newTransform; + } + + @Override + public PCollection<String> expand(PCollection<InputT> input) { + return input.apply(MapElements.via(new SimpleFunction<InputT, String>() { + @Override + public String apply(InputT input) { + try { + ObjectMapper mapper = Optional.fromNullable(customMapper).or(DEFAULT_MAPPER); + return mapper.writeValueAsString(input); + } catch (IOException e) { + throw new RuntimeException( + "Failed to serialize " + inputClass.getName() + " value: " + input, e); + } + } + })); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40bc64c9/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java new file mode 100644 index 0000000..f3bff65 --- /dev/null +++ b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/ParseJsons.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.jackson; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import java.io.IOException; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link PTransform} that parses JSON {@link String Strings} with Jackson. + * Turns a {@link PCollection} of JSON {@link String Strings} into a {@link PCollection} of the + * objects those {@link String Strings} represent, using Jackson. 
+ */ +public class ParseJsons<OutputT> extends PTransform<PCollection<String>, PCollection<OutputT>> { + private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); + + private final Class<? extends OutputT> outputClass; + private ObjectMapper customMapper; + + /** + * Returns a {@link ParseJsons} {@link PTransform} which reads JSON {@link String Strings} and + * produces a {@code PCollection<OutputT>} by way of a Jackson {@link ObjectMapper}. + */ + public static <OutputT> ParseJsons<OutputT> of(Class<? extends OutputT> outputClass) { + return new ParseJsons<>(outputClass); + } + + private ParseJsons(Class<? extends OutputT> outputClass) { + this.outputClass = outputClass; + } + + /** + * Returns a copy of this transform using the given {@link ObjectMapper} instead of the default. + */ + public ParseJsons<OutputT> withMapper(ObjectMapper mapper) { + ParseJsons<OutputT> copy = new ParseJsons<>(outputClass); + copy.customMapper = mapper; + return copy; + } + + @Override + public PCollection<OutputT> expand(PCollection<String> input) { + SimpleFunction<String, OutputT> parseFn = new SimpleFunction<String, OutputT>() { + @Override + public OutputT apply(String raw) { + try { + return Optional.fromNullable(customMapper).or(DEFAULT_MAPPER).readValue(raw, outputClass); + } catch (IOException e) { + throw new RuntimeException( + "Failed to parse a " + outputClass.getName() + " from JSON value: " + raw, e); + } + } + }; + return input.apply(MapElements.via(parseFn)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40bc64c9/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/package-info.java b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/package-info.java new file mode 100644 index 0000000..6460f9d --- /dev/null +++ 
b/sdks/java/extensions/jackson/src/main/java/org/apache/beam/sdk/extensions/jackson/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities for parsing and creating JSON serialized objects. + */ +package org.apache.beam.sdk.extensions.jackson; http://git-wip-us.apache.org/repos/asf/beam/blob/40bc64c9/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java b/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java new file mode 100644 index 0000000..293ce84 --- /dev/null +++ b/sdks/java/extensions/jackson/src/test/java/org/apache/beam/sdk/extensions/jackson/JacksonTransformsTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.jackson; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for the Jackson transforms {@link ParseJsons} and {@link AsJsons}. 
+ */ +public class JacksonTransformsTest { + private static final List<String> VALID_JSONS = + Arrays.asList( + "{\"myString\":\"abc\",\"myInt\":3}", + "{\"myString\":\"def\",\"myInt\":4}" + ); + + private static final List<String> INVALID_JSONS = + Arrays.asList( + "{myString:\"abc\",\"myInt\":3,\"other\":1}", + "{", + "" + ); + + private static final List<String> EMPTY_JSONS = + Arrays.asList( + "{}", + "{}" + ); + + + private static final List<String> EXTRA_PROPERTIES_JSONS = + Arrays.asList( + "{\"myString\":\"abc\",\"myInt\":3,\"other\":1}", + "{\"myString\":\"def\",\"myInt\":4}" + ); + + private static final List<MyPojo> POJOS = + Arrays.asList( + new MyPojo("abc", 3), + new MyPojo("def", 4) + ); + + private static final List<MyEmptyBean> EMPTY_BEANS = + Arrays.asList( + new MyEmptyBean("abc", 3), + new MyEmptyBean("def", 4) + ); + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void parseValidJsons() { + PCollection<MyPojo> output = + pipeline + .apply(Create.of(VALID_JSONS)) + .apply(ParseJsons.of(MyPojo.class)).setCoder(SerializableCoder.of(MyPojo.class)); + + PAssert.that(output).containsInAnyOrder(POJOS); + + pipeline.run(); + } + + @Test(expected = Pipeline.PipelineExecutionException.class) + public void failParsingInvalidJsons() { + PCollection<MyPojo> output = + pipeline + .apply(Create.of(Iterables.concat(VALID_JSONS, INVALID_JSONS))) + .apply(ParseJsons.of(MyPojo.class)).setCoder(SerializableCoder.of(MyPojo.class)); + + PAssert.that(output).containsInAnyOrder(POJOS); + + pipeline.run(); + } + + @Test(expected = Pipeline.PipelineExecutionException.class) + public void failParsingWithoutCustomMapper() { + PCollection<MyPojo> output = + pipeline + .apply(Create.of(EXTRA_PROPERTIES_JSONS)) + .apply(ParseJsons.of(MyPojo.class)).setCoder(SerializableCoder.of(MyPojo.class)); + + PAssert.that(output).empty(); + + pipeline.run(); + } + + @Test + public void parseUsingCustomMapper() { + ObjectMapper 
customMapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + PCollection<MyPojo> output = + pipeline + .apply(Create.of(EXTRA_PROPERTIES_JSONS)) + .apply(ParseJsons.of(MyPojo.class) + .withMapper(customMapper)).setCoder(SerializableCoder.of(MyPojo.class)); + + PAssert.that(output).containsInAnyOrder(POJOS); + + pipeline.run(); + } + + @Test + public void writeValidObjects() { + PCollection<String> output = + pipeline + .apply(Create.of(POJOS)) + .apply(AsJsons.of(MyPojo.class)).setCoder(StringUtf8Coder.of()); + + PAssert.that(output).containsInAnyOrder(VALID_JSONS); + + pipeline.run(); + } + + @Test(expected = Pipeline.PipelineExecutionException.class) + public void failWritingWithoutCustomMapper() { + PCollection<String> output = + pipeline + .apply(Create.of(EMPTY_BEANS)) + .apply(AsJsons.of(MyEmptyBean.class)).setCoder(StringUtf8Coder.of()); + + pipeline.run(); + } + + + @Test + public void writeUsingCustomMapper() { + ObjectMapper customMapper = + new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + + PCollection<String> output = + pipeline + .apply(Create.of(EMPTY_BEANS)) + .apply(AsJsons.of(MyEmptyBean.class) + .withMapper(customMapper)).setCoder(StringUtf8Coder.of()); + + PAssert.that(output).containsInAnyOrder(EMPTY_JSONS); + + pipeline.run(); + } + + /** + * Mutable bean with getters and setters used as the value type in the parse and write tests. 
+ */ + @SuppressWarnings({"WeakerAccess", "unused"}) + public static class MyPojo implements Serializable { + private String myString; + private int myInt; + + public MyPojo() { + } + + public MyPojo(String myString, int myInt) { + this.myString = myString; + this.myInt = myInt; + } + + public String getMyString() { + return myString; + } + + public void setMyString(String myString) { + this.myString = myString; + } + + public int getMyInt() { + return myInt; + } + + public void setMyInt(int myInt) { + this.myInt = myInt; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof MyPojo)) { + return false; + } + + MyPojo myPojo = (MyPojo) o; + + return myInt == myPojo.myInt && (myString != null ? myString.equals(myPojo.myString) : + myPojo.myString == null); + } + + @Override + public int hashCode() { + int result = myString != null ? myString.hashCode() : 0; + result = 31 * result + myInt; + return result; + } + } + + /** + * Class with private fields and no accessors, used by the tests that write empty beans. + */ + @SuppressWarnings({"WeakerAccess", "unused"}) + public static class MyEmptyBean implements Serializable { + private String myString; + private int myInt; + + public MyEmptyBean(String myString, int myInt) { + this.myString = myString; + this.myInt = myInt; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40bc64c9/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index 99e0cb6..26d92de 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -32,6 +32,7 @@ <name>Apache Beam :: SDKs :: Java :: Extensions</name> <modules> + <module>jackson</module> <module>join-library</module> <module>sorter</module> </modules>
