http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java new file mode 100644 index 0000000..caa6e0d --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java @@ -0,0 +1,188 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.avro;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.test.util.JavaProgramTestBase;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.junit.Assert;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * IT cases for the {@link AvroOutputFormat}.
 *
 * <p>The test program reads a small pipe-delimited user file, writes it once as specific
 * ({@link User}) records and once as reflect-based ({@code ReflectiveUser}) records, and
 * afterwards reads both outputs back with plain Avro readers to check that every input
 * row is present.
 */
@SuppressWarnings("serial")
public class AvroOutputFormatITCase extends JavaProgramTestBase {

	public static String outputPath1;

	public static String outputPath2;

	public static String inputPath;

	public static String userData = "alice|1|blue\n" +
		"bob|2|red\n" +
		"john|3|yellow\n" +
		"walt|4|black\n";

	@Override
	protected void preSubmit() throws Exception {
		inputPath = createTempFile("user", userData);
		outputPath1 = getTempDirPath("avro_output1");
		outputPath2 = getTempDirPath("avro_output2");
	}

	@Override
	protected void testProgram() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
			.fieldDelimiter("|")
			.types(String.class, Integer.class, String.class);

		// output the data with AvroOutputFormat for specific user type
		DataSet<User> specificUser = input.map(new ConvertToUser());
		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
		avroOutputFormat.setCodec(Codec.SNAPPY);    // FLINK-4771: use a codec
		avroOutputFormat.setSchema(User.SCHEMA$);   // FLINK-3304: Ensure the OF is properly serializing the schema
		specificUser.write(avroOutputFormat, outputPath1);

		// output the data with AvroOutputFormat for reflect user type
		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);

		env.execute();
	}

	@Override
	protected void postSubmit() throws Exception {
		// compare result for specific user type
		List<String> result1 = new ArrayList<String>();
		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
		for (File avroOutput : listOutputFiles(outputPath1, true)) {
			// the reader holds an open file handle, so make sure it is released again
			try (DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1)) {
				while (dataFileReader1.hasNext()) {
					User user = dataFileReader1.next();
					result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
				}
			}
		}
		assertContainsAllUsers(result1);

		// compare result for reflect user type
		List<String> result2 = new ArrayList<String>();
		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
		for (File avroOutput : listOutputFiles(outputPath2, false)) {
			try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2)) {
				while (dataFileReader2.hasNext()) {
					ReflectiveUser user = dataFileReader2.next();
					result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
				}
			}
		}
		assertContainsAllUsers(result2);
	}

	/**
	 * Resolves the files that were written to the given output path.
	 *
	 * @param outputPath the path handed to the output format
	 * @param expectAvroExtension whether the files inside an output directory must end in '.avro'
	 * @return the files of the output directory, or the path itself if it is a single file
	 */
	private File[] listOutputFiles(String outputPath, boolean expectAvroExtension) throws Exception {
		File output = asFile(outputPath);
		if (!output.isDirectory()) {
			return new File[] {output};
		}

		File[] files = output.listFiles();
		// listFiles() returns null on I/O errors; fail with a message instead of an NPE
		Assert.assertNotNull("Could not list output directory " + output, files);
		if (expectAvroExtension) {
			// check for avro ext in dir.
			for (File avroOutput : files) {
				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
			}
		}
		return files;
	}

	/**
	 * Asserts that every line of {@link #userData} shows up in the given results.
	 *
	 * @param results the records read back, formatted as 'name|number|color'
	 */
	private static void assertContainsAllUsers(List<String> results) {
		for (String expectedResult : userData.split("\n")) {
			Assert.assertTrue("expected user " + expectedResult + " not found.", results.contains(expectedResult));
		}
	}

	/**
	 * Converts a (name, favorite number, favorite color) tuple into a specific Avro {@link User}.
	 */
	private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {

		@Override
		public User map(Tuple3<String, Integer, String> value) throws Exception {
			User user = new User();
			user.setName(value.f0);
			user.setFavoriteNumber(value.f1);
			user.setFavoriteColor(value.f2);
			user.setTypeBoolTest(true);
			user.setTypeArrayString(Collections.emptyList());
			user.setTypeArrayBoolean(Collections.emptyList());
			user.setTypeEnum(Colors.BLUE);
			user.setTypeMap(Collections.emptyMap());
			return user;
		}
	}

	/**
	 * Converts a specific Avro {@link User} into the plain {@link ReflectiveUser} POJO.
	 */
	private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {

		@Override
		public ReflectiveUser map(User value) throws Exception {
			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
		}
	}

	/**
	 * Plain POJO that is written and read through Avro's reflect-based readers and writers.
	 */
	private static class ReflectiveUser {
		private String name;
		private int favoriteNumber;
		private String favoriteColor;

		public ReflectiveUser() {}

		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
			this.name = name;
			this.favoriteNumber = favoriteNumber;
			this.favoriteColor = favoriteColor;
		}

		public String getName() {
			return this.name;
		}

		public String getFavoriteColor() {
			return this.favoriteColor;
		}

		public int getFavoriteNumber() {
			return this.favoriteNumber;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java new file mode 100644 index 0000000..b5ad564 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java @@ -0,0 +1,207 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.avro;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.User;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for {@link AvroOutputFormat}.
 */
public class AvroOutputFormatTest {

	/** Number of records each {@code output(...)} helper writes. */
	private static final int NUM_RECORDS = 100;

	/**
	 * Setting a valid codec must not throw.
	 */
	@Test
	public void testSetCodec() throws Exception {
		// given
		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);

		// when / then: any exception propagates and fails the test with its full stack trace
		outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);
	}

	/**
	 * Setting a {@code null} codec must be rejected with an exception.
	 */
	@Test
	public void testSetCodecError() throws Exception {
		// given
		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);

		// when
		try {
			outputFormat.setCodec(null);
			// then (fail() throws an AssertionError, which the catch below does not swallow)
			fail("setting a null codec should have thrown an exception");
		} catch (Exception expected) {
			// then: rejecting the null codec is exactly what this test asks for
		}
	}

	/**
	 * Java-serializes the format for every combination of codec and user-defined schema
	 * (including "not set") and checks that both survive the round trip.
	 */
	@Test
	public void testSerialization() throws Exception {

		serializeAndDeserialize(null, null);
		serializeAndDeserialize(null, User.SCHEMA$);
		for (final AvroOutputFormat.Codec codec : AvroOutputFormat.Codec.values()) {
			serializeAndDeserialize(codec, null);
			serializeAndDeserialize(codec, User.SCHEMA$);
		}
	}

	/**
	 * Serializes an {@link AvroOutputFormat} and compares the restored codec and schema.
	 *
	 * @param codec the codec to set before serializing, or {@code null} to leave it unset
	 * @param schema the schema to set before serializing, or {@code null} to leave it unset
	 */
	private void serializeAndDeserialize(final AvroOutputFormat.Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
		// given
		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
		if (codec != null) {
			outputFormat.setCodec(codec);
		}
		if (schema != null) {
			outputFormat.setSchema(schema);
		}

		final ByteArrayOutputStream bos = new ByteArrayOutputStream();

		// when
		try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(outputFormat);
		}
		try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
			// then
			Object o = ois.readObject();
			assertTrue(o instanceof AvroOutputFormat);
			@SuppressWarnings("unchecked") // only the raw type can be checked; guarded by the assertion above
			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
			final AvroOutputFormat.Codec restoredCodec = (AvroOutputFormat.Codec) Whitebox.getInternalState(restored, "codec");
			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");

			// assertEquals treats (null, null) as equal and reports both values on a mismatch
			assertEquals(codec, restoredCodec);
			assertEquals(schema, restoredSchema);
		}
	}

	/**
	 * Writes the same records with and without the Snappy codec and expects the
	 * compressed file to be smaller.
	 */
	@Test
	public void testCompression() throws Exception {
		// given
		final Path outputPath = new Path(File.createTempFile("avro-output-file", "avro").getAbsolutePath());
		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file", "compressed.avro").getAbsolutePath());

		try {
			final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath, User.class);
			outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);

			final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, User.class);
			compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
			compressedOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

			// when
			output(outputFormat);
			output(compressedOutputFormat);

			// then
			assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
		} finally {
			// cleanup, also when an assertion above failed
			FileSystem fs = FileSystem.getLocalFileSystem();
			fs.delete(outputPath, false);
			fs.delete(compressedOutputPath, false);
		}
	}

	private long fileSize(Path path) throws IOException {
		return path.getFileSystem().getFileStatus(path).getLen();
	}

	/**
	 * Writes {@link #NUM_RECORDS} identical {@link User} records through the given format.
	 */
	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
		outputFormat.configure(new Configuration());
		outputFormat.open(1, 1);
		for (int i = 0; i < NUM_RECORDS; i++) {
			User user = new User();
			user.setName("testUser");
			user.setFavoriteNumber(1);
			user.setFavoriteColor("blue");
			user.setTypeBoolTest(true);
			user.setTypeArrayString(Collections.emptyList());
			user.setTypeArrayBoolean(Collections.emptyList());
			user.setTypeEnum(Colors.BLUE);
			user.setTypeMap(Collections.emptyMap());
			outputFormat.writeRecord(user);
		}
		outputFormat.close();
	}

	/**
	 * Writes {@link GenericRecord}s with a user-defined schema and reads them back.
	 */
	@Test
	public void testGenericRecord() throws IOException {
		final Path outputPath = new Path(File.createTempFile("avro-output-file", "generic.avro").getAbsolutePath());

		try {
			final AvroOutputFormat<GenericRecord> outputFormat = new AvroOutputFormat<>(outputPath, GenericRecord.class);
			Schema schema = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"user\", \"fields\": [{\"name\":\"user_name\", \"type\":\"string\"}, {\"name\":\"favorite_number\", \"type\":\"int\"}, {\"name\":\"favorite_color\", \"type\":\"string\"}]}");
			outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
			outputFormat.setSchema(schema);
			output(outputFormat, schema);

			GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
			int numRecords = 0;
			try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(outputPath.getPath()), reader)) {
				while (dataFileReader.hasNext()) {
					GenericRecord record = dataFileReader.next();
					assertEquals("testUser", record.get("user_name").toString());
					assertEquals(1, record.get("favorite_number"));
					assertEquals("blue", record.get("favorite_color").toString());
					numRecords++;
				}
			}

			// without this check an empty output file would pass the loop above unnoticed
			assertEquals(NUM_RECORDS, numRecords);
		} finally {
			// cleanup, also when an assertion above failed
			FileSystem fs = FileSystem.getLocalFileSystem();
			fs.delete(outputPath, false);
		}
	}

	/**
	 * Writes {@link #NUM_RECORDS} identical generic records of the given schema through the format.
	 */
	private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException {
		outputFormat.configure(new Configuration());
		outputFormat.open(1, 1);
		for (int i = 0; i < NUM_RECORDS; i++) {
			GenericRecord record = new GenericData.Record(schema);
			record.put("user_name", "testUser");
			record.put("favorite_number", 1);
			record.put("favorite_color", "blue");
			outputFormat.writeRecord(record);
		}
		outputFormat.close();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java new file mode 100644 index 0000000..92d2c31 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java @@ -0,0 +1,459 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.avro;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
 * Test the avro input format.
 * (The testcase is mostly the getting started tutorial of avro)
 * http://avro.apache.org/docs/current/gettingstartedjava.html
 */
public class AvroRecordInputFormatTest {

	public File testFile;

	static final String TEST_NAME = "Alyssa";

	static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
	static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";

	static final boolean TEST_ARRAY_BOOLEAN_1 = true;
	static final boolean TEST_ARRAY_BOOLEAN_2 = false;

	static final Colors TEST_ENUM_COLOR = Colors.GREEN;

	static final String TEST_MAP_KEY1 = "KEY 1";
	static final long TEST_MAP_VALUE1 = 8546456L;
	static final String TEST_MAP_KEY2 = "KEY 2";
	static final long TEST_MAP_VALUE2 = 17554L;

	static final int TEST_NUM = 239;
	static final String TEST_STREET = "Baker Street";
	static final String TEST_CITY = "London";
	static final String TEST_STATE = "London";
	static final String TEST_ZIP = "NW1 6XE";

	private final Schema userSchema = new User().getSchema();

	/**
	 * Writes an Avro data file with two {@link User} records: one populated through
	 * setters with the {@code TEST_*} constants, one constructed via the builder.
	 *
	 * @param testFile the file to (over)write
	 * @throws IOException if the file cannot be written
	 */
	public static void writeTestFile(File testFile) throws IOException {
		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
		stringArray.add(TEST_ARRAY_STRING_1);
		stringArray.add(TEST_ARRAY_STRING_2);

		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
		booleanArray.add(TEST_ARRAY_BOOLEAN_2);

		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);

		Address addr = new Address();
		addr.setNum(TEST_NUM);
		addr.setStreet(TEST_STREET);
		addr.setCity(TEST_CITY);
		addr.setState(TEST_STATE);
		addr.setZip(TEST_ZIP);

		User user1 = new User();

		user1.setName(TEST_NAME);
		user1.setFavoriteNumber(256);
		user1.setTypeDoubleTest(123.45d);
		user1.setTypeBoolTest(true);
		user1.setTypeArrayString(stringArray);
		user1.setTypeArrayBoolean(booleanArray);
		user1.setTypeEnum(TEST_ENUM_COLOR);
		user1.setTypeMap(longMap);
		user1.setTypeNested(addr);

		// Construct via builder
		User user2 = User.newBuilder()
			.setName("Charlie")
			.setFavoriteColor("blue")
			.setFavoriteNumber(null)
			.setTypeBoolTest(false)
			.setTypeDoubleTest(1.337d)
			.setTypeNullTest(null)
			.setTypeLongTest(1337L)
			.setTypeArrayString(new ArrayList<CharSequence>())
			.setTypeArrayBoolean(new ArrayList<Boolean>())
			.setTypeNullableArray(null)
			.setTypeEnum(Colors.RED)
			.setTypeMap(new HashMap<CharSequence, Long>())
			.setTypeFixed(null)
			.setTypeUnion(null)
			.setTypeNested(
				Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
					.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
					.build())
			.build();

		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
		// try-with-resources so the file handle is released even if an append fails
		try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter)) {
			dataFileWriter.create(user1.getSchema(), testFile);
			dataFileWriter.append(user1);
			dataFileWriter.append(user2);
		}
	}

	@Before
	public void createFiles() throws IOException {
		testFile = File.createTempFile("AvroInputFormatTest", null);
		writeTestFile(testFile);
	}

	/**
	 * Test if the AvroInputFormat is able to properly read data from an avro file.
	 *
	 * @throws IOException if the test file cannot be read
	 */
	@Test
	public void testDeserialisation() throws IOException {
		Configuration parameters = new Configuration();

		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);

		doTestDeserializationSpecificRecord(format, parameters);
	}

	/**
	 * Test if the AvroInputFormat is able to properly read data from an avro file
	 * when reuse of Avro values is switched off.
	 *
	 * @throws IOException if the test file cannot be read
	 */
	@Test
	public void testDeserialisationReuseAvroRecordFalse() throws IOException {
		Configuration parameters = new Configuration();

		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
		format.setReuseAvroValue(false);

		doTestDeserializationSpecificRecord(format, parameters);
	}

	/**
	 * Helper method to test deserialisation into the specific {@link User} type.
	 *
	 * <p>Reads the file written by {@link #writeTestFile(File)}, verifies the fields of the
	 * first record and that exactly two records are returned. The format is always closed.
	 *
	 * @param format
	 *            the format to test
	 * @param parameters
	 *            the configuration to use
	 * @throws IOException
	 *             thrown if there is an issue
	 */
	private void doTestDeserializationSpecificRecord(final AvroInputFormat<User> format,
			final Configuration parameters) throws IOException {
		try {
			format.configure(parameters);
			FileInputSplit[] splits = format.createInputSplits(1);
			assertEquals(1, splits.length);
			format.open(splits[0]);

			User u = format.nextRecord(null);
			assertNotNull(u);

			String name = u.getName().toString();
			assertNotNull("empty record", name);
			assertEquals("name not equal", TEST_NAME, name);

			// check arrays
			List<CharSequence> sl = u.getTypeArrayString();
			assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
			assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());

			List<Boolean> bl = u.getTypeArrayBoolean();
			assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
			assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));

			// check enums
			Colors enumValue = u.getTypeEnum();
			assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);

			// check maps
			Map<CharSequence, Long> lm = u.getTypeMap();
			assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
			assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());

			assertFalse("expecting second element", format.reachedEnd());
			assertNotNull("expecting second element", format.nextRecord(u));

			assertNull(format.nextRecord(u));
			assertTrue(format.reachedEnd());
		} finally {
			format.close();
		}
	}

	/**
	 * Test if the Flink serialization is able to properly process GenericData.Record types.
	 * Usually users of Avro generate classes (POJOs) from Avro schemas.
	 * However, if generated classes are not available, one can also use GenericData.Record.
	 * It is an untyped key-value record which is using a schema to validate the correctness of the data.
	 *
	 * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
	 */
	@Test
	public void testDeserializeToGenericType() throws IOException {
		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);

		try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
			// initialize Record by reading it from disk (that's easier than creating it by hand)
			GenericData.Record rec = new GenericData.Record(userSchema);
			dataFileReader.next(rec);

			// check if record has been read correctly
			assertNotNull(rec);
			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
			assertNull(rec.get("type_long_test")); // it is null for the first record.

			// now serialize it with our framework:
			TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);

			ExecutionConfig ec = new ExecutionConfig();
			assertEquals(GenericTypeInfo.class, te.getClass());

			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());

			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
			assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
			assertTrue(
				ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
				ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(AvroKryoSerializerUtils.AvroSchemaSerializer.class));

			ByteArrayOutputStream out = new ByteArrayOutputStream();
			try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
				tser.serialize(rec, outView);
			}

			GenericData.Record newRec;
			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
					new ByteArrayInputStream(out.toByteArray()))) {
				newRec = tser.deserialize(inView);
			}

			// check if it is still the same
			assertNotNull(newRec);
			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
			assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
			assertNull(newRec.get("type_long_test"));
		}
	}

	/**
	 * This test validates proper serialization with specific (generated POJO) types.
	 */
	@Test
	public void testDeserializeToSpecificType() throws IOException {

		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);

		try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
			User rec = dataFileReader.next();

			// check if record has been read correctly
			assertNotNull(rec);
			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());

			// now serialize it with our framework:
			ExecutionConfig ec = new ExecutionConfig();
			TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);

			assertEquals(AvroTypeInfo.class, te.getClass());
			TypeSerializer<User> tser = te.createSerializer(ec);

			ByteArrayOutputStream out = new ByteArrayOutputStream();
			try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
				tser.serialize(rec, outView);
			}

			User newRec;
			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
					new ByteArrayInputStream(out.toByteArray()))) {
				newRec = tser.deserialize(inView);
			}

			// check if it is still the same
			assertNotNull(newRec);
			assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
		}
	}

	/**
	 * Test if the AvroInputFormat is able to properly read data from an Avro
	 * file as a GenericRecord.
	 *
	 * @throws IOException if there is an error
	 */
	@Test
	public void testDeserialisationGenericRecord() throws IOException {
		Configuration parameters = new Configuration();

		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
				GenericRecord.class);

		doTestDeserializationGenericRecord(format, parameters);
	}

	/**
	 * Helper method to test GenericRecord deserialisation.
	 *
	 * @param format
	 *            the format to test
	 * @param parameters
	 *            the configuration to use
	 * @throws IOException
	 *             thrown if there is an issue
	 */
	@SuppressWarnings("unchecked")
	private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format,
			final Configuration parameters) throws IOException {
		try {
			format.configure(parameters);
			FileInputSplit[] splits = format.createInputSplits(1);
			assertEquals(1, splits.length);
			format.open(splits[0]);

			GenericRecord u = format.nextRecord(null);
			assertNotNull(u);
			assertEquals("The schemas should be equal", userSchema, u.getSchema());

			String name = u.get("name").toString();
			assertNotNull("empty record", name);
			assertEquals("name not equal", TEST_NAME, name);

			// check arrays
			List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string");
			assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
			assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());

			List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
			assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
			assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));

			// check enums
			GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum");
			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString());

			// check maps
			Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map");
			assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
			assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());

			assertFalse("expecting second element", format.reachedEnd());
			assertNotNull("expecting second element", format.nextRecord(u));

			assertNull(format.nextRecord(u));
			assertTrue(format.reachedEnd());
		} finally {
			format.close();
		}
	}

	/**
	 * Test if the AvroInputFormat is able to properly read data from an avro
	 * file as a GenericRecord when reuse of Avro values is switched off.
	 *
	 * @throws IOException if there is an error
	 */
	@Test
	public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
		Configuration parameters = new Configuration();

		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
				GenericRecord.class);
		// no configure() call here: the helper below configures the format before opening it
		format.setReuseAvroValue(false);

		doTestDeserializationGenericRecord(format, parameters);
	}

	@After
	public void deleteFiles() {
		// testFile stays null if createFiles() failed before assigning it
		if (testFile != null) {
			testFile.delete();
		}
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java new file mode 100644 index 0000000..5341bcf --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.formats.avro.utils.AvroTestUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.avro.specific.SpecificRecord; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** + * Test for the Avro serialization and deserialization schema. + */ +public class AvroRowDeSerializationSchemaTest { + + @Test + public void testSerializeDeserializeSimpleRow() throws IOException { + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData(); + + final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); + + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + + assertEquals(testData.f2, actual); + } + + @Test + public void testSerializeSimpleRowSeveralTimes() throws IOException { + final Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData(); + + final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); + + serializationSchema.serialize(testData.f2); + serializationSchema.serialize(testData.f2); + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + + assertEquals(testData.f2, actual); + } + + @Test + public void testDeserializeRowSeveralTimes() throws IOException { + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData(); + + final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); + + final byte[] bytes = serializationSchema.serialize(testData.f2); + deserializationSchema.deserialize(bytes); + deserializationSchema.deserialize(bytes); + final Row actual = deserializationSchema.deserialize(bytes); + + assertEquals(testData.f2, actual); + } + + @Test + public void testSerializeDeserializeComplexRow() throws IOException { + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); + + final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); + + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + + assertEquals(testData.f2, actual); + } + + @Test + public void testSerializeComplexRowSeveralTimes() throws IOException { + final Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); + + final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); + + serializationSchema.serialize(testData.f2); + serializationSchema.serialize(testData.f2); + final byte[] bytes = serializationSchema.serialize(testData.f2); + final Row actual = deserializationSchema.deserialize(bytes); + + assertEquals(testData.f2, actual); + } + + @Test + public void testDeserializeComplexRowSeveralTimes() throws IOException { + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); + + final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); + + final byte[] bytes = serializationSchema.serialize(testData.f2); + deserializationSchema.deserialize(bytes); + deserializationSchema.deserialize(bytes); + final Row actual = deserializationSchema.deserialize(bytes); + + assertEquals(testData.f2, actual); + } + + @Test + public void testSerializability() throws IOException, ClassNotFoundException { + final Tuple3<Class<? 
extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); + + final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0); + + byte[] serBytes = InstantiationUtil.serializeObject(serOrig); + byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig); + + AvroRowSerializationSchema serCopy = + InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader()); + AvroRowDeserializationSchema deserCopy = + InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader()); + + final byte[] bytes = serCopy.serialize(testData.f2); + deserCopy.deserialize(bytes); + deserCopy.deserialize(bytes); + final Row actual = deserCopy.deserialize(bytes); + + assertEquals(testData.f2, actual); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java new file mode 100644 index 0000000..40a84f9 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.generated.Colors; +import org.apache.flink.formats.avro.generated.Fixed16; +import org.apache.flink.formats.avro.generated.User; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Test the avro input format. 
+ * (The testcase is mostly the getting started tutorial of avro) + * http://avro.apache.org/docs/current/gettingstartedjava.html + */ +public class AvroSplittableInputFormatTest { + + private File testFile; + + static final String TEST_NAME = "Alyssa"; + + static final String TEST_ARRAY_STRING_1 = "ELEMENT 1"; + static final String TEST_ARRAY_STRING_2 = "ELEMENT 2"; + + static final boolean TEST_ARRAY_BOOLEAN_1 = true; + static final boolean TEST_ARRAY_BOOLEAN_2 = false; + + static final Colors TEST_ENUM_COLOR = Colors.GREEN; + + static final String TEST_MAP_KEY1 = "KEY 1"; + static final long TEST_MAP_VALUE1 = 8546456L; + static final String TEST_MAP_KEY2 = "KEY 2"; + static final long TEST_MAP_VALUE2 = 17554L; + + static final Integer TEST_NUM = new Integer(239); + static final String TEST_STREET = "Baker Street"; + static final String TEST_CITY = "London"; + static final String TEST_STATE = "London"; + static final String TEST_ZIP = "NW1 6XE"; + + static final int NUM_RECORDS = 5000; + + @Before + public void createFiles() throws IOException { + testFile = File.createTempFile("AvroSplittableInputFormatTest", null); + + ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); + stringArray.add(TEST_ARRAY_STRING_1); + stringArray.add(TEST_ARRAY_STRING_2); + + ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); + booleanArray.add(TEST_ARRAY_BOOLEAN_1); + booleanArray.add(TEST_ARRAY_BOOLEAN_2); + + HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); + longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); + longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); + + Address addr = new Address(); + addr.setNum(new Integer(TEST_NUM)); + addr.setStreet(TEST_STREET); + addr.setCity(TEST_CITY); + addr.setState(TEST_STATE); + addr.setZip(TEST_ZIP); + + User user1 = new User(); + user1.setName(TEST_NAME); + user1.setFavoriteNumber(256); + user1.setTypeDoubleTest(123.45d); + user1.setTypeBoolTest(true); + user1.setTypeArrayString(stringArray); + 
user1.setTypeArrayBoolean(booleanArray); + user1.setTypeEnum(TEST_ENUM_COLOR); + user1.setTypeMap(longMap); + user1.setTypeNested(addr); + + // Construct via builder + User user2 = User.newBuilder() + .setName(TEST_NAME) + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(new Fixed16()) + .setTypeUnion(123L) + .setTypeNested( + Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) + .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) + .build()) + .build(); + DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); + DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); + dataFileWriter.create(user1.getSchema(), testFile); + dataFileWriter.append(user1); + dataFileWriter.append(user2); + + Random rnd = new Random(1337); + for (int i = 0; i < NUM_RECORDS - 2; i++) { + User user = new User(); + user.setName(TEST_NAME + rnd.nextInt()); + user.setFavoriteNumber(rnd.nextInt()); + user.setTypeDoubleTest(rnd.nextDouble()); + user.setTypeBoolTest(true); + user.setTypeArrayString(stringArray); + user.setTypeArrayBoolean(booleanArray); + user.setTypeEnum(TEST_ENUM_COLOR); + user.setTypeMap(longMap); + Address address = new Address(); + address.setNum(new Integer(TEST_NUM)); + address.setStreet(TEST_STREET); + address.setCity(TEST_CITY); + address.setState(TEST_STATE); + address.setZip(TEST_ZIP); + user.setTypeNested(address); + + dataFileWriter.append(user); + } + dataFileWriter.close(); + } + + @Test + public void testSplittedIF() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + int elements = 0; + int[] elementsPerSplit = new int[4]; + for (int i = 0; i < splits.length; i++) { + format.open(splits[i]); + while (!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + @Test + public void testAvroRecoveryWithFailureAtStart() throws Exception { + final int recordsUntilCheckpoint = 132; + + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + format.configure(parameters); + + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + + int elements = 0; + int[] elementsPerSplit = new int[4]; + for (int i = 0; i < splits.length; i++) { + format.reopen(splits[i], format.getCurrentState()); + while (!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + + if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { + + // do the whole checkpoint-restore procedure and see if we pick up from where we left off. 
+ Tuple2<Long, Long> state = format.getCurrentState(); + + // this is to make sure that nothing stays from the previous format + // (as it is going to be in the normal case) + format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); + + format.reopen(splits[i], state); + assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); + } + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + @Test + public void testAvroRecovery() throws Exception { + final int recordsUntilCheckpoint = 132; + + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + format.configure(parameters); + + FileInputSplit[] splits = format.createInputSplits(4); + assertEquals(splits.length, 4); + + int elements = 0; + int[] elementsPerSplit = new int[4]; + for (int i = 0; i < splits.length; i++) { + format.open(splits[i]); + while (!format.reachedEnd()) { + User u = format.nextRecord(null); + Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); + elements++; + + if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { + + // do the whole checkpoint-restore procedure and see if we pick up from where we left off. 
+ Tuple2<Long, Long> state = format.getCurrentState(); + + // this is to make sure that nothing stays from the previous format + // (as it is going to be in the normal case) + format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); + + format.reopen(splits[i], state); + assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); + } + elementsPerSplit[i]++; + } + format.close(); + } + + Assert.assertEquals(1539, elementsPerSplit[0]); + Assert.assertEquals(1026, elementsPerSplit[1]); + Assert.assertEquals(1539, elementsPerSplit[2]); + Assert.assertEquals(896, elementsPerSplit[3]); + Assert.assertEquals(NUM_RECORDS, elements); + format.close(); + } + + /* + This test gave the reference values for the test of Flink's input format (IF). + + To run it, this dependency needs to be added: + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <version>1.7.6</version> + </dependency> + + @Test + public void testHadoop() throws Exception { + JobConf jf = new JobConf(); + FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); + jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); + org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); + InputSplit[] sp = format.getSplits(jf, 4); + int elementsPerSplit[] = new int[4]; + int cnt = 0; + int i = 0; + for (InputSplit s:sp) { + RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); + AvroWrapper<User> k = r.createKey(); + NullWritable v = r.createValue(); + + while (r.next(k, v)) { + cnt++; + elementsPerSplit[i]++; + } + i++; + } + System.out.println("Status "+Arrays.toString(elementsPerSplit)); + } **/ + + @After + public void deleteFiles() { + testFile.delete(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java new file mode 100644 index 0000000..87e169b --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.formats.avro.generated.Address; +import org.apache.flink.formats.avro.generated.Colors; +import org.apache.flink.formats.avro.generated.Fixed16; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.DataInputDecoder; +import org.apache.flink.formats.avro.utils.DataOutputEncoder; +import org.apache.flink.util.StringUtils; + +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization. 
+ */ +public class EncoderDecoderTest { + @Test + public void testComplexStringsDirecty() { + try { + Random rnd = new Random(349712539451944123L); + + for (int i = 0; i < 10; i++) { + String testString = StringUtils.getRandomString(rnd, 10, 100); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + { + DataOutputStream dataOut = new DataOutputStream(baos); + DataOutputEncoder encoder = new DataOutputEncoder(); + encoder.setOut(dataOut); + + encoder.writeString(testString); + dataOut.flush(); + dataOut.close(); + } + + byte[] data = baos.toByteArray(); + + // deserialize + { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream dataIn = new DataInputStream(bais); + DataInputDecoder decoder = new DataInputDecoder(); + decoder.setIn(dataIn); + + String deserialized = decoder.readString(); + + assertEquals(testString, deserialized); + } + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test failed due to an exception: " + e.getMessage()); + } + } + + @Test + public void testPrimitiveTypes() { + + testObjectSerialization(new Boolean(true)); + testObjectSerialization(new Boolean(false)); + + testObjectSerialization(Byte.valueOf((byte) 0)); + testObjectSerialization(Byte.valueOf((byte) 1)); + testObjectSerialization(Byte.valueOf((byte) -1)); + testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE)); + testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE)); + + testObjectSerialization(Short.valueOf((short) 0)); + testObjectSerialization(Short.valueOf((short) 1)); + testObjectSerialization(Short.valueOf((short) -1)); + testObjectSerialization(Short.valueOf(Short.MIN_VALUE)); + testObjectSerialization(Short.valueOf(Short.MAX_VALUE)); + + testObjectSerialization(Integer.valueOf(0)); + testObjectSerialization(Integer.valueOf(1)); + testObjectSerialization(Integer.valueOf(-1)); + testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE)); + 
testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE)); + + testObjectSerialization(Long.valueOf(0)); + testObjectSerialization(Long.valueOf(1)); + testObjectSerialization(Long.valueOf(-1)); + testObjectSerialization(Long.valueOf(Long.MIN_VALUE)); + testObjectSerialization(Long.valueOf(Long.MAX_VALUE)); + + testObjectSerialization(Float.valueOf(0)); + testObjectSerialization(Float.valueOf(1)); + testObjectSerialization(Float.valueOf(-1)); + testObjectSerialization(Float.valueOf((float) Math.E)); + testObjectSerialization(Float.valueOf((float) Math.PI)); + testObjectSerialization(Float.valueOf(Float.MIN_VALUE)); + testObjectSerialization(Float.valueOf(Float.MAX_VALUE)); + testObjectSerialization(Float.valueOf(Float.MIN_NORMAL)); + testObjectSerialization(Float.valueOf(Float.NaN)); + testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY)); + testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY)); + + testObjectSerialization(Double.valueOf(0)); + testObjectSerialization(Double.valueOf(1)); + testObjectSerialization(Double.valueOf(-1)); + testObjectSerialization(Double.valueOf(Math.E)); + testObjectSerialization(Double.valueOf(Math.PI)); + testObjectSerialization(Double.valueOf(Double.MIN_VALUE)); + testObjectSerialization(Double.valueOf(Double.MAX_VALUE)); + testObjectSerialization(Double.valueOf(Double.MIN_NORMAL)); + testObjectSerialization(Double.valueOf(Double.NaN)); + testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY)); + testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY)); + + testObjectSerialization(""); + testObjectSerialization("abcdefg"); + testObjectSerialization("ab\u1535\u0155xyz\u706F"); + + testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523)); + testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001)); + } + + @Test + public void 
testArrayTypes() { + { + int[] array = new int[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + long[] array = new long[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + float[] array = new float[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + double[] array = new double[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"}; + testObjectSerialization(array); + } + } + + @Test + public void testEmptyArray() { + { + int[] array = new int[0]; + testObjectSerialization(array); + } + { + long[] array = new long[0]; + testObjectSerialization(array); + } + { + float[] array = new float[0]; + testObjectSerialization(array); + } + { + double[] array = new double[0]; + testObjectSerialization(array); + } + { + String[] array = new String[0]; + testObjectSerialization(array); + } + } + + @Test + public void testObjects() { + // simple object containing only primitives + { + testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42)); + } + + // object with collection + { + ArrayList<String> list = new ArrayList<String>(); + list.add("A"); + list.add("B"); + list.add("C"); + list.add("D"); + list.add("E"); + + testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym")); + } + + // object with empty collection + { + ArrayList<String> list = new ArrayList<String>(); + testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus")); + } + } + + @Test + public void testNestedObjectsWithCollections() { + testObjectSerialization(new ComplexNestedObject2(true)); + } + + @Test + public void testGeneratedObjectWithNullableFields() { + List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" }); + List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true); + Map<CharSequence, Long> map = new 
HashMap<CharSequence, Long>(); + map.put("1", 1L); + map.put("2", 2L); + map.put("3", 3L); + + byte[] b = new byte[16]; + new Random().nextBytes(b); + Fixed16 f = new Fixed16(b); + Address addr = new Address(new Integer(239), "6th Main", "Bangalore", + "Karnataka", "560075"); + User user = new User("Freudenreich", 1337, "macintosh gray", + 1234567890L, 3.1415926, null, true, strings, bools, null, + Colors.GREEN, map, f, new Boolean(true), addr); + + testObjectSerialization(user); + } + + @Test + public void testVarLenCountEncoding() { + try { + long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL}; + + // write + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + { + DataOutputStream dataOut = new DataOutputStream(baos); + + for (long val : values) { + DataOutputEncoder.writeVarLongCount(dataOut, val); + } + + dataOut.flush(); + dataOut.close(); + } + + // read + { + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dataIn = new DataInputStream(bais); + + for (long val : values) { + long read = DataInputDecoder.readVarLongCount(dataIn); + assertEquals("Wrong var-len encoded value read.", val, read); + } + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test failed due to an exception: " + e.getMessage()); + } + } + + private static <X> void testObjectSerialization(X obj) { + + try { + + // serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + { + DataOutputStream dataOut = new DataOutputStream(baos); + DataOutputEncoder encoder = new DataOutputEncoder(); + encoder.setOut(dataOut); + + @SuppressWarnings("unchecked") + Class<X> clazz = (Class<X>) obj.getClass(); + ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz); + + writer.write(obj, encoder); + dataOut.flush(); + dataOut.close(); + } + + byte[] data = baos.toByteArray(); + X result = null; + + // 
deserialize + { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream dataIn = new DataInputStream(bais); + DataInputDecoder decoder = new DataInputDecoder(); + decoder.setIn(dataIn); + + @SuppressWarnings("unchecked") + Class<X> clazz = (Class<X>) obj.getClass(); + ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz); + + // create a reuse object if possible, otherwise we have no reuse object + X reuse = null; + try { + @SuppressWarnings("unchecked") + X test = (X) obj.getClass().newInstance(); + reuse = test; + } catch (Throwable t) {} + + result = reader.read(reuse, decoder); + } + + // check + final String message = "Deserialized object is not the same as the original"; + + if (obj.getClass().isArray()) { + Class<?> clazz = obj.getClass(); + if (clazz == byte[].class) { + assertArrayEquals(message, (byte[]) obj, (byte[]) result); + } + else if (clazz == short[].class) { + assertArrayEquals(message, (short[]) obj, (short[]) result); + } + else if (clazz == int[].class) { + assertArrayEquals(message, (int[]) obj, (int[]) result); + } + else if (clazz == long[].class) { + assertArrayEquals(message, (long[]) obj, (long[]) result); + } + else if (clazz == char[].class) { + assertArrayEquals(message, (char[]) obj, (char[]) result); + } + else if (clazz == float[].class) { + assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f); + } + else if (clazz == double[].class) { + assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0); + } else { + assertArrayEquals(message, (Object[]) obj, (Object[]) result); + } + } else { + assertEquals(message, obj, result); + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test failed due to an exception: " + e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + // Test Objects + // 
-------------------------------------------------------------------------------------------- + + private static final class SimpleTypes { + + private final int iVal; + private final long lVal; + private final byte bVal; + private final String sVal; + private final short rVal; + private final double dVal; + + public SimpleTypes() { + this(0, 0, (byte) 0, "", (short) 0, 0); + } + + public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) { + this.iVal = iVal; + this.lVal = lVal; + this.bVal = bVal; + this.sVal = sVal; + this.rVal = rVal; + this.dVal = dVal; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == SimpleTypes.class) { + SimpleTypes other = (SimpleTypes) obj; + + return other.iVal == this.iVal && + other.lVal == this.lVal && + other.bVal == this.bVal && + other.sVal.equals(this.sVal) && + other.rVal == this.rVal && + other.dVal == this.dVal; + + } else { + return false; + } + } + } + + private static class ComplexNestedObject1 { + + private double doubleValue; + + private List<String> stringList; + + public ComplexNestedObject1() {} + + public ComplexNestedObject1(int offInit) { + this.doubleValue = 6293485.6723 + offInit; + + this.stringList = new ArrayList<String>(); + this.stringList.add("A" + offInit); + this.stringList.add("somewhat" + offInit); + this.stringList.add("random" + offInit); + this.stringList.add("collection" + offInit); + this.stringList.add("of" + offInit); + this.stringList.add("strings" + offInit); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == ComplexNestedObject1.class) { + ComplexNestedObject1 other = (ComplexNestedObject1) obj; + return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList); + } else { + return false; + } + } + } + + private static class ComplexNestedObject2 { + + private long longValue; + + private Map<String, ComplexNestedObject1> theMap; + + public ComplexNestedObject2() {} + + public 
ComplexNestedObject2(boolean init) { + this.longValue = 46547; + + this.theMap = new HashMap<String, ComplexNestedObject1>(); + this.theMap.put("36354L", new ComplexNestedObject1(43546543)); + this.theMap.put("785611L", new ComplexNestedObject1(45784568)); + this.theMap.put("43L", new ComplexNestedObject1(9876543)); + this.theMap.put("-45687L", new ComplexNestedObject1(7897615)); + this.theMap.put("1919876876896L", new ComplexNestedObject1(27154)); + this.theMap.put("-868468468L", new ComplexNestedObject1(546435)); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == ComplexNestedObject2.class) { + ComplexNestedObject2 other = (ComplexNestedObject2) obj; + return other.longValue == this.longValue && this.theMap.equals(other.theMap); + } else { + return false; + } + } + } + + private static class Book { + + private long bookId; + private String title; + private long authorId; + + public Book() {} + + public Book(long bookId, String title, long authorId) { + this.bookId = bookId; + this.title = title; + this.authorId = authorId; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == Book.class) { + Book other = (Book) obj; + return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title); + } else { + return false; + } + } + } + + private static class BookAuthor { + + private long authorId; + private List<String> bookTitles; + private String authorName; + + public BookAuthor() {} + + public BookAuthor(long authorId, List<String> bookTitles, String authorName) { + this.authorId = authorId; + this.bookTitles = bookTitles; + this.authorName = authorName; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == BookAuthor.class) { + BookAuthor other = (BookAuthor) obj; + return other.authorName.equals(this.authorName) && other.authorId == this.authorId && + other.bookTitles.equals(this.bookTitles); + } else { + return false; + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java new file mode 100644 index 0000000..17f56a6 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.testjar; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.avro.AvroInputFormat; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * This file defines the classes for the AvroExternalJarProgramITCase. + */ +public class AvroExternalJarProgram { + + private static final class Color { + + private String name; + private double saturation; + + public Color() { + name = ""; + saturation = 1.0; + } + + public Color(String name, double saturation) { + this.name = name; + this.saturation = saturation; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public double getSaturation() { + return saturation; + } + + public void setSaturation(double saturation) { + this.saturation = saturation; + } + + @Override + public String toString() { + return name + '(' + saturation + ')'; + } + } + + private static final class MyUser { + + private String name; + private List<Color> colors; + + public MyUser() { + name = "unknown"; + colors = new ArrayList<Color>(); + } + + public MyUser(String name, List<Color> colors) { + this.name = name; + this.colors = colors; + } + + public String getName() { + return name; + } + + public List<Color> getColors() { + return colors; + } + + public void setName(String name) { + this.name = name; + } + + public void 
setColors(List<Color> colors) { + this.colors = colors; + } + + @Override + public String toString() { + return name + " : " + colors; + } + } + + // -------------------------------------------------------------------------------------------- + + // -------------------------------------------------------------------------------------------- + + private static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, MyUser> map(MyUser u) { + String namePrefix = u.getName().substring(0, 1); + return new Tuple2<String, MyUser>(namePrefix, u); + } + } + + private static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) { + return val1; + } + } + + // -------------------------------------------------------------------------------------------- + // Test Data + // -------------------------------------------------------------------------------------------- + + private static final class Generator { + + private final Random rnd = new Random(2389756789345689276L); + + public MyUser nextUser() { + return randomUser(); + } + + private MyUser randomUser() { + + int numColors = rnd.nextInt(5); + ArrayList<Color> colors = new ArrayList<Color>(numColors); + for (int i = 0; i < numColors; i++) { + colors.add(new Color(randomString(), rnd.nextDouble())); + } + + return new MyUser(randomString(), colors); + } + + private String randomString() { + char[] c = new char[this.rnd.nextInt(20) + 5]; + + for (int i = 0; i < c.length; i++) { + c[i] = (char) (this.rnd.nextInt(150) + 40); + } + + return new String(c); + } + } + + public static void writeTestData(File testFile, int numRecords) throws IOException { + + DatumWriter<MyUser> userDatumWriter = new 
ReflectDatumWriter<MyUser>(MyUser.class); + DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter); + + dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile); + + Generator generator = new Generator(); + + for (int i = 0; i < numRecords; i++) { + MyUser user = generator.nextUser(); + dataFileWriter.append(user); + } + + dataFileWriter.close(); + } + +// public static void main(String[] args) throws Exception { +// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath(); +// writeTestData(new File(testDataFile), 50); +// } + + public static void main(String[] args) throws Exception { + String inputPath = args[0]; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class)); + + DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper()); + + result.output(new DiscardingOutputFormat<Tuple2<String, MyUser>>()); + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java new file mode 100644 index 0000000..89be9c0 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest; + +/** + * Test for the {@link AvroSerializer}. + */ +public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest { + + @Override + protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { + return new AvroSerializer<T>(type); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java new file mode 100644 index 0000000..a247766 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest; + +/** + * Test for the {@link AvroSerializer}. + */ +public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { + + @Override + protected <T> TypeSerializer<T> createSerializer(Class<T> type) { + return new AvroSerializer<>(type); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java new file mode 100644 index 0000000..1c1a19b --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; + +/** + * Test for the {@link AvroSerializer}. + */ +public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { + + @Override + protected <T> TypeSerializer<T> createSerializer(Class<T> type) { + return new AvroSerializer<>(type); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java new file mode 100644 index 0000000..bb3d911 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.api.common.typeutils.SerializerTestInstance; + +import org.apache.avro.reflect.Nullable; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.fail; + +/** + * Tests for the {@link AvroSerializer}. 
+ */ +public class AvroSerializerEmptyArrayTest { + + @Test + public void testBookSerialization() { + try { + Book b = new Book(123, "This is a test book", 26382648); + AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class); + SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b); + test.testAll(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerialization() { + try { + List<String> titles = new ArrayList<String>(); + + List<Book> books = new ArrayList<Book>(); + books.add(new Book(123, "This is a test book", 1)); + books.add(new Book(24234234, "This is a test book", 1)); + books.add(new Book(1234324, "This is a test book", 3)); + + BookAuthor a = new BookAuthor(1, titles, "Test Author"); + a.books = books; + a.bookType = BookAuthor.BookType.journal; + + AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class); + + SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a); + test.testAll(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * Avro POJO for testing. + */ + public static class Book { + + long bookId; + @Nullable + String title; + long authorId; + + public Book() {} + + public Book(long bookId, String title, long authorId) { + this.bookId = bookId; + this.title = title; + this.authorId = authorId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (authorId ^ (authorId >>> 32)); + result = prime * result + (int) (bookId ^ (bookId >>> 32)); + result = prime * result + ((title == null) ? 
0 : title.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Book other = (Book) obj; + if (authorId != other.authorId) { + return false; + } + if (bookId != other.bookId) { + return false; + } + if (title == null) { + if (other.title != null) { + return false; + } + } else if (!title.equals(other.title)) { + return false; + } + return true; + } + } + + /** + * Avro POJO for testing. + */ + public static class BookAuthor { + + enum BookType { + book, + article, + journal + } + + long authorId; + + @Nullable + List<String> bookTitles; + + @Nullable + List<Book> books; + + String authorName; + + BookType bookType; + + public BookAuthor() {} + + public BookAuthor(long authorId, List<String> bookTitles, String authorName) { + this.authorId = authorId; + this.bookTitles = bookTitles; + this.authorName = authorName; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (authorId ^ (authorId >>> 32)); + result = prime * result + ((authorName == null) ? 0 : authorName.hashCode()); + result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode()); + result = prime * result + ((bookType == null) ? 0 : bookType.hashCode()); + result = prime * result + ((books == null) ? 
0 : books.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BookAuthor other = (BookAuthor) obj; + if (authorId != other.authorId) { + return false; + } + if (authorName == null) { + if (other.authorName != null) { + return false; + } + } else if (!authorName.equals(other.authorName)) { + return false; + } + if (bookTitles == null) { + if (other.bookTitles != null) { + return false; + } + } else if (!bookTitles.equals(other.bookTitles)) { + return false; + } + if (bookType != other.bookType) { + return false; + } + if (books == null) { + if (other.books != null) { + return false; + } + } else if (!books.equals(other.books)) { + return false; + } + return true; + } + } +}
