http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java deleted file mode 100644 index c39db15..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java +++ /dev/null @@ -1,528 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.avro; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.Fixed16; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.util.StringUtils; -import org.junit.Test; - -import static org.junit.Assert.*; - -/** - * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization. - */ -public class EncoderDecoderTest { - @Test - public void testComplexStringsDirecty() { - try { - Random rnd = new Random(349712539451944123L); - - for (int i = 0; i < 10; i++) { - String testString = StringUtils.getRandomString(rnd, 10, 100); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); - - encoder.writeString(testString); - dataOut.flush(); - dataOut.close(); - } - - byte[] data = baos.toByteArray(); - - // deserialize - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); - - String deserialized = decoder.readString(); - - assertEquals(testString, deserialized); - } - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - @Test - public void testPrimitiveTypes() { - - 
testObjectSerialization(new Boolean(true)); - testObjectSerialization(new Boolean(false)); - - testObjectSerialization(Byte.valueOf((byte) 0)); - testObjectSerialization(Byte.valueOf((byte) 1)); - testObjectSerialization(Byte.valueOf((byte) -1)); - testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE)); - testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE)); - - testObjectSerialization(Short.valueOf((short) 0)); - testObjectSerialization(Short.valueOf((short) 1)); - testObjectSerialization(Short.valueOf((short) -1)); - testObjectSerialization(Short.valueOf(Short.MIN_VALUE)); - testObjectSerialization(Short.valueOf(Short.MAX_VALUE)); - - testObjectSerialization(Integer.valueOf(0)); - testObjectSerialization(Integer.valueOf(1)); - testObjectSerialization(Integer.valueOf(-1)); - testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE)); - testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE)); - - testObjectSerialization(Long.valueOf(0)); - testObjectSerialization(Long.valueOf(1)); - testObjectSerialization(Long.valueOf(-1)); - testObjectSerialization(Long.valueOf(Long.MIN_VALUE)); - testObjectSerialization(Long.valueOf(Long.MAX_VALUE)); - - testObjectSerialization(Float.valueOf(0)); - testObjectSerialization(Float.valueOf(1)); - testObjectSerialization(Float.valueOf(-1)); - testObjectSerialization(Float.valueOf((float)Math.E)); - testObjectSerialization(Float.valueOf((float)Math.PI)); - testObjectSerialization(Float.valueOf(Float.MIN_VALUE)); - testObjectSerialization(Float.valueOf(Float.MAX_VALUE)); - testObjectSerialization(Float.valueOf(Float.MIN_NORMAL)); - testObjectSerialization(Float.valueOf(Float.NaN)); - testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY)); - testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY)); - - testObjectSerialization(Double.valueOf(0)); - testObjectSerialization(Double.valueOf(1)); - testObjectSerialization(Double.valueOf(-1)); - testObjectSerialization(Double.valueOf(Math.E)); - 
testObjectSerialization(Double.valueOf(Math.PI)); - testObjectSerialization(Double.valueOf(Double.MIN_VALUE)); - testObjectSerialization(Double.valueOf(Double.MAX_VALUE)); - testObjectSerialization(Double.valueOf(Double.MIN_NORMAL)); - testObjectSerialization(Double.valueOf(Double.NaN)); - testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY)); - testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY)); - - testObjectSerialization(""); - testObjectSerialization("abcdefg"); - testObjectSerialization("ab\u1535\u0155xyz\u706F"); - - testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523)); - testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001)); - } - - @Test - public void testArrayTypes() { - { - int[] array = new int[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - long[] array = new long[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - float[] array = new float[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - double[] array = new double[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"}; - testObjectSerialization(array); - } - } - - @Test - public void testEmptyArray() { - { - int[] array = new int[0]; - testObjectSerialization(array); - } - { - long[] array = new long[0]; - testObjectSerialization(array); - } - { - float[] array = new float[0]; - testObjectSerialization(array); - } - { - double[] array = new double[0]; - testObjectSerialization(array); - } - { - String[] array = new String[0]; - testObjectSerialization(array); - } - } - - @Test - public void testObjects() { - // simple object containing only primitives - { - testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42)); - } - - // object with collection - 
{ - ArrayList<String> list = new ArrayList<String>(); - list.add("A"); - list.add("B"); - list.add("C"); - list.add("D"); - list.add("E"); - - testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym")); - } - - // object with empty collection - { - ArrayList<String> list = new ArrayList<String>(); - testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus")); - } - } - - @Test - public void testNestedObjectsWithCollections() { - testObjectSerialization(new ComplexNestedObject2(true)); - } - - @Test - public void testGeneratedObjectWithNullableFields() { - List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" }); - List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true); - Map<CharSequence, Long> map = new HashMap<CharSequence, Long>(); - map.put("1", 1L); - map.put("2", 2L); - map.put("3", 3L); - - byte[] b = new byte[16]; - new Random().nextBytes(b); - Fixed16 f = new Fixed16(b); - Address addr = new Address(new Integer(239), "6th Main", "Bangalore", - "Karnataka", "560075"); - User user = new User("Freudenreich", 1337, "macintosh gray", - 1234567890L, 3.1415926, null, true, strings, bools, null, - Colors.GREEN, map, f, new Boolean(true), addr); - - testObjectSerialization(user); - } - - @Test - public void testVarLenCountEncoding() { - try { - long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL}; - - // write - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - - for (long val : values) { - DataOutputEncoder.writeVarLongCount(dataOut, val); - } - - dataOut.flush(); - dataOut.close(); - } - - // read - { - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dataIn = new DataInputStream(bais); - - for (long val : values) { - long 
read = DataInputDecoder.readVarLongCount(dataIn); - assertEquals("Wrong var-len encoded value read.", val, read); - } - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - private static <X> void testObjectSerialization(X obj) { - - try { - - // serialize - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); - - @SuppressWarnings("unchecked") - Class<X> clazz = (Class<X>) obj.getClass(); - ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz); - - writer.write(obj, encoder); - dataOut.flush(); - dataOut.close(); - } - - byte[] data = baos.toByteArray(); - X result = null; - - // deserialize - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); - - @SuppressWarnings("unchecked") - Class<X> clazz = (Class<X>) obj.getClass(); - ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz); - - // create a reuse object if possible, otherwise we have no reuse object - X reuse = null; - try { - @SuppressWarnings("unchecked") - X test = (X) obj.getClass().newInstance(); - reuse = test; - } catch (Throwable t) {} - - result = reader.read(reuse, decoder); - } - - // check - final String message = "Deserialized object is not the same as the original"; - - if (obj.getClass().isArray()) { - Class<?> clazz = obj.getClass(); - if (clazz == byte[].class) { - assertArrayEquals(message, (byte[]) obj, (byte[]) result); - } - else if (clazz == short[].class) { - assertArrayEquals(message, (short[]) obj, (short[]) result); - } - else if (clazz == int[].class) { - assertArrayEquals(message, (int[]) obj, (int[]) result); - } - else if (clazz == long[].class) { - 
assertArrayEquals(message, (long[]) obj, (long[]) result); - } - else if (clazz == char[].class) { - assertArrayEquals(message, (char[]) obj, (char[]) result); - } - else if (clazz == float[].class) { - assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f); - } - else if (clazz == double[].class) { - assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0); - } else { - assertArrayEquals(message, (Object[]) obj, (Object[]) result); - } - } else { - assertEquals(message, obj, result); - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - // Test Objects - // -------------------------------------------------------------------------------------------- - - - public static final class SimpleTypes { - - private final int iVal; - private final long lVal; - private final byte bVal; - private final String sVal; - private final short rVal; - private final double dVal; - - - public SimpleTypes() { - this(0, 0, (byte) 0, "", (short) 0, 0); - } - - public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) { - this.iVal = iVal; - this.lVal = lVal; - this.bVal = bVal; - this.sVal = sVal; - this.rVal = rVal; - this.dVal = dVal; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == SimpleTypes.class) { - SimpleTypes other = (SimpleTypes) obj; - - return other.iVal == this.iVal && - other.lVal == this.lVal && - other.bVal == this.bVal && - other.sVal.equals(this.sVal) && - other.rVal == this.rVal && - other.dVal == this.dVal; - - } else { - return false; - } - } - } - - public static class ComplexNestedObject1 { - - private double doubleValue; - - private List<String> stringList; - - public ComplexNestedObject1() {} - - public ComplexNestedObject1(int offInit) { - this.doubleValue = 6293485.6723 + 
offInit; - - this.stringList = new ArrayList<String>(); - this.stringList.add("A" + offInit); - this.stringList.add("somewhat" + offInit); - this.stringList.add("random" + offInit); - this.stringList.add("collection" + offInit); - this.stringList.add("of" + offInit); - this.stringList.add("strings" + offInit); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == ComplexNestedObject1.class) { - ComplexNestedObject1 other = (ComplexNestedObject1) obj; - return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList); - } else { - return false; - } - } - } - - public static class ComplexNestedObject2 { - - private long longValue; - - private Map<String, ComplexNestedObject1> theMap; - - public ComplexNestedObject2() {} - - public ComplexNestedObject2(boolean init) { - this.longValue = 46547; - - this.theMap = new HashMap<String, ComplexNestedObject1>(); - this.theMap.put("36354L", new ComplexNestedObject1(43546543)); - this.theMap.put("785611L", new ComplexNestedObject1(45784568)); - this.theMap.put("43L", new ComplexNestedObject1(9876543)); - this.theMap.put("-45687L", new ComplexNestedObject1(7897615)); - this.theMap.put("1919876876896L", new ComplexNestedObject1(27154)); - this.theMap.put("-868468468L", new ComplexNestedObject1(546435)); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == ComplexNestedObject2.class) { - ComplexNestedObject2 other = (ComplexNestedObject2) obj; - return other.longValue == this.longValue && this.theMap.equals(other.theMap); - } else { - return false; - } - } - } - - public static class Book { - - private long bookId; - private String title; - private long authorId; - - public Book() {} - - public Book(long bookId, String title, long authorId) { - this.bookId = bookId; - this.title = title; - this.authorId = authorId; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == Book.class) { - Book other = (Book) obj; - return 
other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title); - } else { - return false; - } - } - } - - public static class BookAuthor { - - private long authorId; - private List<String> bookTitles; - private String authorName; - - public BookAuthor() {} - - public BookAuthor(long authorId, List<String> bookTitles, String authorName) { - this.authorId = authorId; - this.bookTitles = bookTitles; - this.authorName = authorName; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == BookAuthor.class) { - BookAuthor other = (BookAuthor) obj; - return other.authorName.equals(this.authorName) && other.authorId == this.authorId && - other.bookTitles.equals(this.bookTitles); - } else { - return false; - } - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java deleted file mode 100644 index 1174786..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.avro.testjar; - -// ================================================================================================ -// This file defines the classes for the AvroExternalJarProgramITCase. -// The program is exported into src/test/resources/AvroTestProgram.jar. -// -// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED -// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL -// NOT BE COVERED BY THIS TEST. 
-// ================================================================================================ - - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.core.fs.Path; - -public class AvroExternalJarProgram { - - public static final class Color { - - private String name; - private double saturation; - - public Color() { - name = ""; - saturation = 1.0; - } - - public Color(String name, double saturation) { - this.name = name; - this.saturation = saturation; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public double getSaturation() { - return saturation; - } - - public void setSaturation(double saturation) { - this.saturation = saturation; - } - - @Override - public String toString() { - return name + '(' + saturation + ')'; - } - } - - public static final class MyUser { - - private String name; - private List<Color> colors; - - public MyUser() { - name = "unknown"; - colors = new ArrayList<Color>(); - } - - public MyUser(String name, List<Color> colors) { - this.name = name; - this.colors = colors; - } - - public String getName() { - return name; - } - - public List<Color> getColors() { - return colors; - } - - public void setName(String name) { - this.name = name; - } - - public void setColors(List<Color> colors) { - 
this.colors = colors; - } - - @Override - public String toString() { - return name + " : " + colors; - } - } - - // -------------------------------------------------------------------------------------------- - - // -------------------------------------------------------------------------------------------- - - public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, MyUser> map(MyUser u) { - String namePrefix = u.getName().substring(0, 1); - return new Tuple2<String, MyUser>(namePrefix, u); - } - } - - public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) { - return val1; - } - } - - // -------------------------------------------------------------------------------------------- - // Test Data - // -------------------------------------------------------------------------------------------- - - public static final class Generator { - - private final Random rnd = new Random(2389756789345689276L); - - public MyUser nextUser() { - return randomUser(); - } - - private MyUser randomUser() { - - int numColors = rnd.nextInt(5); - ArrayList<Color> colors = new ArrayList<Color>(numColors); - for (int i = 0; i < numColors; i++) { - colors.add(new Color(randomString(), rnd.nextDouble())); - } - - return new MyUser(randomString(), colors); - } - - private String randomString() { - char[] c = new char[this.rnd.nextInt(20) + 5]; - - for (int i = 0; i < c.length; i++) { - c[i] = (char) (this.rnd.nextInt(150) + 40); - } - - return new String(c); - } - } - - public static void writeTestData(File testFile, int numRecords) throws IOException { - - DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class); - DataFileWriter<MyUser> 
dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter); - - dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile); - - - Generator generator = new Generator(); - - for (int i = 0; i < numRecords; i++) { - MyUser user = generator.nextUser(); - dataFileWriter.append(user); - } - - dataFileWriter.close(); - } - -// public static void main(String[] args) throws Exception { -// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath(); -// writeTestData(new File(testDataFile), 50); -// } - - public static void main(String[] args) throws Exception { - String inputPath = args[0]; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class)); - - DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper()); - - result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>()); - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java deleted file mode 100644 index f33f433..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.io.avro; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -@RunWith(Parameterized.class) -public class AvroPojoTest extends MultipleProgramsTestBase { - public AvroPojoTest(TestExecutionMode mode) { - super(mode); - } - - private File inFile; - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = 
tempFolder.newFile().toURI().toString(); - inFile = tempFolder.newFile(); - AvroRecordInputFormatTest.writeTestFile(inFile); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testSimpleAvroRead() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - value.setTypeMap(null); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; - } - - @Test - public void testSerializeWithAvro() throws Exception { - 
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceAvro(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1); - ab.put("hehe", 12L); - value.setTypeMap(ab); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; - - } - - @Test - public void testKeySelection() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - Path in = new 
Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Alyssa,1)\n(Charlie,1)\n"; - } - - @Test - public void testWithAvroGenericSer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceAvro(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { - @Override - public String getKey(User value) throws Exception { - return String.valueOf(value.getName()); - } - }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for(User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Charlie,1)\n(Alyssa,1)\n"; - } - - @Test - public void testWithKryoGenericSer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceKryo(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = 
env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { - @Override - public String getKey(User value) throws Exception { - return String.valueOf(value.getName()); - } - }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Charlie,1)\n(Alyssa,1)\n"; - } - - /** - * Test some know fields for grouping on - */ - @Test - public void testAllFields() throws Exception { - for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { - testField(fieldName); - } - } - - private void testField(final String fieldName) throws Exception { - before(); - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { - @Override - public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { - for(User u : values) { - out.collect(u.get(fieldName)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Simple Avro read job"); - - // test if automatic registration of the Types worked - ExecutionConfig ec = env.getConfig(); - Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class)); - - if(fieldName.equals("name")) { - expected = "Alyssa\nCharlie"; - } else if(fieldName.equals("type_enum")) { - expected = "GREEN\nRED\n"; - } else if(fieldName.equals("type_double_test")) { - expected = 
"123.45\n1.337\n"; - } else { - Assert.fail("Unknown field"); - } - - after(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java deleted file mode 100644 index 91a9612..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.io.avro; - -import static org.junit.Assert.*; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.*; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.file.FileReader; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.util.Utf8; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.typeutils.AvroTypeInfo; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Test the avro input format. 
- * (The testcase is mostly the getting started tutorial of avro) - * http://avro.apache.org/docs/current/gettingstartedjava.html - */ -public class AvroRecordInputFormatTest { - - public File testFile; - - final static String TEST_NAME = "Alyssa"; - - final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; - final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; - - final static boolean TEST_ARRAY_BOOLEAN_1 = true; - final static boolean TEST_ARRAY_BOOLEAN_2 = false; - - final static Colors TEST_ENUM_COLOR = Colors.GREEN; - - final static String TEST_MAP_KEY1 = "KEY 1"; - final static long TEST_MAP_VALUE1 = 8546456L; - final static String TEST_MAP_KEY2 = "KEY 2"; - final static long TEST_MAP_VALUE2 = 17554L; - - final static int TEST_NUM = 239; - final static String TEST_STREET = "Baker Street"; - final static String TEST_CITY = "London"; - final static String TEST_STATE = "London"; - final static String TEST_ZIP = "NW1 6XE"; - - - private Schema userSchema = new User().getSchema(); - - - public static void writeTestFile(File testFile) throws IOException { - ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); - stringArray.add(TEST_ARRAY_STRING_1); - stringArray.add(TEST_ARRAY_STRING_2); - - ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); - booleanArray.add(TEST_ARRAY_BOOLEAN_1); - booleanArray.add(TEST_ARRAY_BOOLEAN_2); - - HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); - longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); - longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - Address addr = new Address(); - addr.setNum(TEST_NUM); - addr.setStreet(TEST_STREET); - addr.setCity(TEST_CITY); - addr.setState(TEST_STATE); - addr.setZip(TEST_ZIP); - - - User user1 = new User(); - - user1.setName(TEST_NAME); - user1.setFavoriteNumber(256); - user1.setTypeDoubleTest(123.45d); - user1.setTypeBoolTest(true); - user1.setTypeArrayString(stringArray); - user1.setTypeArrayBoolean(booleanArray); - user1.setTypeEnum(TEST_ENUM_COLOR); - 
user1.setTypeMap(longMap); - user1.setTypeNested(addr); - - // Construct via builder - User user2 = User.newBuilder() - .setName("Charlie") - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(null) - .setTypeUnion(null) - .setTypeNested( - Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) - .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) - .build()) - .build(); - DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); - DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); - dataFileWriter.create(user1.getSchema(), testFile); - dataFileWriter.append(user1); - dataFileWriter.append(user2); - dataFileWriter.close(); - } - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroInputFormatTest", null); - writeTestFile(testFile); - } - - - /** - * Test if the AvroInputFormat is able to properly read data from an avro file. 
- * @throws IOException - */ - @Test - public void testDeserialisation() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - User u = format.nextRecord(null); - assertNotNull(u); - - String name = u.getName().toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = u.getTypeArrayString(); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = u.getTypeArrayBoolean(); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - Colors enumValue = u.getTypeEnum(); - assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); - - // check maps - Map<CharSequence, Long> lm = u.getTypeMap(); - assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - - format.close(); - } - - /** - * Test if the AvroInputFormat is able to properly read data from an avro file. 
- * @throws IOException - */ - @Test - public void testDeserialisationReuseAvroRecordFalse() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - format.setReuseAvroValue(false); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - User u = format.nextRecord(null); - assertNotNull(u); - - String name = u.getName().toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = u.getTypeArrayString(); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = u.getTypeArrayBoolean(); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - Colors enumValue = u.getTypeEnum(); - assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); - - // check maps - Map<CharSequence, Long> lm = u.getTypeMap(); - assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - - format.close(); - } - - /** - * Test if the Flink serialization is able to properly process GenericData.Record types. - * Usually users of Avro generate classes (POJOs) from Avro schemas. - * However, if generated classes are not available, one can also use GenericData.Record. 
- * It is an untyped key-value record which is using a schema to validate the correctness of the data. - * - * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. - */ - @Test - public void testDeserializeToGenericType() throws IOException { - DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema); - - try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { - // initialize Record by reading it from disk (thats easier than creating it by hand) - GenericData.Record rec = new GenericData.Record(userSchema); - dataFileReader.next(rec); - - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - assertEquals(null, rec.get("type_long_test")); // it is null for the first record. - - // now serialize it with our framework: - TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class); - - ExecutionConfig ec = new ExecutionConfig(); - Assert.assertEquals(GenericTypeInfo.class, te.getClass()); - - Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>()); - - TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); - Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); - Assert.assertTrue( - ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && - ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) { - tser.serialize(rec, outView); - } - - GenericData.Record newRec; - try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper( - new ByteArrayInputStream(out.toByteArray()))) - { - 
newRec = tser.deserialize(inView); - } - - // check if it is still the same - assertNotNull(newRec); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); - assertEquals("name not equal", TEST_NAME, newRec.get("name").toString()); - assertEquals(null, newRec.get("type_long_test")); - } - } - - /** - * This test validates proper serialization with specific (generated POJO) types. - */ - @Test - public void testDeserializeToSpecificType() throws IOException { - - DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); - - try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { - User rec = dataFileReader.next(); - - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - - // now serialize it with our framework: - ExecutionConfig ec = new ExecutionConfig(); - TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class); - - Assert.assertEquals(AvroTypeInfo.class, te.getClass()); - TypeSerializer<User> tser = te.createSerializer(ec); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) { - tser.serialize(rec, outView); - } - - User newRec; - try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper( - new ByteArrayInputStream(out.toByteArray()))) - { - newRec = tser.deserialize(inView); - } - - // check if it is still the same - assertNotNull(newRec); - assertEquals("name not equal", TEST_NAME, newRec.getName().toString()); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString()); - } - } - - /** - * Test if the AvroInputFormat is able to properly read data from an Avro - * file as a GenericRecord. 
- * - * @throws IOException, - * if there is an exception - */ - @Test - public void testDeserialisationGenericRecord() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), - GenericRecord.class); - - doTestDeserializationGenericRecord(format, parameters); - } - - /** - * Helper method to test GenericRecord serialisation - * - * @param format - * the format to test - * @param parameters - * the configuration to use - * @throws IOException - * thrown id there is a issue - */ - @SuppressWarnings("unchecked") - private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format, - final Configuration parameters) throws IOException { - try { - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - GenericRecord u = format.nextRecord(null); - assertNotNull(u); - assertEquals("The schemas should be equal", userSchema, u.getSchema()); - - String name = u.get("name").toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string"); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean"); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum"); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString()); - - // check maps - Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map"); - assertEquals("map 
value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - } finally { - format.close(); - } - } - - /** - * Test if the AvroInputFormat is able to properly read data from an avro - * file as a GenericRecord - * - * @throws IOException, - * if there is an error - */ - @Test - public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), - GenericRecord.class); - format.configure(parameters); - format.setReuseAvroValue(false); - - doTestDeserializationGenericRecord(format, parameters); - } - - @After - public void deleteFiles() { - testFile.delete(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java deleted file mode 100644 index 37a83d1..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.io.avro; - -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.Fixed16; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Random; - -import static org.junit.Assert.assertEquals; - -/** - * Test the avro input format. 
- * (The testcase is mostly the getting started tutorial of avro) - * http://avro.apache.org/docs/current/gettingstartedjava.html - */ -public class AvroSplittableInputFormatTest { - - private File testFile; - - final static String TEST_NAME = "Alyssa"; - - final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; - final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; - - final static boolean TEST_ARRAY_BOOLEAN_1 = true; - final static boolean TEST_ARRAY_BOOLEAN_2 = false; - - final static Colors TEST_ENUM_COLOR = Colors.GREEN; - - final static String TEST_MAP_KEY1 = "KEY 1"; - final static long TEST_MAP_VALUE1 = 8546456L; - final static String TEST_MAP_KEY2 = "KEY 2"; - final static long TEST_MAP_VALUE2 = 17554L; - - final static Integer TEST_NUM = new Integer(239); - final static String TEST_STREET = "Baker Street"; - final static String TEST_CITY = "London"; - final static String TEST_STATE = "London"; - final static String TEST_ZIP = "NW1 6XE"; - - final static int NUM_RECORDS = 5000; - - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroSplittableInputFormatTest", null); - - ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); - stringArray.add(TEST_ARRAY_STRING_1); - stringArray.add(TEST_ARRAY_STRING_2); - - ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); - booleanArray.add(TEST_ARRAY_BOOLEAN_1); - booleanArray.add(TEST_ARRAY_BOOLEAN_2); - - HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); - longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); - longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - Address addr = new Address(); - addr.setNum(new Integer(TEST_NUM)); - addr.setStreet(TEST_STREET); - addr.setCity(TEST_CITY); - addr.setState(TEST_STATE); - addr.setZip(TEST_ZIP); - - - User user1 = new User(); - user1.setName(TEST_NAME); - user1.setFavoriteNumber(256); - user1.setTypeDoubleTest(123.45d); - user1.setTypeBoolTest(true); - user1.setTypeArrayString(stringArray); - 
user1.setTypeArrayBoolean(booleanArray); - user1.setTypeEnum(TEST_ENUM_COLOR); - user1.setTypeMap(longMap); - user1.setTypeNested(addr); - - // Construct via builder - User user2 = User.newBuilder() - .setName(TEST_NAME) - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(new Fixed16()) - .setTypeUnion(123L) - .setTypeNested( - Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) - .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) - .build()) - - .build(); - DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); - DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); - dataFileWriter.create(user1.getSchema(), testFile); - dataFileWriter.append(user1); - dataFileWriter.append(user2); - - Random rnd = new Random(1337); - for(int i = 0; i < NUM_RECORDS -2 ; i++) { - User user = new User(); - user.setName(TEST_NAME + rnd.nextInt()); - user.setFavoriteNumber(rnd.nextInt()); - user.setTypeDoubleTest(rnd.nextDouble()); - user.setTypeBoolTest(true); - user.setTypeArrayString(stringArray); - user.setTypeArrayBoolean(booleanArray); - user.setTypeEnum(TEST_ENUM_COLOR); - user.setTypeMap(longMap); - Address address = new Address(); - address.setNum(new Integer(TEST_NUM)); - address.setStreet(TEST_STREET); - address.setCity(TEST_CITY); - address.setState(TEST_STATE); - address.setZip(TEST_ZIP); - user.setTypeNested(address); - - dataFileWriter.append(user); - } - dataFileWriter.close(); - } - - @Test - public void testSplittedIF() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - int elements = 0; - int elementsPerSplit[] = new int[4]; - for(int i = 0; i < splits.length; i++) { - format.open(splits[i]); - while(!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - @Test - public void testAvroRecoveryWithFailureAtStart() throws Exception { - final int recordsUntilCheckpoint = 132; - - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - format.configure(parameters); - - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - - int elements = 0; - int elementsPerSplit[] = new int[4]; - for(int i = 0; i < splits.length; i++) { - format.reopen(splits[i], format.getCurrentState()); - while(!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - - if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { - - // do the whole checkpoint-restore procedure and see if we pick up from where we left off. 
- Tuple2<Long, Long> state = format.getCurrentState(); - - // this is to make sure that nothing stays from the previous format - // (as it is going to be in the normal case) - format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); - - format.reopen(splits[i], state); - assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); - } - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - @Test - public void testAvroRecovery() throws Exception { - final int recordsUntilCheckpoint = 132; - - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - format.configure(parameters); - - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - - int elements = 0; - int elementsPerSplit[] = new int[4]; - for(int i = 0; i < splits.length; i++) { - format.open(splits[i]); - while(!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - - if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { - - // do the whole checkpoint-restore procedure and see if we pick up from where we left off. 
- Tuple2<Long, Long> state = format.getCurrentState(); - - // this is to make sure that nothing stays from the previous format - // (as it is going to be in the normal case) - format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); - - format.reopen(splits[i], state); - assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); - } - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - /* - This test is gave the reference values for the test of Flink's IF. - - This dependency needs to be added - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>1.7.6</version> - </dependency> - - @Test - public void testHadoop() throws Exception { - JobConf jf = new JobConf(); - FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); - jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); - org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); - InputSplit[] sp = format.getSplits(jf, 4); - int elementsPerSplit[] = new int[4]; - int cnt = 0; - int i = 0; - for(InputSplit s:sp) { - RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); - AvroWrapper<User> k = r.createKey(); - NullWritable v = r.createValue(); - - while(r.next(k,v)) { - cnt++; - elementsPerSplit[i]++; - } - i++; - } - System.out.println("Status "+Arrays.toString(elementsPerSplit)); - } **/ - - @After - public void deleteFiles() { - testFile.delete(); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java deleted file mode 100644 index 5a21691..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.flink.api.io.avro.example;

import java.io.IOException;
import java.util.Random;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Small batch program that generates {@link User} records, pairs each one with its
 * favorite number, groups on that number and prints the favorite colors of every group.
 */
@SuppressWarnings("serial")
public class AvroTypeExample {

	/**
	 * Builds and runs the pipeline: generate users, map to (user, number), group on tuple
	 * field 1, reduce each group to (number, colors) and hand the result to {@code print()}.
	 *
	 * @param args command line arguments (not read)
	 * @throws Exception if building or executing the program fails
	 */
	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
		final DataSet<User> generatedUsers = environment.createInput(new UserGeneratingInputFormat());

		final DataSet<Tuple2<User, Integer>> usersWithNumber =
			generatedUsers.map(new NumberExtractingMapper());

		final DataSet<Tuple2<Integer, String>> colorsPerNumber =
			usersWithNumber.groupBy(1).reduceGroup(new ConcatenatingReducer());

		colorsPerNumber.print();
	}

	/**
	 * Wraps a user into a tuple whose second field is the user's favorite number.
	 */
	public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {

		/**
		 * @param user the record to wrap
		 * @return a tuple of the unchanged user and {@code user.getFavoriteNumber()}
		 */
		@Override
		public Tuple2<User, Integer> map(User user) {
			final Integer favoriteNumber = user.getFavoriteNumber();
			return new Tuple2<User, Integer>(user, favoriteNumber);
		}
	}

	/**
	 * Collapses one group of (user, number) tuples into a single (number, colors) tuple,
	 * where colors is the list of favorite colors separated by {@code " - "}.
	 */
	public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {

		/**
		 * @param values the tuples of one group
		 * @param out receives exactly one tuple: the number of the last tuple seen and the joined colors
		 * @throws Exception declared by the implemented interface
		 */
		@Override
		public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
			final StringBuilder joined = new StringBuilder();
			int lastNumber = 0;

			for (Tuple2<User, Integer> entry : values) {
				lastNumber = entry.f1;
				joined.append(entry.f0.getFavoriteColor());
				joined.append(" - ");
			}

			// every element contributed a trailing separator of three characters; cut the last one off
			final String colorList = joined.substring(0, joined.length() - 3);
			out.collect(new Tuple2<Integer, String>(lastNumber, colorList));
		}
	}

	/**
	 * Input format that produces {@code NUM} users whose name, color and number are drawn
	 * from a {@link Random} with a fixed seed.
	 */
	public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {

		private static final long serialVersionUID = 1L;

		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };

		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };

		private static final int NUM = 100;

		private final Random rnd = new Random(32498562304986L);

		/** Number of records handed out so far by this instance. */
		private int count;

		/**
		 * @return {@code true} once {@code NUM} records have been produced
		 */
		@Override
		public boolean reachedEnd() throws IOException {
			return !(count < NUM);
		}

		/**
		 * Creates a fresh user; the {@code reuse} argument is not read.
		 *
		 * @param reuse ignored
		 * @return a newly allocated user with random name, color and number in [0, 87)
		 */
		@Override
		public User nextRecord(User reuse) throws IOException {
			count++;

			// draw in the order name, color, number so the random sequence is consumed the same way
			final String pickedName = NAMES[rnd.nextInt(NAMES.length)];
			final String pickedColor = COLORS[rnd.nextInt(COLORS.length)];
			final int pickedNumber = rnd.nextInt(87);

			return new User(pickedName, pickedNumber, pickedColor);
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java deleted file mode 100644 index 4608f96..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/


/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package org.apache.flink.api.io.avro.example;

/**
 * Avro specific record with the three fields {@code name}, {@code favorite_number}
 * and {@code favorite_color}, plus a {@link Builder} for constructing instances.
 */
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  // NOTE(review): the schema namespace below is "org.apache.flink.api.avro.example" while this
  // class is declared in package org.apache.flink.api.io.avro.example - confirm this is intended.
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");

  /** Returns the schema shared by all instances of this class. */
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  @Deprecated public java.lang.CharSequence name;
  @Deprecated public java.lang.Integer favorite_number;
  @Deprecated public java.lang.CharSequence favorite_color;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use {@link #newBuilder()}.
   */
  public User() {}

  /**
   * All-args constructor.
   */
  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
    this.name = name;
    this.favorite_number = favorite_number;
    this.favorite_color = favorite_color;
  }

  /** Returns {@link #SCHEMA$}. */
  public org.apache.avro.Schema getSchema() { return SCHEMA$; }

  // Used by DatumWriter.  Applications should not call.
  // Field positions: 0 = name, 1 = favorite_number, 2 = favorite_color.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return name;
    case 1: return favorite_number;
    case 2: return favorite_color;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  // Field positions: 0 = name, 1 = favorite_number, 2 = favorite_color.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: name = (java.lang.CharSequence)value$; break;
    case 1: favorite_number = (java.lang.Integer)value$; break;
    case 2: favorite_color = (java.lang.CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'name' field.
   */
  public java.lang.CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(java.lang.CharSequence value) {
    this.name = value;
  }

  /**
   * Gets the value of the 'favorite_number' field.
   */
  public java.lang.Integer getFavoriteNumber() {
    return favorite_number;
  }

  /**
   * Sets the value of the 'favorite_number' field.
   * @param value the value to set.
   */
  public void setFavoriteNumber(java.lang.Integer value) {
    this.favorite_number = value;
  }

  /**
   * Gets the value of the 'favorite_color' field.
   */
  public java.lang.CharSequence getFavoriteColor() {
    return favorite_color;
  }

  /**
   * Sets the value of the 'favorite_color' field.
   * @param value the value to set.
   */
  public void setFavoriteColor(java.lang.CharSequence value) {
    this.favorite_color = value;
  }

  /** Creates a new User RecordBuilder */
  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
    return new org.apache.flink.api.io.avro.example.User.Builder();
  }

  /** Creates a new User RecordBuilder by copying an existing Builder */
  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
    return new org.apache.flink.api.io.avro.example.User.Builder(other);
  }

  /** Creates a new User RecordBuilder by copying an existing User instance */
  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
    return new org.apache.flink.api.io.avro.example.User.Builder(other);
  }

  /**
   * RecordBuilder for User instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
    implements org.apache.avro.data.RecordBuilder<User> {

    private java.lang.CharSequence name;
    private java.lang.Integer favorite_number;
    private java.lang.CharSequence favorite_color;

    /** Creates a new Builder */
    private Builder() {
      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
      super(other);
      // a field is copied, and flagged as set, only when the other builder's value passes isValidValue
      if (isValidValue(fields()[0], other.name)) {
        this.name = data().deepCopy(fields()[0].schema(), other.name);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.favorite_number)) {
        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.favorite_color)) {
        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
        fieldSetFlags()[2] = true;
      }
    }

    /** Creates a Builder by copying an existing User instance */
    private Builder(org.apache.flink.api.io.avro.example.User other) {
            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
      // a field is copied, and flagged as set, only when the record's value passes isValidValue
      if (isValidValue(fields()[0], other.name)) {
        this.name = data().deepCopy(fields()[0].schema(), other.name);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.favorite_number)) {
        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.favorite_color)) {
        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
        fieldSetFlags()[2] = true;
      }
    }

    /** Gets the value of the 'name' field */
    public java.lang.CharSequence getName() {
      return name;
    }

    /** Sets the value of the 'name' field */
    public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.name = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'name' field */
    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
      name = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'favorite_number' field */
    public java.lang.Integer getFavoriteNumber() {
      return favorite_number;
    }

    /** Sets the value of the 'favorite_number' field */
    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
      validate(fields()[1], value);
      this.favorite_number = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /** Checks whether the 'favorite_number' field has been set */
    public boolean hasFavoriteNumber() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'favorite_number' field */
    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
      favorite_number = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'favorite_color' field */
    public java.lang.CharSequence getFavoriteColor() {
      return favorite_color;
    }

    /** Sets the value of the 'favorite_color' field */
    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
      validate(fields()[2], value);
      this.favorite_color = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /** Checks whether the 'favorite_color' field has been set */
    public boolean hasFavoriteColor() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'favorite_color' field */
    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
      favorite_color = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    /**
     * Builds a new User; every field that was not set on this builder is filled
     * from {@code defaultValue(...)} of the corresponding schema field.
     *
     * @throws org.apache.avro.AvroRuntimeException wrapping any exception raised while building
     */
    @Override
    public User build() {
      try {
        User record = new User();
        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java deleted file mode 100644 index e245026..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/


package org.apache.flink.api.java.io;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.junit.Assert;
import org.junit.Test;

/**
 * Checks that the type information derived for an {@link AvroInputFormat} over
 * {@link MyAvroType} is a {@link PojoTypeInfo} for that class, both when asking the
 * {@link TypeExtractor} directly and when creating a {@link DataSet} from the format.
 */
public class AvroInputFormatTypeExtractionTest {

	/**
	 * Extracts the type information through both routes and compares kind and type class.
	 *
	 * @throws Exception any unexpected failure; it is propagated so the test report keeps
	 *         the original exception and stack trace instead of only its (possibly null) message
	 */
	@Test
	public void testTypeExtraction() throws Exception {
		// the path is never opened by this test; only the declared record class matters here
		InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);

		TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<MyAvroType> input = env.createInput(format);
		TypeInformation<?> typeInfoDataSet = input.getType();

		Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
		Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);

		Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
		Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
	}

	/**
	 * Record type used by the test: two public fields (one of them self-referencing)
	 * and one private field that is reachable only through its getter and setter.
	 */
	public static final class MyAvroType {

		public String theString;

		public MyAvroType recursive;

		private double aDouble;

		// NOTE(review): accessor names keep the field's exact spelling ("getaDouble");
		// presumably the type extraction depends on that - confirm before renaming.
		public double getaDouble() {
			return aDouble;
		}

		public void setaDouble(double aDouble) {
			this.aDouble = aDouble;
		}

		public void setTheString(String theString) {
			this.theString = theString;
		}

		public String getTheString() {
			return theString;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java deleted file mode 100644 index 4d6c6b7..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.flink.api.java.io;

import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;
import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

/**
 * Tests for {@link AvroOutputFormat}
 */
public class AvroOutputFormatTest {

	/**
	 * Setting a non-null codec must not throw. Any exception propagates out of the
	 * test so the report shows the real cause.
	 */
	@Test
	public void testSetCodec() throws Exception {
		// given
		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);

		// when / then
		outputFormat.setCodec(Codec.SNAPPY);
	}

	/**
	 * Setting a {@code null} codec must be rejected with an exception.
	 */
	@Test
	public void testSetCodecError() throws Exception {
		// given
		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);

		// when
		try {
			outputFormat.setCodec(null);
		} catch (Exception expected) {
			// then: the rejection is the expected outcome
			return;
		}

		// fail() throws an AssertionError, which the catch above would not intercept anyway
		fail("setCodec(null) was expected to throw an exception");
	}

	/**
	 * Round-trips the output format through Java serialization for every combination of
	 * "no codec / each codec" and "no schema / the User schema".
	 */
	@Test
	public void testSerialization() throws Exception {

		serializeAndDeserialize(null, null);
		serializeAndDeserialize(null, User.SCHEMA$);
		for (final Codec codec : Codec.values()) {
			serializeAndDeserialize(codec, null);
			serializeAndDeserialize(codec, User.SCHEMA$);
		}
	}

	/**
	 * Serializes an output format configured with the given codec and schema, reads it back
	 * and checks that the internal {@code codec} and {@code userDefinedSchema} fields survived.
	 *
	 * @param codec codec to set before serializing, or {@code null} to leave it unset
	 * @param schema schema to set before serializing, or {@code null} to leave it unset
	 * @throws IOException if writing or reading the object stream fails
	 * @throws ClassNotFoundException if the serialized class cannot be resolved on read
	 */
	private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
		// given
		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
		if (codec != null) {
			outputFormat.setCodec(codec);
		}
		if (schema != null) {
			outputFormat.setSchema(schema);
		}

		final ByteArrayOutputStream bos = new ByteArrayOutputStream();

		// when
		try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(outputFormat);
		}
		try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
			// then
			Object o = ois.readObject();
			assertTrue(o instanceof AvroOutputFormat);
			// safe: the instance written above was an AvroOutputFormat<User>
			@SuppressWarnings("unchecked")
			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
			final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec");
			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");

			assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
			assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
		}
	}

	/**
	 * Writes the same records once without a codec and once with {@code Codec.SNAPPY} and
	 * expects the second file to be smaller. Both temp files are removed even when the test fails.
	 */
	@Test
	public void testCompression() throws Exception {
		// given
		final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());

		try {
			final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
			outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);

			final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
			compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
			compressedOutputFormat.setCodec(Codec.SNAPPY);

			// when
			output(outputFormat);
			output(compressedOutputFormat);

			// then
			assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
		} finally {
			// cleanup runs on success and on failure; a file that is already gone is not an error
			Files.deleteIfExists(Paths.get(outputPath.getPath()));
			Files.deleteIfExists(Paths.get(compressedOutputPath.getPath()));
		}
	}

	/**
	 * @param path location of a local file
	 * @return the size of that file in bytes
	 * @throws IOException if the size cannot be read
	 */
	private long fileSize(Path path) throws IOException {
		return Files.size(Paths.get(path.getPath()));
	}

	/**
	 * Configures and opens the format, writes 100 identical users and closes it.
	 * The format is closed even if writing a record fails.
	 *
	 * @param outputFormat the format to write through
	 * @throws IOException if opening, writing or closing fails
	 */
	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
		outputFormat.configure(new Configuration());
		outputFormat.open(1,1);
		try {
			for (int i = 0; i < 100; i++) {
				outputFormat.writeRecord(new User("testUser",1,"blue"));
			}
		} finally {
			outputFormat.close();
		}
	}
}
