[FLINK-1391] Add support for using Avro-POJOs and Avro types with Kryo Conflicts: flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e39bc67 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e39bc67 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e39bc67 Branch: refs/heads/master Commit: 7e39bc67d22766c72e040bcf35d48da817b7a2f2 Parents: cfce493 Author: Robert Metzger <metzg...@web.de> Authored: Mon Jan 12 21:11:09 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Wed Feb 18 15:52:08 2015 +0100 ---------------------------------------------------------------------- docs/example_connectors.md | 27 + .../org/apache/flink/api/io/avro/.gitignore | 1 + .../apache/flink/api/io/avro/AvroPojoTest.java | 157 ++++ .../common/typeutils/ComparatorTestBase.java | 4 +- flink-java/pom.xml | 9 +- .../flink/api/java/typeutils/AvroTypeInfo.java | 72 ++ .../flink/api/java/typeutils/TypeExtractor.java | 8 +- .../java/typeutils/runtime/PojoComparator.java | 7 +- flink-staging/flink-avro/pom.xml | 30 +- .../flink/api/avro/EncoderDecoderTest.java | 9 +- .../api/io/avro/AvroRecordInputFormatTest.java | 144 +++- .../flink/api/io/avro/generated/Colors.java | 32 - .../flink/api/io/avro/generated/User.java | 755 ------------------- .../src/test/resources/avro/user.avsc | 10 +- pom.xml | 2 +- tools/maven/suppressions.xml | 1 + 16 files changed, 443 insertions(+), 825 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/docs/example_connectors.md ---------------------------------------------------------------------- diff --git a/docs/example_connectors.md b/docs/example_connectors.md index 929338e..dccddd0 100644 --- a/docs/example_connectors.md +++ b/docs/example_connectors.md @@ -69,6 +69,33 @@ users to use all existing Hadoop input formats with Flink. This section shows some examples for connecting Flink to other systems. 
[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html). +## Avro support in Flink + +Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This makes it easy to read from Avro files with Flink. +Also, the serialization framework of Flink is able to handle classes generated from Avro schemas. + +In order to read data from an Avro file, you have to specify an `AvroInputFormat`. + +**Example**: + +~~~java +AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); +DataSet<User> usersDS = env.createInput(users); +~~~ + +Note that `User` is a POJO generated by Avro. Flink also lets you perform string-based key selection on these POJOs. For example: + +~~~java +usersDS.groupBy("name") +~~~ + + +Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data intensive and thus probably slow to use. + +Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you cannot use the field as a join or grouping key. +Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible! 
+ + ### Access Microsoft Azure Table Storage http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore new file mode 100644 index 0000000..dc9b237 --- /dev/null +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore @@ -0,0 +1 @@ +generated \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java new file mode 100644 index 0000000..6ff4836 --- /dev/null +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.io.avro; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; + +@RunWith(Parameterized.class) +public class AvroPojoTest extends MultipleProgramsTestBase { + public AvroPojoTest(ExecutionMode mode) { + super(mode); + } + + private File inFile; + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + inFile = tempFolder.newFile(); + AvroRecordInputFormatTest.writeTestFile(inFile); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testSimpleAvroRead() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction<User, User>() { + @Override + public User map(User value) throws Exception { + 
value.setTypeMap(null); + return value; + } + }); + + usersDS.writeAsText(resultPath); + + env.execute("Simple Avro read job"); + + + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + } + + @Test + public void testKeySelection() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for(User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + expected = "(Alyssa,1)\n(Charlie,1)\n"; + } + + /** + * Test some know fields for grouping on + */ + @Test + public void testAllFields() throws Exception { + for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { + testField(fieldName); + } + } + + private void testField(final 
String fieldName) throws Exception { + before(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + + DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { + @Override + public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { + for(User u : values) { + out.collect(u.get(fieldName)); + } + } + }); + res.writeAsText(resultPath); + env.execute("Simple Avro read job"); + if(fieldName.equals("name")) { + expected = "Alyssa\nCharlie"; + } else if(fieldName.equals("type_enum")) { + expected = "GREEN\nRED\n"; + } else if(fieldName.equals("type_double_test")) { + expected = "123.45\n1.337\n"; + } else { + Assert.fail("Unknown field"); + } + + after(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java index e6a8cb6..bc5c6b6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java @@ -448,7 +448,7 @@ public abstract class ComparatorTestBase<T> { } // -------------------------------------------------------------------------------------------- - protected static final class TestOutputView extends DataOutputStream implements DataOutputView { + public static final class TestOutputView extends DataOutputStream implements DataOutputView { public TestOutputView() { super(new 
ByteArrayOutputStream(4096)); @@ -474,7 +474,7 @@ public abstract class ComparatorTestBase<T> { } } - protected static final class TestInputView extends DataInputStream implements DataInputView { + public static final class TestInputView extends DataInputStream implements DataInputView { public TestInputView(byte[] data) { super(new ByteArrayInputStream(data)); http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 3f668ce..3881708 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -63,6 +63,13 @@ under the License. <version>0.5.1</version> </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill-avro_2.10</artifactId> + <version>0.5.1</version> + </dependency> + + <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> <dependency> <groupId>com.google.guava</groupId> @@ -87,7 +94,7 @@ under the License. </dependencies> - <!-- Because flink-scala uses it in tests --> + <!-- Because flink-scala and flink-avro uses it in tests --> <build> <plugins> <plugin> http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java new file mode 100644 index 0000000..ccdf7f7 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.typeutils; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * Special type information to generate a POJO type info from an avro schema. + * + * Proceeding: It uses a regular pojo type analysis and replaces all GenericType<CharSequence> + * with a GenericType<avro.Utf8> + * @param <T> + */ +public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { + public AvroTypeInfo(Class<T> typeClass) { + super(typeClass, generateFieldsFromAvroSchema(typeClass)); + } + + private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { + PojoTypeExtractor pte = new PojoTypeExtractor(); + TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null); + + if(!(ti instanceof PojoTypeInfo)) { + throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); + } + PojoTypeInfo pti = (PojoTypeInfo) ti; + List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields()); + + for(int i = 0; i < pti.getTotalFields(); i++) { + PojoField f = pti.getPojoFieldAt(i); + TypeInformation newType = f.type; + // check if type is a CharSequence + if(newType instanceof GenericTypeInfo) { + 
if((newType).getTypeClass().equals(CharSequence.class)) { + // replace the type by a org.apache.avro.util.Utf8 + newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + } + } + PojoField newField = new PojoField(f.field, newType); + newFields.add(newField); + } + return newFields; + } + + private static class PojoTypeExtractor extends TypeExtractor { + private PojoTypeExtractor() { + super(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 63273f8..73cedd1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; @@ -71,7 +72,7 @@ public class TypeExtractor { // in an endless recursion private Set<Class<?>> alreadySeen; - private TypeExtractor() { + protected TypeExtractor() { alreadySeen = new HashSet<Class<?>>(); } @@ -1156,6 +1157,11 @@ public class TypeExtractor { return (TypeInformation<OUT>) new EnumTypeInfo(clazz); } + // special case for POJOs generated by Avro. 
+ if(SpecificRecordBase.class.isAssignableFrom(clazz)) { + return (TypeInformation<OUT>) new AvroTypeInfo(clazz); + } + if (alreadySeen.contains(clazz)) { return new GenericTypeInfo<OUT>(clazz); } http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java index ae4a806..c0c7797 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -210,7 +210,12 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen int code = 0; for (; i < this.keyFields.length; i++) { code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; - code += this.comparators[i].hash(accessField(keyFields[i], value)); + try { + code += this.comparators[i].hash(accessField(keyFields[i], value)); + }catch(NullPointerException npe) { + throw new RuntimeException("A NullPointerException occured while accessing a key field in a POJO. " + + "Most likely, the value grouped/joined on is null. Field name: "+keyFields[i].getName(), npe); + } } return code; http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/pom.xml b/flink-staging/flink-avro/pom.xml index 90a7be2..dac3dba 100644 --- a/flink-staging/flink-avro/pom.xml +++ b/flink-staging/flink-avro/pom.xml @@ -41,7 +41,8 @@ under the License. 
<artifactId>flink-java</artifactId> <version>${project.version}</version> </dependency> - + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> @@ -53,6 +54,14 @@ under the License. <artifactId>avro</artifactId> <!-- version is derived from base module --> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -116,7 +125,24 @@ under the License. </execution> </executions> </plugin> - + <!-- Generate Test class from avro schema --> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>1.7.7</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> + <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> </plugins> <pluginManagement> http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java index 0724457..8f14cb3 100644 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java +++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java @@ -29,9 +29,11 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectDatumReader; 
import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.Fixed16; import org.apache.flink.api.io.avro.generated.User; import org.apache.flink.util.StringUtils; import org.junit.Test; @@ -232,8 +234,11 @@ public class EncoderDecoderTest { map.put("1", 1L); map.put("2", 2L); map.put("3", 3L); - - User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map); + + byte[] b = new byte[16]; + new Random().nextBytes(b); + Fixed16 f = new Fixed16(b); + User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map, f, new Boolean(true)); testObjectSerialization(user); } http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java index d8d8b46..1ec4a8a 100644 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java @@ -27,13 +27,24 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; import 
org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.io.avro.generated.Colors; import org.apache.flink.api.io.avro.generated.User; import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; @@ -48,7 +59,7 @@ import org.junit.Test; */ public class AvroRecordInputFormatTest { - private File testFile; + public File testFile; final static String TEST_NAME = "Alyssa"; @@ -65,24 +76,25 @@ public class AvroRecordInputFormatTest { final static String TEST_MAP_KEY2 = "KEY 2"; final static long TEST_MAP_VALUE2 = 17554L; - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroInputFormatTest", null); - + private Schema userSchema = new User().getSchema(); + + + public static void writeTestFile(File testFile) throws IOException { ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); stringArray.add(TEST_ARRAY_STRING_1); stringArray.add(TEST_ARRAY_STRING_2); - + ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); booleanArray.add(TEST_ARRAY_BOOLEAN_1); booleanArray.add(TEST_ARRAY_BOOLEAN_2); - + HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - + + User user1 = new User(); + user1.setName(TEST_NAME); user1.setFavoriteNumber(256); user1.setTypeDoubleTest(123.45d); @@ -91,22 +103,24 @@ public class AvroRecordInputFormatTest { user1.setTypeArrayBoolean(booleanArray); user1.setTypeEnum(TEST_ENUM_COLOR); user1.setTypeMap(longMap); - + // Construct via builder User user2 = 
User.newBuilder() - .setName("Charlie") - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .build(); + .setName("Charlie") + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(null) + .setTypeUnion(null) + .build(); DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); dataFileWriter.create(user1.getSchema(), testFile); @@ -114,7 +128,17 @@ public class AvroRecordInputFormatTest { dataFileWriter.append(user2); dataFileWriter.close(); } - + @Before + public void createFiles() throws IOException { + testFile = File.createTempFile("AvroInputFormatTest", null); + writeTestFile(testFile); + } + + + /** + * Test if the AvroInputFormat is able to properly read data from an avro file. + * @throws IOException + */ @Test public void testDeserialisation() throws IOException { Configuration parameters = new Configuration(); @@ -159,9 +183,79 @@ public class AvroRecordInputFormatTest { format.close(); } - + + /** + * Test if the Flink serialization is able to properly process GenericData.Record types. + * Usually users of Avro generate classes (POJOs) from Avro schemas. + * However, if generated classes are not available, one can also use GenericData.Record. 
+ * It is an untyped key-value record which is using a schema to validate the correctness of the data. + * + * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. + */ + @Test + public void testDeserializeToGenericType() throws IOException { + DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema); + + FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader); + // initialize Record by reading it from disk (thats easier than creating it by hand) + GenericData.Record rec = new GenericData.Record(userSchema); + dataFileReader.next(rec); + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + assertEquals(null, rec.get("type_long_test")); // it is null for the first record. + + // now serialize it with our framework: + TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class); + TypeSerializer<GenericData.Record> tser = te.createSerializer(); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + GenericData.Record newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); + assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() ); + assertEquals(null, newRec.get("type_long_test")); + + } + + /** + * This test validates proper serialization with specific (generated POJO) types. 
+ */ + @Test + public void testDeserializeToSpecificType() throws IOException { + + DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); + + FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader); + User rec = dataFileReader.next(); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + + // now serialize it with our framework: + TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class); + TypeSerializer<User> tser = te.createSerializer(); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + User newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("name not equal", TEST_NAME, newRec.getName().toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() ); + } + + @After public void deleteFiles() { testFile.delete(); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java deleted file mode 100644 index 58e1f5c..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.flink.api.io.avro.generated; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public enum Colors { - RED, GREEN, BLUE ; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java deleted file mode 100644 index 505857e..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java +++ /dev/null @@ -1,755 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.flink.api.io.avro.generated; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public java.lang.CharSequence name; - @Deprecated public java.lang.Integer favorite_number; - @Deprecated public java.lang.CharSequence favorite_color; - @Deprecated public java.lang.Long type_long_test; - @Deprecated public java.lang.Object type_double_test; - @Deprecated public java.lang.Object type_null_test; - @Deprecated public java.lang.Object type_bool_test; - @Deprecated public java.util.List<java.lang.CharSequence> type_array_string; - @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean; - @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array; - @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum; - @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map; - - /** - * Default constructor. Note that this does not initialize fields - * to their default values from the schema. 
If that is desired then - * one should use {@link \#newBuilder()}. - */ - public User() {} - - /** - * All-args constructor. - */ - public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) { - this.name = name; - this.favorite_number = favorite_number; - this.favorite_color = favorite_color; - this.type_long_test = type_long_test; - this.type_double_test = type_double_test; - this.type_null_test = type_null_test; - this.type_bool_test = type_bool_test; - this.type_array_string = type_array_string; - this.type_array_boolean = type_array_boolean; - this.type_nullable_array = type_nullable_array; - this.type_enum = type_enum; - this.type_map = type_map; - } - - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - // Used by DatumWriter. Applications should not call. - public java.lang.Object get(int field$) { - switch (field$) { - case 0: return name; - case 1: return favorite_number; - case 2: return favorite_color; - case 3: return type_long_test; - case 4: return type_double_test; - case 5: return type_null_test; - case 6: return type_bool_test; - case 7: return type_array_string; - case 8: return type_array_boolean; - case 9: return type_nullable_array; - case 10: return type_enum; - case 11: return type_map; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - // Used by DatumReader. Applications should not call. 
- @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: name = (java.lang.CharSequence)value$; break; - case 1: favorite_number = (java.lang.Integer)value$; break; - case 2: favorite_color = (java.lang.CharSequence)value$; break; - case 3: type_long_test = (java.lang.Long)value$; break; - case 4: type_double_test = (java.lang.Object)value$; break; - case 5: type_null_test = (java.lang.Object)value$; break; - case 6: type_bool_test = (java.lang.Object)value$; break; - case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break; - case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break; - case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break; - case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break; - case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - /** - * Gets the value of the 'name' field. - */ - public java.lang.CharSequence getName() { - return name; - } - - /** - * Sets the value of the 'name' field. - * @param value the value to set. - */ - public void setName(java.lang.CharSequence value) { - this.name = value; - } - - /** - * Gets the value of the 'favorite_number' field. - */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** - * Sets the value of the 'favorite_number' field. - * @param value the value to set. - */ - public void setFavoriteNumber(java.lang.Integer value) { - this.favorite_number = value; - } - - /** - * Gets the value of the 'favorite_color' field. - */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** - * Sets the value of the 'favorite_color' field. - * @param value the value to set. 
- */ - public void setFavoriteColor(java.lang.CharSequence value) { - this.favorite_color = value; - } - - /** - * Gets the value of the 'type_long_test' field. - */ - public java.lang.Long getTypeLongTest() { - return type_long_test; - } - - /** - * Sets the value of the 'type_long_test' field. - * @param value the value to set. - */ - public void setTypeLongTest(java.lang.Long value) { - this.type_long_test = value; - } - - /** - * Gets the value of the 'type_double_test' field. - */ - public java.lang.Object getTypeDoubleTest() { - return type_double_test; - } - - /** - * Sets the value of the 'type_double_test' field. - * @param value the value to set. - */ - public void setTypeDoubleTest(java.lang.Object value) { - this.type_double_test = value; - } - - /** - * Gets the value of the 'type_null_test' field. - */ - public java.lang.Object getTypeNullTest() { - return type_null_test; - } - - /** - * Sets the value of the 'type_null_test' field. - * @param value the value to set. - */ - public void setTypeNullTest(java.lang.Object value) { - this.type_null_test = value; - } - - /** - * Gets the value of the 'type_bool_test' field. - */ - public java.lang.Object getTypeBoolTest() { - return type_bool_test; - } - - /** - * Sets the value of the 'type_bool_test' field. - * @param value the value to set. - */ - public void setTypeBoolTest(java.lang.Object value) { - this.type_bool_test = value; - } - - /** - * Gets the value of the 'type_array_string' field. - */ - public java.util.List<java.lang.CharSequence> getTypeArrayString() { - return type_array_string; - } - - /** - * Sets the value of the 'type_array_string' field. - * @param value the value to set. - */ - public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) { - this.type_array_string = value; - } - - /** - * Gets the value of the 'type_array_boolean' field. 
- */ - public java.util.List<java.lang.Boolean> getTypeArrayBoolean() { - return type_array_boolean; - } - - /** - * Sets the value of the 'type_array_boolean' field. - * @param value the value to set. - */ - public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) { - this.type_array_boolean = value; - } - - /** - * Gets the value of the 'type_nullable_array' field. - */ - public java.util.List<java.lang.CharSequence> getTypeNullableArray() { - return type_nullable_array; - } - - /** - * Sets the value of the 'type_nullable_array' field. - * @param value the value to set. - */ - public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) { - this.type_nullable_array = value; - } - - /** - * Gets the value of the 'type_enum' field. - */ - public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() { - return type_enum; - } - - /** - * Sets the value of the 'type_enum' field. - * @param value the value to set. - */ - public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) { - this.type_enum = value; - } - - /** - * Gets the value of the 'type_map' field. - */ - public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() { - return type_map; - } - - /** - * Sets the value of the 'type_map' field. - * @param value the value to set. 
- */ - public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) { - this.type_map = value; - } - - /** Creates a new User RecordBuilder */ - public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder() { - return new org.apache.flink.api.io.avro.generated.User.Builder(); - } - - /** Creates a new User RecordBuilder by copying an existing Builder */ - public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) { - return new org.apache.flink.api.io.avro.generated.User.Builder(other); - } - - /** Creates a new User RecordBuilder by copying an existing User instance */ - public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User other) { - return new org.apache.flink.api.io.avro.generated.User.Builder(other); - } - - /** - * RecordBuilder for User instances. - */ - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> - implements org.apache.avro.data.RecordBuilder<User> { - - private java.lang.CharSequence name; - private java.lang.Integer favorite_number; - private java.lang.CharSequence favorite_color; - private java.lang.Long type_long_test; - private java.lang.Object type_double_test; - private java.lang.Object type_null_test; - private java.lang.Object type_bool_test; - private java.util.List<java.lang.CharSequence> type_array_string; - private java.util.List<java.lang.Boolean> type_array_boolean; - private java.util.List<java.lang.CharSequence> type_nullable_array; - private org.apache.flink.api.io.avro.generated.Colors type_enum; - private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map; - - /** Creates a new Builder */ - private Builder() { - super(org.apache.flink.api.io.avro.generated.User.SCHEMA$); - } - - /** Creates a Builder by copying an existing Builder */ - private 
Builder(org.apache.flink.api.io.avro.generated.User.Builder other) { - super(other); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - if (isValidValue(fields()[3], other.type_long_test)) { - this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test); - fieldSetFlags()[3] = true; - } - if (isValidValue(fields()[4], other.type_double_test)) { - this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test); - fieldSetFlags()[4] = true; - } - if (isValidValue(fields()[5], other.type_null_test)) { - this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test); - fieldSetFlags()[5] = true; - } - if (isValidValue(fields()[6], other.type_bool_test)) { - this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test); - fieldSetFlags()[6] = true; - } - if (isValidValue(fields()[7], other.type_array_string)) { - this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string); - fieldSetFlags()[7] = true; - } - if (isValidValue(fields()[8], other.type_array_boolean)) { - this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean); - fieldSetFlags()[8] = true; - } - if (isValidValue(fields()[9], other.type_nullable_array)) { - this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array); - fieldSetFlags()[9] = true; - } - if (isValidValue(fields()[10], other.type_enum)) { - this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum); - 
fieldSetFlags()[10] = true; - } - if (isValidValue(fields()[11], other.type_map)) { - this.type_map = data().deepCopy(fields()[11].schema(), other.type_map); - fieldSetFlags()[11] = true; - } - } - - /** Creates a Builder by copying an existing User instance */ - private Builder(org.apache.flink.api.io.avro.generated.User other) { - super(org.apache.flink.api.io.avro.generated.User.SCHEMA$); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - if (isValidValue(fields()[3], other.type_long_test)) { - this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test); - fieldSetFlags()[3] = true; - } - if (isValidValue(fields()[4], other.type_double_test)) { - this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test); - fieldSetFlags()[4] = true; - } - if (isValidValue(fields()[5], other.type_null_test)) { - this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test); - fieldSetFlags()[5] = true; - } - if (isValidValue(fields()[6], other.type_bool_test)) { - this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test); - fieldSetFlags()[6] = true; - } - if (isValidValue(fields()[7], other.type_array_string)) { - this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string); - fieldSetFlags()[7] = true; - } - if (isValidValue(fields()[8], other.type_array_boolean)) { - this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean); - fieldSetFlags()[8] = true; - } - if 
(isValidValue(fields()[9], other.type_nullable_array)) { - this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array); - fieldSetFlags()[9] = true; - } - if (isValidValue(fields()[10], other.type_enum)) { - this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum); - fieldSetFlags()[10] = true; - } - if (isValidValue(fields()[11], other.type_map)) { - this.type_map = data().deepCopy(fields()[11].schema(), other.type_map); - fieldSetFlags()[11] = true; - } - } - - /** Gets the value of the 'name' field */ - public java.lang.CharSequence getName() { - return name; - } - - /** Sets the value of the 'name' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setName(java.lang.CharSequence value) { - validate(fields()[0], value); - this.name = value; - fieldSetFlags()[0] = true; - return this; - } - - /** Checks whether the 'name' field has been set */ - public boolean hasName() { - return fieldSetFlags()[0]; - } - - /** Clears the value of the 'name' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearName() { - name = null; - fieldSetFlags()[0] = false; - return this; - } - - /** Gets the value of the 'favorite_number' field */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** Sets the value of the 'favorite_number' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) { - validate(fields()[1], value); - this.favorite_number = value; - fieldSetFlags()[1] = true; - return this; - } - - /** Checks whether the 'favorite_number' field has been set */ - public boolean hasFavoriteNumber() { - return fieldSetFlags()[1]; - } - - /** Clears the value of the 'favorite_number' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteNumber() { - favorite_number = null; - fieldSetFlags()[1] = false; - return this; - } - - /** Gets the value of the 'favorite_color' field */ 
- public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** Sets the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) { - validate(fields()[2], value); - this.favorite_color = value; - fieldSetFlags()[2] = true; - return this; - } - - /** Checks whether the 'favorite_color' field has been set */ - public boolean hasFavoriteColor() { - return fieldSetFlags()[2]; - } - - /** Clears the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteColor() { - favorite_color = null; - fieldSetFlags()[2] = false; - return this; - } - - /** Gets the value of the 'type_long_test' field */ - public java.lang.Long getTypeLongTest() { - return type_long_test; - } - - /** Sets the value of the 'type_long_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) { - validate(fields()[3], value); - this.type_long_test = value; - fieldSetFlags()[3] = true; - return this; - } - - /** Checks whether the 'type_long_test' field has been set */ - public boolean hasTypeLongTest() { - return fieldSetFlags()[3]; - } - - /** Clears the value of the 'type_long_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeLongTest() { - type_long_test = null; - fieldSetFlags()[3] = false; - return this; - } - - /** Gets the value of the 'type_double_test' field */ - public java.lang.Object getTypeDoubleTest() { - return type_double_test; - } - - /** Sets the value of the 'type_double_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) { - validate(fields()[4], value); - this.type_double_test = value; - fieldSetFlags()[4] = true; - return this; - } - - /** Checks whether the 'type_double_test' field has been set */ - public boolean hasTypeDoubleTest() { - return 
fieldSetFlags()[4]; - } - - /** Clears the value of the 'type_double_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeDoubleTest() { - type_double_test = null; - fieldSetFlags()[4] = false; - return this; - } - - /** Gets the value of the 'type_null_test' field */ - public java.lang.Object getTypeNullTest() { - return type_null_test; - } - - /** Sets the value of the 'type_null_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) { - validate(fields()[5], value); - this.type_null_test = value; - fieldSetFlags()[5] = true; - return this; - } - - /** Checks whether the 'type_null_test' field has been set */ - public boolean hasTypeNullTest() { - return fieldSetFlags()[5]; - } - - /** Clears the value of the 'type_null_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullTest() { - type_null_test = null; - fieldSetFlags()[5] = false; - return this; - } - - /** Gets the value of the 'type_bool_test' field */ - public java.lang.Object getTypeBoolTest() { - return type_bool_test; - } - - /** Sets the value of the 'type_bool_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) { - validate(fields()[6], value); - this.type_bool_test = value; - fieldSetFlags()[6] = true; - return this; - } - - /** Checks whether the 'type_bool_test' field has been set */ - public boolean hasTypeBoolTest() { - return fieldSetFlags()[6]; - } - - /** Clears the value of the 'type_bool_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeBoolTest() { - type_bool_test = null; - fieldSetFlags()[6] = false; - return this; - } - - /** Gets the value of the 'type_array_string' field */ - public java.util.List<java.lang.CharSequence> getTypeArrayString() { - return type_array_string; - } - - /** Sets the value of the 'type_array_string' field */ - public 
org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) { - validate(fields()[7], value); - this.type_array_string = value; - fieldSetFlags()[7] = true; - return this; - } - - /** Checks whether the 'type_array_string' field has been set */ - public boolean hasTypeArrayString() { - return fieldSetFlags()[7]; - } - - /** Clears the value of the 'type_array_string' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayString() { - type_array_string = null; - fieldSetFlags()[7] = false; - return this; - } - - /** Gets the value of the 'type_array_boolean' field */ - public java.util.List<java.lang.Boolean> getTypeArrayBoolean() { - return type_array_boolean; - } - - /** Sets the value of the 'type_array_boolean' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) { - validate(fields()[8], value); - this.type_array_boolean = value; - fieldSetFlags()[8] = true; - return this; - } - - /** Checks whether the 'type_array_boolean' field has been set */ - public boolean hasTypeArrayBoolean() { - return fieldSetFlags()[8]; - } - - /** Clears the value of the 'type_array_boolean' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayBoolean() { - type_array_boolean = null; - fieldSetFlags()[8] = false; - return this; - } - - /** Gets the value of the 'type_nullable_array' field */ - public java.util.List<java.lang.CharSequence> getTypeNullableArray() { - return type_nullable_array; - } - - /** Sets the value of the 'type_nullable_array' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) { - validate(fields()[9], value); - this.type_nullable_array = value; - fieldSetFlags()[9] = true; - return this; - } - - /** Checks whether the 'type_nullable_array' field has been set */ - public boolean 
hasTypeNullableArray() { - return fieldSetFlags()[9]; - } - - /** Clears the value of the 'type_nullable_array' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullableArray() { - type_nullable_array = null; - fieldSetFlags()[9] = false; - return this; - } - - /** Gets the value of the 'type_enum' field */ - public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() { - return type_enum; - } - - /** Sets the value of the 'type_enum' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) { - validate(fields()[10], value); - this.type_enum = value; - fieldSetFlags()[10] = true; - return this; - } - - /** Checks whether the 'type_enum' field has been set */ - public boolean hasTypeEnum() { - return fieldSetFlags()[10]; - } - - /** Clears the value of the 'type_enum' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum() { - type_enum = null; - fieldSetFlags()[10] = false; - return this; - } - - /** Gets the value of the 'type_map' field */ - public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() { - return type_map; - } - - /** Sets the value of the 'type_map' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) { - validate(fields()[11], value); - this.type_map = value; - fieldSetFlags()[11] = true; - return this; - } - - /** Checks whether the 'type_map' field has been set */ - public boolean hasTypeMap() { - return fieldSetFlags()[11]; - } - - /** Clears the value of the 'type_map' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() { - type_map = null; - fieldSetFlags()[11] = false; - return this; - } - - @Override - public User build() { - try { - User record = new User(); - record.name = fieldSetFlags()[0] ? 
this.name : (java.lang.CharSequence) defaultValue(fields()[0]); - record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]); - record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]); - record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]); - record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]); - record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]); - record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]); - record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]); - record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]); - record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]); - record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]); - record.type_map = fieldSetFlags()[11] ? 
this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]); - return record; - } catch (Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/resources/avro/user.avsc b/flink-staging/flink-avro/src/test/resources/avro/user.avsc index af3cb75..6801b10 100644 --- a/flink-staging/flink-avro/src/test/resources/avro/user.avsc +++ b/flink-staging/flink-avro/src/test/resources/avro/user.avsc @@ -1,5 +1,5 @@ -{"namespace": "org.apache.flink.api.java.record.io.avro.generated", +{"namespace": "org.apache.flink.api.io.avro.generated", "type": "record", "name": "User", "fields": [ @@ -7,13 +7,17 @@ {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]}, {"name": "type_long_test", "type": ["long", "null"]}, - {"name": "type_double_test", "type": ["double"]}, + {"name": "type_double_test", "type": "double"}, {"name": "type_null_test", "type": ["null"]}, {"name": "type_bool_test", "type": ["boolean"]}, {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, - {"name": "type_map", "type": {"type": "map", "values": "long"}} + {"name": "type_map", "type": {"type": "map", "values": "long"}}, + {"name": "type_fixed", + "size": 16, + "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, + {"name": "type_union", "type": ["null", "boolean", "long", "double"]} ] } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d1880d8..0aedfb0 100644 --- a/pom.xml +++ b/pom.xml @@ -1027,7 +1027,7 @@ under the License. <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/timeline.js</exclude> <!-- Test Data. --> <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude> - <exclude>flink-staging/flink-avro/src/test/resources/avro/user.avsc</exclude> + <exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude> <exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude> <!-- Configuration Files. --> <exclude>**/flink-bin/conf/slaves</exclude> http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/tools/maven/suppressions.xml ---------------------------------------------------------------------- diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index 377cbfd..b17dbce 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -24,4 +24,5 @@ under the License. <suppressions> <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/> + <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/> </suppressions> \ No newline at end of file