This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e7d9c59d68644a943fc5d059c4a7a9fa3366da10 Author: Jing Ge <[email protected]> AuthorDate: Wed Mar 9 15:08:43 2022 +0100 [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema. --- .../parquet/avro/AvroParquetFileReadITCase.java | 54 +++++++++++++++++++--- .../parquet/avro/AvroParquetRecordFormatTest.java | 53 ++++++++++++++++++++- 2 files changed, 99 insertions(+), 8 deletions(-) diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java index 8ba5a40..1f8e812 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java @@ -32,16 +32,19 @@ import org.apache.flink.util.CloseableIterator; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -53,17 +56,18 @@ public class AvroParquetFileReadITCase extends AbstractTestBase { private static final String USER_PARQUET_FILE_2 = "user2.parquet"; private static final String USER_PARQUET_FILE_3 = "user3.parquet"; - private Schema schema; - private final List<GenericRecord> userRecords = new ArrayList<>(3); + private static Schema schema; + private static final List<GenericRecord> userRecords = new ArrayList<>(3); - @Before - public void setup() throws IOException { + @BeforeClass + public static void setup() throws IOException { // Generic records schema = new Schema.Parser() .parse( "{\"type\": \"record\", " + "\"name\": \"User\", " + + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", " + "\"fields\": [\n" + " {\"name\": \"name\", \"type\": \"string\" },\n" + " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n" @@ -125,6 +129,36 @@ public class AvroParquetFileReadITCase extends AbstractTestBase { } } + @Test + public void testReadAvroReflectRecord() throws Exception { + final FileSource<AvroParquetRecordFormatTest.User> source = + FileSource.forRecordStreamFormat( + AvroParquetReaders.forReflectRecord( + AvroParquetRecordFormatTest.User.class), + Path.fromLocalFile(TEMPORARY_FOLDER.getRoot())) + .monitorContinuously(Duration.ofMillis(5)) + .build(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(10L); + + DataStream<AvroParquetRecordFormatTest.User> stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source"); + + try (CloseableIterator<AvroParquetRecordFormatTest.User> iterator = + stream.executeAndCollect("Reading Avro Reflect Records")) { + List<AvroParquetRecordFormatTest.User> list = collectRecords(iterator, 6); + Collections.sort( + list, + Comparator.comparing(AvroParquetRecordFormatTest.User::getFavoriteNumber)); + assertEquals(list.size(), 6); + + for (int i = 0; i < 6; i++) { + assertUserEquals(list.get(i), userRecords.get(i)); + } + } + } + private static <E> List<E> collectRecords( final CloseableIterator<E> iterator, final int numElements) { @@ -161,7 +195,15 @@ public class AvroParquetFileReadITCase extends AbstractTestBase { writer.finish(); } - private GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) { + private void assertUserEquals(AvroParquetRecordFormatTest.User user, GenericRecord expected) { + assertThat(user).isNotNull(); + assertThat(String.valueOf(user.getName())).isNotNull().isEqualTo(expected.get("name")); + assertThat(user.getFavoriteNumber()).isEqualTo(expected.get("favoriteNumber")); + assertThat(String.valueOf(user.getFavoriteColor())) + .isEqualTo(String.valueOf(expected.get("favoriteColor"))); + } + + private static GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) { GenericRecord record = new GenericData.Record(schema); record.put("name", name); record.put("favoriteNumber", favoriteNumber); diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java index 38b5396..92b1269 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java @@ -84,6 +84,7 @@ class AvroParquetRecordFormatTest { .parse( "{\"type\": \"record\", " + "\"name\": \"User\", " + + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", " + "\"fields\": [\n" + " {\"name\": \"name\", \"type\": \"string\" },\n" + " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n" @@ -141,6 +142,21 @@ class AvroParquetRecordFormatTest { } @Test + void testReflectReadFromGenericRecords() throws IOException { + StreamFormat.Reader<User> reader = + createReader( + AvroParquetReaders.forReflectRecord(User.class), + new Configuration(), + userPath, + 0, + userPath.getFileSystem().getFileStatus(userPath).getLen()); + for (GenericRecord record : userRecords) { + User user = reader.read(); + assertUserEquals(Objects.requireNonNull(user), record); + } + } + + @Test void testCreateGenericReader() throws IOException { StreamFormat.Reader<GenericRecord> reader = createReader( @@ -309,9 +325,16 @@ class AvroParquetRecordFormatTest { } private void assertUserEquals(GenericRecord user, GenericRecord expected) { - assertEquals(user.get("name").toString(), expected.get("name")); + assertEquals(user.get("name").toString(), expected.get("name").toString()); assertEquals(user.get("favoriteNumber"), expected.get("favoriteNumber")); - assertEquals(user.get("favoriteColor").toString(), expected.get("favoriteColor")); + assertEquals( + user.get("favoriteColor").toString(), expected.get("favoriteColor").toString()); + } + + private void assertUserEquals(User user, GenericRecord expected) { + assertEquals(user.getName(), expected.get("name").toString()); + assertEquals(user.getFavoriteNumber(), expected.get("favoriteNumber")); + assertEquals(user.getFavoriteColor(), expected.get("favoriteColor").toString()); } private static List<Address> createAddressList() { @@ -324,4 +347,30 @@ class AvroParquetRecordFormatTest { private static List<Datum> createDatumList() { return Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3)); } + + private static final class User { + private String name; + private Integer favoriteNumber; + private String favoriteColor; + + public User() {} + + public User(String name, Integer favoriteNumber, String favoriteColor) { + this.name = name; + this.favoriteNumber = favoriteNumber; + this.favoriteColor = favoriteColor; + } + + public String getName() { + return name; + } + + public Integer getFavoriteNumber() { + return favoriteNumber; + } + + public String getFavoriteColor() { + return favoriteColor; + } + } }
