Repository: flink
Updated Branches:
  refs/heads/master 8ec47f17b -> 5c4cb452f


[FLINK-3691] [avro] Extend AvroInputFormat to support Avro GenericRecord

This closes #1920


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c4cb452
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c4cb452
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c4cb452

Branch: refs/heads/master
Commit: 5c4cb452f06e4333be475b30779eaac2628abf51
Parents: 70978f5
Author: Phetsarath, Sourigna <[email protected]>
Authored: Tue Apr 5 18:01:24 2016 -0400
Committer: Fabian Hueske <[email protected]>
Committed: Wed May 4 21:21:49 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      |  47 ++++--
 .../api/io/avro/AvroRecordInputFormatTest.java  | 143 ++++++++++++++++++-
 2 files changed, 176 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c4cb452/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 9457b98..605ce69 100644
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -38,6 +40,15 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
 
+/**
+ * Provides a {@link FileInputFormat} for Avro records.
+ * 
+ * @param <E>
+ *            the type of the result Avro record. If you specify
+ *            {@link GenericRecord} then the result will be returned as a
+ *            {@link GenericRecord}, so you do not have to know the schema ahead
+ *            of time.
+ */
 public class AvroInputFormat<E> extends FileInputFormat<E> implements 
ResultTypeQueryable<E> {
        
        private static final long serialVersionUID = 1L;
@@ -51,7 +62,6 @@ public class AvroInputFormat<E> extends FileInputFormat<E> 
implements ResultType
        private transient FileReader<E> dataFileReader;
 
        private transient long end;
-
        
        public AvroInputFormat(Path filePath, Class<E> type) {
                super(filePath);
@@ -94,17 +104,26 @@ public class AvroInputFormat<E> extends FileInputFormat<E> 
implements ResultType
                super.open(split);
 
                DatumReader<E> datumReader;
-               if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {
-                       datumReader = new SpecificDatumReader<E>(avroValueType);
+               
+               if (org.apache.avro.generic.GenericRecord.class == 
avroValueType) {
+                       datumReader = new GenericDatumReader<E>();
                } else {
-                       datumReader = new ReflectDatumReader<E>(avroValueType);
+                       datumReader = 
org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
+                                       ? new 
SpecificDatumReader<E>(avroValueType) : new 
ReflectDatumReader<E>(avroValueType);
                }
-               
-               LOG.info("Opening split " + split);
-               
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Opening split {}", split);
+               }
+
                SeekableInput in = new FSDataInputStreamWrapper(stream, 
split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
 
                dataFileReader = DataFileReader.openReader(in, datumReader);
+               
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Loaded SCHEMA: {}", 
dataFileReader.getSchema());
+               }
+               
                dataFileReader.sync(split.getStart());
                this.end = split.getStart() + split.getLength();
        }
@@ -119,12 +138,14 @@ public class AvroInputFormat<E> extends 
FileInputFormat<E> implements ResultType
                if (reachedEnd()) {
                        return null;
                }
-               
-               if (!reuseAvroValue) {
-                       reuseValue = 
InstantiationUtil.instantiate(avroValueType, Object.class);
+               if (reuseAvroValue) {
+                       return dataFileReader.next(reuseValue);
+               } else {
+                       if (GenericRecord.class == avroValueType) {
+                               return dataFileReader.next();
+                       } else {
+                               return 
dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
+                       }
                }
-               
-               reuseValue = dataFileReader.next(reuseValue);
-               return reuseValue;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4cb452/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index 42cbebe..8439e01 100644
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -30,6 +30,7 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -207,6 +208,56 @@ public class AvroRecordInputFormatTest {
        }
 
        /**
+        * Test if the AvroInputFormat is able to properly read data from an Avro file when reuse of Avro values is disabled.
+        * @throws IOException
+        */
+       @Test
+       public void testDeserialisationReuseAvroRecordFalse() throws 
IOException {
+               Configuration parameters = new Configuration();
+               
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
+               format.setReuseAvroValue(false);
+               
+               format.configure(parameters);
+               FileInputSplit[] splits = format.createInputSplits(1);
+               assertEquals(splits.length, 1);
+               format.open(splits[0]);
+               
+               User u = format.nextRecord(null);
+               assertNotNull(u);
+               
+               String name = u.getName().toString();
+               assertNotNull("empty record", name);
+               assertEquals("name not equal", TEST_NAME, name);
+               
+               // check arrays
+               List<CharSequence> sl = u.getTypeArrayString();
+               assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, 
sl.get(0).toString());
+               assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, 
sl.get(1).toString());
+               
+               List<Boolean> bl = u.getTypeArrayBoolean();
+               assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, 
bl.get(0));
+               assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, 
bl.get(1));
+               
+               // check enums
+               Colors enumValue = u.getTypeEnum();
+               assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+               
+               // check maps
+               Map<CharSequence, Long> lm = u.getTypeMap();
+               assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, 
lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+               assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, 
lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+               
+               assertFalse("expecting second element", format.reachedEnd());
+               assertNotNull("expecting second element", format.nextRecord(u));
+               
+               assertNull(format.nextRecord(u));
+               assertTrue(format.reachedEnd());
+               
+               format.close();
+       }
+
+       /**
         * Test if the Flink serialization is able to properly process 
GenericData.Record types.
         * Usually users of Avro generate classes (POJOs) from Avro schemas.
         * However, if generated classes are not available, one can also use 
GenericData.Record.
@@ -254,7 +305,7 @@ public class AvroRecordInputFormatTest {
                        assertEquals(null, newRec.get("type_long_test"));
                }
        }
-
+               
        /**
         * This test validates proper serialization with specific (generated 
POJO) types.
         */
@@ -289,6 +340,96 @@ public class AvroRecordInputFormatTest {
                }
        }
 
+       /**
+        * Test if the AvroInputFormat is able to properly read data from an Avro
+        * file as a GenericRecord.
+        * 
+        * @throws IOException
+        *             if reading the Avro test file fails
+        */
+       @Test
+       public void testDeserialisationGenericRecord() throws IOException {
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+                               GenericRecord.class);
+
+               doTestDeserializationGenericRecord(format, parameters);
+       }
+
+       /**
+        * Helper method to test GenericRecord deserialisation.
+        * 
+        * @param format
+        *            the format to test
+        * @param parameters
+        *            the configuration to use
+        * @throws IOException
+        *             thrown if there is an issue
+        */
+       @SuppressWarnings("unchecked")
+       private void doTestDeserializationGenericRecord(final 
AvroInputFormat<GenericRecord> format,
+                       final Configuration parameters) throws IOException {
+               try {
+                       format.configure(parameters);
+                       FileInputSplit[] splits = format.createInputSplits(1);
+                       assertEquals(splits.length, 1);
+                       format.open(splits[0]);
+
+                       GenericRecord u = format.nextRecord(null);
+                       assertNotNull(u);
+                       assertEquals("The schemas should be equal", userSchema, 
u.getSchema());
+
+                       String name = u.get("name").toString();
+                       assertNotNull("empty record", name);
+                       assertEquals("name not equal", TEST_NAME, name);
+
+                       // check arrays
+                       List<CharSequence> sl = (List<CharSequence>) 
u.get("type_array_string");
+                       assertEquals("element 0 not equal", 
TEST_ARRAY_STRING_1, sl.get(0).toString());
+                       assertEquals("element 1 not equal", 
TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+                       List<Boolean> bl = (List<Boolean>) 
u.get("type_array_boolean");
+                       assertEquals("element 0 not equal", 
TEST_ARRAY_BOOLEAN_1, bl.get(0));
+                       assertEquals("element 1 not equal", 
TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+                       // check enums
+                       GenericData.EnumSymbol enumValue = 
(GenericData.EnumSymbol) u.get("type_enum");
+                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), enumValue.toString());
+
+                       // check maps
+                       Map<CharSequence, Long> lm = (Map<CharSequence, Long>) 
u.get("type_map");
+                       assertEquals("map value of key 1 not equal", 
TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+                       assertEquals("map value of key 2 not equal", 
TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+                       assertFalse("expecting second element", 
format.reachedEnd());
+                       assertNotNull("expecting second element", 
format.nextRecord(u));
+
+                       assertNull(format.nextRecord(u));
+                       assertTrue(format.reachedEnd());
+               } finally {
+                       format.close();
+               }
+       }
+
+       /**
+        * Test if the AvroInputFormat is able to properly read data from an Avro
+        * file as a GenericRecord when reuse of Avro values is disabled.
+        * 
+        * @throws IOException
+        *             if reading the Avro test file fails
+        */
+       @Test
+       public void testDeserialisationGenericRecordReuseAvroValueFalse() 
throws IOException {
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+                               GenericRecord.class);
+               format.configure(parameters);
+               format.setReuseAvroValue(false);
+
+               doTestDeserializationGenericRecord(format, parameters);
+       }
 
        @After
        public void deleteFiles() {

Reply via email to