Repository: flink Updated Branches: refs/heads/master 8ec47f17b -> 5c4cb452f
[FLINK-3691] [avro] Extend AvroInputFormat to support Avro GenericRecord This closes #1920 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c4cb452 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c4cb452 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c4cb452 Branch: refs/heads/master Commit: 5c4cb452f06e4333be475b30779eaac2628abf51 Parents: 70978f5 Author: Phetsarath, Sourigna <[email protected]> Authored: Tue Apr 5 18:01:24 2016 -0400 Committer: Fabian Hueske <[email protected]> Committed: Wed May 4 21:21:49 2016 +0200 ---------------------------------------------------------------------- .../flink/api/java/io/AvroInputFormat.java | 47 ++++-- .../api/io/avro/AvroRecordInputFormatTest.java | 143 ++++++++++++++++++- 2 files changed, 176 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5c4cb452/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java index 9457b98..605ce69 100644 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java +++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java @@ -24,6 +24,8 @@ import java.io.IOException; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.FileReader; import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.reflect.ReflectDatumReader; import 
org.apache.avro.specific.SpecificDatumReader; @@ -38,6 +40,15 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.util.InstantiationUtil; +/** + * Provides a {@link FileInputFormat} for Avro records. + * + * @param <E> + * the type of the result Avro record. If you specify + * {@link GenericRecord} then the result will be returned as a + * {@link GenericRecord}, so you do not have to know the schema ahead + * of time. + */ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> { private static final long serialVersionUID = 1L; @@ -51,7 +62,6 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType private transient FileReader<E> dataFileReader; private transient long end; - public AvroInputFormat(Path filePath, Class<E> type) { super(filePath); @@ -94,17 +104,26 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType super.open(split); DatumReader<E> datumReader; - if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { - datumReader = new SpecificDatumReader<E>(avroValueType); + + if (org.apache.avro.generic.GenericRecord.class == avroValueType) { + datumReader = new GenericDatumReader<E>(); } else { - datumReader = new ReflectDatumReader<E>(avroValueType); + datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType) + ? 
new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType); } - - LOG.info("Opening split " + split); - + + if (LOG.isInfoEnabled()) { + LOG.info("Opening split {}", split); + } + SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen()); dataFileReader = DataFileReader.openReader(in, datumReader); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema()); + } + dataFileReader.sync(split.getStart()); this.end = split.getStart() + split.getLength(); } @@ -119,12 +138,14 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType if (reachedEnd()) { return null; } - - if (!reuseAvroValue) { - reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class); + if (reuseAvroValue) { + return dataFileReader.next(reuseValue); + } else { + if (GenericRecord.class == avroValueType) { + return dataFileReader.next(); + } else { + return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class)); + } } - - reuseValue = dataFileReader.next(reuseValue); - return reuseValue; } } http://git-wip-us.apache.org/repos/asf/flink/blob/5c4cb452/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java index 42cbebe..8439e01 100644 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java @@ -30,6 +30,7 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.FileReader; import 
org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumReader; @@ -207,6 +208,56 @@ public class AvroRecordInputFormatTest { } /** + * Test if the AvroInputFormat is able to properly read data from an avro file. + * @throws IOException + */ + @Test + public void testDeserialisationReuseAvroRecordFalse() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + format.setReuseAvroValue(false); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + User u = format.nextRecord(null); + assertNotNull(u); + + String name = u.getName().toString(); + assertNotNull("empty record", name); + assertEquals("name not equal", TEST_NAME, name); + + // check arrays + List<CharSequence> sl = u.getTypeArrayString(); + assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); + assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); + + List<Boolean> bl = u.getTypeArrayBoolean(); + assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); + assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); + + // check enums + Colors enumValue = u.getTypeEnum(); + assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); + + // check maps + Map<CharSequence, Long> lm = u.getTypeMap(); + assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); + assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); + + assertFalse("expecting second element", format.reachedEnd()); + assertNotNull("expecting second 
element", format.nextRecord(u)); + + assertNull(format.nextRecord(u)); + assertTrue(format.reachedEnd()); + + format.close(); + } + + /** * Test if the Flink serialization is able to properly process GenericData.Record types. * Usually users of Avro generate classes (POJOs) from Avro schemas. * However, if generated classes are not available, one can also use GenericData.Record. @@ -254,7 +305,7 @@ public class AvroRecordInputFormatTest { assertEquals(null, newRec.get("type_long_test")); } } - + /** * This test validates proper serialization with specific (generated POJO) types. */ @@ -289,6 +340,96 @@ public class AvroRecordInputFormatTest { } } + /** + * Test if the AvroInputFormat is able to properly read data from an Avro + * file as a GenericRecord. + * + * @throws IOException, + * if there is an exception + */ + @Test + public void testDeserialisationGenericRecord() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), + GenericRecord.class); + + doTestDeserializationGenericRecord(format, parameters); + } + + /** + * Helper method to test GenericRecord serialisation + * + * @param format + * the format to test + * @param parameters + * the configuration to use + * @throws IOException + * thrown if there is an issue + */ + @SuppressWarnings("unchecked") + private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format, + final Configuration parameters) throws IOException { + try { + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + GenericRecord u = format.nextRecord(null); + assertNotNull(u); + assertEquals("The schemas should be equal", userSchema, u.getSchema()); + + String name = u.get("name").toString(); + assertNotNull("empty record", name); + assertEquals("name not equal", TEST_NAME, name); + + 
// check arrays + List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string"); + assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); + assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); + + List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean"); + assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); + assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); + + // check enums + GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum"); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString()); + + // check maps + Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map"); + assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); + assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); + + assertFalse("expecting second element", format.reachedEnd()); + assertNotNull("expecting second element", format.nextRecord(u)); + + assertNull(format.nextRecord(u)); + assertTrue(format.reachedEnd()); + } finally { + format.close(); + } + } + + /** + * Test if the AvroInputFormat is able to properly read data from an avro + * file as a GenericRecord + * + * @throws IOException, + * if there is an error + */ + @Test + public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), + GenericRecord.class); + format.configure(parameters); + format.setReuseAvroValue(false); + + doTestDeserializationGenericRecord(format, parameters); + } @After public void deleteFiles() {
