This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c77e6f9  [FLINK-20054][formats] Fix ParquetInputFormat 3 level List handling
c77e6f9 is described below

commit c77e6f9e987f344f1bad2deac4793d78d472ac30
Author: Peter Huang <[email protected]>
AuthorDate: Sun Nov 8 23:32:15 2020 -0800

    [FLINK-20054][formats] Fix ParquetInputFormat 3 level List handling
    
    This closes #13994
---
 .../parquet/utils/ParquetSchemaConverter.java      | 66 +++++++++-------
 .../flink/formats/parquet/utils/RowConverter.java  | 88 ++++++++++++++++++++--
 .../formats/parquet/ParquetMapInputFormatTest.java | 23 ++++--
 .../parquet/ParquetPojoInputFormatTest.java        | 23 ++++--
 .../formats/parquet/ParquetRowInputFormatTest.java | 85 ++++++++++++---------
 .../formats/parquet/ParquetTableSourceITCase.java  |  3 +-
 .../formats/parquet/ParquetTableSourceTest.java    | 21 +++---
 .../parquet/utils/ParquetRecordReaderTest.java     | 43 +++++++----
 .../parquet/utils/ParquetSchemaConverterTest.java  | 21 +++---
 .../flink/formats/parquet/utils/TestUtil.java      | 56 +++++++++++++-
 10 files changed, 307 insertions(+), 122 deletions(-)
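
For context: Parquet has two on-disk encodings for lists. The standard layout
from the parquet-format spec is 3-level (a LIST-annotated group wrapping a
repeated group that wraps the element), while the legacy layout written by
older parquet-avro is 2-level (the element itself is repeated). A minimal
sketch of both shapes using parquet's schema builder; the field name "my_list"
and the INT64 element are illustrative, not taken from the patch:

    import org.apache.parquet.schema.OriginalType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    // Standard 3-level list: LIST group -> repeated "list" group -> element.
    Type standard = Types.optionalGroup()
            .addField(Types.repeatedGroup()
                    .addField(Types.primitive(PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
                            .named("element"))
                    .named("list"))
            .as(OriginalType.LIST)
            .named("my_list");

    // Legacy 2-level list: the element itself is repeated, with no wrapper group.
    Type legacy = Types.optionalGroup()
            .addField(Types.primitive(PrimitiveTypeName.INT64, Type.Repetition.REPEATED)
                    .named("array"))
            .as(OriginalType.LIST)
            .named("my_list");

The patch below teaches both the schema converter and the runtime row
converter to accept the standard 3-level form in addition to the legacy one.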

diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index fe79c6f..cf34d46 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -228,16 +228,7 @@ public class ParquetSchemaConverter {
                                                        // If the repeated field is a group with multiple fields, then its type is the element
                                                        // type and elements are required.
                                                        if (elementType.getFieldCount() > 1) {
-
-                                                               for (Type type : elementType.getFields()) {
-                                                                       if (!type.isRepetition(Type.Repetition.REQUIRED)) {
-                                                                               throw new UnsupportedOperationException(
-                                                                                       String.format("List field [%s] in List [%s] has to be required. ",
-                                                                                               type.toString(), fieldType.getName()));
-                                                                       }
-                                                               }
-                                                               typeInfo = ObjectArrayTypeInfo.getInfoFor(
-                                                                       convertParquetTypeToTypeInfo(elementType));
+                                                               typeInfo = convertGroupElementToArrayTypeInfo(parquetGroupType, elementType);
                                                        } else {
                                                                Type internalType = elementType.getType(0);
                                                                if (internalType.isPrimitive()) {
@@ -250,9 +241,7 @@ public class ParquetSchemaConverter {
                                                                                typeInfo = ObjectArrayTypeInfo.getInfoFor(
                                                                                        convertParquetTypeToTypeInfo(internalType));
                                                                        } else {
-                                                                               throw new UnsupportedOperationException(
-                                                                                       String.format("Unrecgonized List schema [%s] according to Parquet"
-                                                                                               + " standard", parquetGroupType.toString()));
+                                                                               typeInfo = convertGroupElementToArrayTypeInfo(parquetGroupType, tupleGroup);
                                                                        }
                                                                }
                                                        }
@@ -306,6 +295,18 @@ public class ParquetSchemaConverter {
                return typeInfo;
        }
 
+       private static ObjectArrayTypeInfo convertGroupElementToArrayTypeInfo(GroupType arrayFieldType, GroupType elementType) {
+               for (Type type : elementType.getFields()) {
+                       if (!type.isRepetition(Type.Repetition.REQUIRED)) {
+                               throw new UnsupportedOperationException(
+                                       String.format("List field [%s] in List [%s] has to be required. ",
+                                               type.toString(), arrayFieldType.getName()));
+                       }
+               }
+               return ObjectArrayTypeInfo.getInfoFor(
+                       convertParquetTypeToTypeInfo(elementType));
+       }
+
        private static TypeInformation<?> convertParquetPrimitiveListToFlinkArray(Type type) {
                // Backward-compatibility element group doesn't exist also allowed
                TypeInformation<?> flinkType = convertParquetTypeToTypeInfo(type);
@@ -374,17 +375,33 @@ public class ParquetSchemaConverter {
                        GroupType componentGroup = (GroupType) convertField(LIST_ELEMENT, objectArrayTypeInfo.getComponentInfo(),
                                        Type.Repetition.REQUIRED, legacyMode);

-                       GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
-                       elementGroup = elementGroup.withNewFields(componentGroup.getFields());
-                       fieldType = Types.buildGroup(repetition)
-                               .addField(elementGroup)
-                               .as(OriginalType.LIST)
-                               .named(fieldName);
+                       if (legacyMode) {
+                               // LegacyMode is 2 Level List schema
+                               fieldType = Types.buildGroup(repetition)
+                                       .addField(componentGroup)
+                                       .as(OriginalType.LIST)
+                                       .named(fieldName);
+                       } else {
+                               // Add extra layer of Group according to Parquet's standard
+                               Type listGroup = Types.repeatedGroup()
+                                       .addField(componentGroup).named(LIST_GROUP_NAME);
+                               fieldType = Types.buildGroup(repetition)
+                                       .addField(listGroup)
+                                       .as(OriginalType.LIST)
+                                       .named(fieldName);
+                       }
                } else if (typeInfo instanceof BasicArrayTypeInfo) {
                        BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;
 
+                       // LegacyMode is 2 Level List schema
                        if (legacyMode) {
-
+                               PrimitiveType primitiveTyp =
+                                       convertField(fieldName, basicArrayType.getComponentInfo(),
+                                               Type.Repetition.REQUIRED, legacyMode).asPrimitiveType();
+                               fieldType = Types.buildGroup(repetition)
+                                       .addField(primitiveTyp)
+                                       .as(OriginalType.LIST).named(fieldName);
+                       } else {
                                // Add extra layer of Group according to Parquet's standard
                                Type listGroup = Types.repeatedGroup().addField(
                                        convertField(LIST_ELEMENT, basicArrayType.getComponentInfo(),
@@ -393,15 +410,6 @@ public class ParquetSchemaConverter {
                                fieldType = Types.buildGroup(repetition)
                                        .addField(listGroup)
                                        .as(OriginalType.LIST).named(fieldName);
-                       } else {
-                               PrimitiveType primitiveTyp =
-                                       convertField(fieldName, basicArrayType.getComponentInfo(),
-                                               Type.Repetition.REQUIRED, legacyMode).asPrimitiveType();
-                               fieldType = Types.buildGroup(repetition)
-                                       .repeated(primitiveTyp.getPrimitiveTypeName())
-                                       .as(primitiveTyp.getOriginalType())
-                                       .named(LIST_ARRAY_TYPE)
-                                       .as(OriginalType.LIST).named(fieldName);
                        }
                } else if (typeInfo instanceof SqlTimeTypeInfo) {
                        if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
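
A sketch of how the two writer modes surface through the converter's public
entry point (toParquetType and its legacyMode flag are from this module; the
one-column row type with a Long[] field named "arr" is illustrative):

    import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.parquet.schema.MessageType;

    RowTypeInfo rowType = new RowTypeInfo(
            new TypeInformation<?>[] {BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO},
            new String[] {"arr"});

    // legacyMode = false emits the standard 3-level list layout,
    // legacyMode = true the 2-level layout kept for old files.
    MessageType standard = ParquetSchemaConverter.toParquetType(rowType, false);
    MessageType legacy = ParquetSchemaConverter.toParquetType(rowType, true);
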
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
index 3452278..1aa0118 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
@@ -48,6 +48,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+
 /**
  * Extends from {@link GroupConverter} to convert a nested Parquet Record into Row.
  */
@@ -90,6 +92,7 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
                } else if (typeInformation instanceof BasicArrayTypeInfo) {
                        Type elementType = field.asGroupType().getFields().get(0);
                        Class typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass();
+
                        if (typeClass.equals(Character.class)) {
                                return new RowConverter.ArrayConverter<Character>(elementType,
                                        Character.class, BasicTypeInfo.CHAR_TYPE_INFO, parentDataHolder, fieldPos);
@@ -131,7 +134,6 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
                } else if (typeInformation instanceof ObjectArrayTypeInfo) {
                        GroupType parquetGroupType = field.asGroupType();
                        Type elementType = parquetGroupType.getType(0);
-
                        return new RowConverter.ArrayConverter<Row>(elementType, Row.class,
                                ((ObjectArrayTypeInfo) typeInformation).getComponentInfo(), parentDataHolder, fieldPos);
                } else if (typeInformation instanceof RowTypeInfo) {
@@ -302,11 +304,13 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
                        this.elementClass = elementClass;
                        this.parentDataHolder = parentDataHolder;
                        this.pos = pos;
-
-                       if (elementClass.equals(Row.class)) {
-                               this.elementConverter = createConverter(elementType, 0, elementTypeInfo, this);
+                       if (isElementType(elementType, elementTypeInfo)) {
+                               elementConverter = createArrayElementConverter(elementType, elementClass,
+                                       elementTypeInfo, this);
                        } else {
-                               this.elementConverter = new RowConverter.RowPrimitiveConverter(elementType, this, 0);
+                               GroupType groupType = elementType.asGroupType();
+                               elementConverter = new ArrayElementConverter(groupType, elementClass,
+                                       elementTypeInfo, this);
                        }
                }
 
@@ -329,6 +333,46 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
                public void add(int fieldIndex, Object object) {
                        list.add((T) object);
                }
+
+               /**
+                * Converter for list elements.
+                *
+                * <pre>
+                *   optional group the_list (LIST) {
+                *     repeated group array { <-- this layer
+                *       optional (type) element;
+                *     }
+                *   }
+                * </pre>
+                */
+               static class ArrayElementConverter extends GroupConverter {
+                       private final Converter elementConverter;
+
+                       public ArrayElementConverter(
+                               GroupType repeatedType,
+                               Class elementClass,
+                               TypeInformation elementTypeInfo,
+                               ParentDataHolder holder) {
+                               Type elementType = repeatedType.getType(0);
+                               this.elementConverter = createArrayElementConverter(elementType, elementClass,
+                                       elementTypeInfo, holder);
+                       }
+
+                       @Override
+                       public Converter getConverter(int i) {
+                               return elementConverter;
+                       }
+
+                       @Override
+                       public void start() {
+
+                       }
+
+                       @Override
+                       public void end() {
+
+                       }
+               }
        }
 
        static class MapConverter extends GroupConverter {
@@ -395,4 +439,38 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
                        }
                }
        }
+
+       static Converter createArrayElementConverter(
+               Type elementType,
+               Class elementClass,
+               TypeInformation elementTypeInfo,
+               ParentDataHolder holder) {
+               if (elementClass.equals(Row.class)) {
+                       return createConverter(elementType, 0, elementTypeInfo, holder);
+               } else {
+                       return new RowConverter.RowPrimitiveConverter(elementType, holder, 0);
+               }
+       }
+
+       /**
+        * Returns whether the given type is the element type of a list or is a
+        * synthetic group with one field that is the element type. This is
+        * determined by checking whether the type can be a synthetic group and by
+        * checking whether a potential synthetic group matches the expected schema.
+        *
+        * @param repeatedType a type that may be the element type
+        * @param typeInformation the expected flink type for list elements
+        * @return {@code true} if the repeatedType is the element schema
+        */
+       static boolean isElementType(Type repeatedType, TypeInformation typeInformation) {
+               if (repeatedType.isPrimitive() ||
+                       repeatedType.asGroupType().getFieldCount() > 1 ||
+                       repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
+                       // The repeated type must be the element type because it is an invalid
+                       // synthetic wrapper. Must be a group with one optional or required field
+                       return true;
+               }
+
+               return false;
+       }
 }
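
The reader-side pivot is the new isElementType check: a repeated node is taken
to be the element itself (legacy shape) unless it looks like the standard
single-field wrapper group. A sketch of the two inputs it separates, built
with parquet's Types API (field names illustrative):

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    // Legacy 2-level shape: the repeated node is the element itself, so
    // isElementType(...) returns true (a primitive can never be a wrapper).
    Type repeatedElement = Types.primitive(PrimitiveTypeName.INT64, Type.Repetition.REPEATED)
            .named("array");

    // Standard 3-level shape: the repeated node is a synthetic one-field wrapper,
    // so isElementType(...) returns false and ArrayConverter installs an
    // ArrayElementConverter to step through the extra group.
    Type syntheticWrapper = Types.repeatedGroup()
            .addField(Types.primitive(PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
                    .named("element"))
            .named("list");
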
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
index f36b12c..66deac7 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
@@ -30,6 +30,8 @@ import org.apache.parquet.schema.MessageType;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -43,21 +45,27 @@ import static org.junit.Assert.assertNotNull;
 /**
  * Test cases for reading Map from Parquet files.
  */
-public class ParquetMapInputFormatTest {
+@RunWith(Parameterized.class)
+public class ParquetMapInputFormatTest extends TestUtil {
        private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
 
        @ClassRule
        public static TemporaryFolder tempRoot = new TemporaryFolder();
 
+       public ParquetMapInputFormatTest(boolean useLegacyMode) {
+               super(useLegacyMode);
+       }
+
        @Test
        @SuppressWarnings("unchecked")
        public void testReadMapFromNestedRecord() throws IOException {
                Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-               MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1), getConfiguration());
+               MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);

                ParquetMapInputFormat inputFormat = new ParquetMapInputFormat(path, nestedType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
@@ -84,12 +92,13 @@ public class ParquetMapInputFormatTest {
        @SuppressWarnings("unchecked")
        public void testProjectedReadMapFromNestedRecord() throws IOException {
                Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-               MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1), getConfiguration());
+               MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
                ParquetMapInputFormat inputFormat = new ParquetMapInputFormat(path, nestedType);

                inputFormat.selectFields(Collections.singletonList("nestedMap").toArray(new String[0]));
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
index df5f25b..29ffcad 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
@@ -33,6 +33,8 @@ import org.apache.parquet.schema.MessageType;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -43,17 +45,23 @@ import static org.junit.Assert.assertEquals;
 /**
  * Test cases for reading Pojo from Parquet files.
  */
-public class ParquetPojoInputFormatTest {
+@RunWith(Parameterized.class)
+public class ParquetPojoInputFormatTest extends TestUtil {
        private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
 
        @ClassRule
        public static TemporaryFolder tempRoot = new TemporaryFolder();
 
+       public ParquetPojoInputFormatTest(boolean useLegacyMode) {
+               super(useLegacyMode);
+       }
+
        @Test
        public void testReadPojoFromSimpleRecord() throws IOException {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-               MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
-               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
+               MessageType messageType = getSchemaConverter().convert(SIMPLE_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
+                       Collections.singletonList(simple.f1), getConfiguration());

                ParquetPojoInputFormat<PojoSimpleRecord> inputFormat = new ParquetPojoInputFormat<>(
                        path, messageType, (PojoTypeInfo<PojoSimpleRecord>) Types.POJO(PojoSimpleRecord.class));
@@ -72,12 +80,13 @@ public class ParquetPojoInputFormatTest {
        @Test
        public void testProjectedReadPojoFromSimpleRecord() throws IOException, NoSuchFieldError {
                Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-               MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
-               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+               MessageType messageType = getSchemaConverter().convert(SIMPLE_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
+                       Collections.singletonList(simple.f1), getConfiguration());

                ParquetPojoInputFormat<PojoSimpleRecord> inputFormat = new ParquetPojoInputFormat<>(
                        path, messageType, (PojoTypeInfo<PojoSimpleRecord>) Types.POJO(PojoSimpleRecord.class));
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
index c2543cd..08514b5 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
@@ -29,11 +29,12 @@ import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.MessageType;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,21 +52,25 @@ import static org.junit.Assert.assertTrue;
 /**
  * Simple test case for reading {@link org.apache.flink.types.Row} from Parquet files.
  */
-public class ParquetRowInputFormatTest {
-       private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+@RunWith(Parameterized.class)
+public class ParquetRowInputFormatTest extends TestUtil {
 
        @ClassRule
        public static TemporaryFolder tempRoot = new TemporaryFolder();
 
+       public ParquetRowInputFormatTest(boolean useLegacyMode) {
+               super(useLegacyMode);
+       }
+
        @Test
        public void testReadRowFromSimpleRecord() throws IOException {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-               Path path = TestUtil.createTempParquetFile(
-                       tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Arrays.asList(simple.f1, simple.f1));
-               MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
+               Path path = createTempParquetFile(
+                       tempRoot.getRoot(), SIMPLE_SCHEMA, Arrays.asList(simple.f1, simple.f1), getConfiguration());
+               MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
@@ -95,13 +100,16 @@ public class ParquetRowInputFormatTest {
 
                File tempFolder = tempRoot.newFolder();
                // created a parquet file with 10 row groups. Each row group has 100 records
-               TestUtil.createTempParquetFile(tempFolder, TestUtil.SIMPLE_SCHEMA, records);
-               TestUtil.createTempParquetFile(tempFolder, TestUtil.SIMPLE_SCHEMA, records);
-               TestUtil.createTempParquetFile(tempFolder, TestUtil.SIMPLE_SCHEMA, records);
-               MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+               createTempParquetFile(tempFolder, SIMPLE_SCHEMA,
+                       records, getConfiguration());
+               createTempParquetFile(tempFolder, SIMPLE_SCHEMA,
+                       records, getConfiguration());
+               createTempParquetFile(tempFolder, SIMPLE_SCHEMA,
+                       records, getConfiguration());
+               MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(new Path(tempFolder.getPath()), simpleType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(3);
                assertEquals(3, splits.length);
@@ -133,11 +141,12 @@ public class ParquetRowInputFormatTest {
                }
 
                // created a parquet file with 10 row groups. Each row group has 100 records
-               Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.SIMPLE_SCHEMA, records);
-               MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.newFolder(), SIMPLE_SCHEMA,
+                       records, getConfiguration());
+               MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
@@ -240,11 +249,12 @@ public class ParquetRowInputFormatTest {
                }
 
                // created a parquet file with 10 row groups. Each row group has 100 records
-               Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.SIMPLE_SCHEMA, records);
-               MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.newFolder(), SIMPLE_SCHEMA,
+                       records, getConfiguration());
+               MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
@@ -340,12 +350,13 @@ public class ParquetRowInputFormatTest {
 
        @Test
        public void testReadRowFromNestedRecord() throws IOException {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-               Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-               MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+               Path path = createTempParquetFile(tempRoot.newFolder(), NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1), getConfiguration());
+               MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, nestedType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = inputFormat.createInputSplits(1);
                assertEquals(1, splits.length);
@@ -365,12 +376,13 @@ public class ParquetRowInputFormatTest {
 
        @Test
        public void testProjectedRowFromNestedRecord() throws Exception {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-               Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-               MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+               Path path = createTempParquetFile(tempRoot.newFolder(), NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1), getConfiguration());
+               MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, nestedType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                inputFormat.selectFields(new String[]{"bar", "nestedMap"});
 
@@ -387,27 +399,29 @@ public class ParquetRowInputFormatTest {
 
        @Test(expected = IllegalArgumentException.class)
        public void testInvalidProjectionOfNestedRecord() throws Exception {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-               Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-               MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+               Path path = createTempParquetFile(tempRoot.newFolder(), NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1), getConfiguration());
+               MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, nestedType);
-               inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               inputFormat.setRuntimeContext(getMockRuntimeContext());
 
                inputFormat.selectFields(new String[]{"bar", "celona"});
        }
 
        @Test
        public void testSerialization() throws Exception {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-               Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
-               MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
+               Path path = createTempParquetFile(tempRoot.newFolder(), SIMPLE_SCHEMA,
+                       Collections.singletonList(simple.f1), getConfiguration());
+               MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);

                ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
                byte[] bytes = InstantiationUtil.serializeObject(inputFormat);
                ParquetRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
 
-               copy.setRuntimeContext(TestUtil.getMockRuntimeContext());
+               copy.setRuntimeContext(getMockRuntimeContext());
 
                FileInputSplit[] splits = copy.createInputSplits(1);
                assertEquals(1, splits.length);
@@ -417,5 +431,4 @@ public class ParquetRowInputFormatTest {
                assertNotNull(row);
                assertEquals(simple.f2, row);
        }
-
 }
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
index 8e7c43b..463eba6 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.MessageType;
 import org.junit.BeforeClass;
@@ -111,7 +112,7 @@ public class ParquetTableSourceITCase extends MultipleProgramsTestBase {
         */
        private static Path createTestParquetFile(int numberOfRows) throws Exception {
                List<IndexedRecord> records = TestUtil.createRecordList(numberOfRows);
-               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records);
+               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records, new Configuration());
                return path;
        }
 }
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
index 1e6ebf8..a0274fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -64,7 +65,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test cases for {@link ParquetTableSource}.
  */
-public class ParquetTableSourceTest extends TestUtil {
+public class ParquetTableSourceTest {
        private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
        private static Path testPath;
 
@@ -88,7 +89,7 @@ public class ParquetTableSourceTest extends TestUtil {
                assertNotNull(returnType);
                assertTrue(returnType instanceof RowTypeInfo);
                RowTypeInfo rowType = (RowTypeInfo) returnType;
-               assertEquals(NESTED_ROW_TYPE, rowType);
+               assertEquals(TestUtil.NESTED_ROW_TYPE, rowType);
        }
 
        @Test
@@ -102,7 +103,7 @@ public class ParquetTableSourceTest extends TestUtil {
                TableSchema schema = parquetTableSource.getTableSchema();
                assertNotNull(schema);
 
-               RowTypeInfo expectedSchema = (RowTypeInfo) NESTED_ROW_TYPE;
+               RowTypeInfo expectedSchema = (RowTypeInfo) TestUtil.NESTED_ROW_TYPE;
                assertArrayEquals(expectedSchema.getFieldNames(), schema.getFieldNames());
                assertArrayEquals(expectedSchema.getFieldTypes(), schema.getFieldTypes());
        }
@@ -121,8 +122,8 @@ public class ParquetTableSourceTest extends TestUtil {
                // ensure that table source description differs
                assertNotEquals(parquetTableSource.explainSource(), projected.explainSource());

-               String[] fieldNames = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldNames();
-               TypeInformation[] fieldTypes =  ((RowTypeInfo) NESTED_ROW_TYPE).getFieldTypes();
+               String[] fieldNames = ((RowTypeInfo) TestUtil.NESTED_ROW_TYPE).getFieldNames();
+               TypeInformation[] fieldTypes =  ((RowTypeInfo) TestUtil.NESTED_ROW_TYPE).getFieldTypes();
                assertEquals(
                        Types.ROW_NAMED(
                                new String[] {fieldNames[2], fieldNames[4], 
fieldNames[6]},
@@ -186,7 +187,7 @@ public class ParquetTableSourceTest extends TestUtil {
                assertEquals(parquetTableSource.getTableSchema(), filtered.getTableSchema());

                // ensure return type is identical
-               assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
+               assertEquals(TestUtil.NESTED_ROW_TYPE, filtered.getReturnType());

                // ensure source description is not the same
                assertNotEquals(parquetTableSource.explainSource(), filtered.explainSource());
@@ -217,14 +218,14 @@ public class ParquetTableSourceTest extends TestUtil {
        }
 
        private static Path createTestParquetFile() throws Exception {
-               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
-               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
-                       Collections.singletonList(nested.f1));
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
+               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1), new Configuration());
                return path;
        }
 
        private ParquetTableSource createNestedTestParquetTableSource(Path path) throws Exception {
-               MessageType nestedSchema = SCHEMA_CONVERTER.convert(NESTED_SCHEMA);
+               MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
                ParquetTableSource parquetTableSource = ParquetTableSource.builder()
                        .path(path.getPath())
                        .forParquetSchema(nestedSchema)
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
index 6c79605..f957125 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -32,12 +32,13 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,10 +55,15 @@ import static org.junit.Assert.assertTrue;
 /**
  * Simple test case for reading parquet records.
  */
+@RunWith(Parameterized.class)
 public class ParquetRecordReaderTest extends TestUtil {
 
        private final Configuration testConfig = new Configuration();
 
+       public ParquetRecordReaderTest(boolean useLegacyMode) {
+               super(useLegacyMode);
+       }
+
        @Test
        public void testReadSimpleGroup() throws IOException {
                Long[] array = {1L};
@@ -66,8 +72,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        .set("foo", 32L)
                        .set("arr", array).build();
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA, Collections.singletonList(record));
-               MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
+                       Collections.singletonList(record), getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(SIMPLE_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
@@ -99,8 +106,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        records.add(record);
                }
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA, records);
-               MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(),
+                       SIMPLE_SCHEMA, records, getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(SIMPLE_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
@@ -134,8 +142,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        .set("bar", barRecord)
                        .build();
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-               MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(record), getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
@@ -165,8 +174,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        .set("spamMap", map.build())
                        .build();
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-               MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(record), getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
@@ -207,8 +217,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        .set("strArray", arrayString)
                        .build();
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-               MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(record), getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
@@ -248,8 +259,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        .set("nestedMap", map.build())
                        .set("foo", 34L).build();
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-               MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(record), getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
@@ -288,8 +300,9 @@ public class ParquetRecordReaderTest extends TestUtil {
                        .set("nestedArray", list.build())
                        .set("foo", 34L).build();
 
-               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-               MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(record), getConfiguration());
+               MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
                ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
 
                InputFile inputFile =
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
index 10db6d2..05dd209 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
@@ -33,7 +33,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Simple test case for conversion between Parquet schema and Flink date types.
  */
-public class ParquetSchemaConverterTest extends TestUtil {
+public class ParquetSchemaConverterTest {
 
        private final Type[] simpleStandardTypes = {
                org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
@@ -73,11 +73,12 @@ public class ParquetSchemaConverterTest extends TestUtil {
                        .named("value"))
                        .named("nestedMap"),
                
org.apache.parquet.schema.Types.optionalGroup().addField(org.apache.parquet.schema.Types.repeatedGroup()
-                       
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
 Type.Repetition.REQUIRED)
-                               .as(OriginalType.UTF8).named("type"))
-                       
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
 Type.Repetition.REQUIRED)
-                               .as(OriginalType.INT_64).named("value"))
-                       .named("element")).as(OriginalType.LIST)
+                       
.addField(org.apache.parquet.schema.Types.requiredGroup()
+                               
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
 Type.Repetition.REQUIRED)
+                                       .as(OriginalType.UTF8).named("type"))
+                               
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
 Type.Repetition.REQUIRED)
+                                       .as(OriginalType.INT_64).named("value"))
+                               
.named("element")).named("list")).as(OriginalType.LIST)
                        .named("nestedArray")
        };
 
@@ -85,25 +86,25 @@ public class ParquetSchemaConverterTest extends TestUtil {
        public void testSimpleSchemaConversion() {
                MessageType simpleType = new MessageType("simple", simpleStandardTypes);
                RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType);
-               assertEquals(SIMPLE_ROW_TYPE, rowTypeInfo);
+               assertEquals(TestUtil.SIMPLE_ROW_TYPE, rowTypeInfo);
        }
 
        @Test
        public void testNestedSchemaConversion() {
                MessageType nestedTypes = new MessageType("nested", this.nestedTypes);
                RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes);
-               assertEquals(NESTED_ROW_TYPE, rowTypeInfo);
+               assertEquals(TestUtil.NESTED_ROW_TYPE, rowTypeInfo);
        }
 
        @Test
        public void testSimpleRowTypeConversion() {
-               MessageType simpleSchema = ParquetSchemaConverter.toParquetType(SIMPLE_ROW_TYPE, true);
+               MessageType simpleSchema = ParquetSchemaConverter.toParquetType(TestUtil.SIMPLE_ROW_TYPE, false);
                assertEquals(Arrays.asList(simpleStandardTypes), simpleSchema.getFields());
        }
 
        @Test
        public void testNestedRowTypeConversion() {
-               MessageType nestedSchema = ParquetSchemaConverter.toParquetType(NESTED_ROW_TYPE, true);
+               MessageType nestedSchema = ParquetSchemaConverter.toParquetType(TestUtil.NESTED_ROW_TYPE, false);
                assertEquals(Arrays.asList(nestedTypes), nestedSchema.getFields());
        }
 }
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
index 9220aaa..f31a0dc 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -36,10 +36,14 @@ import org.apache.flink.types.Row;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -47,6 +51,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +61,12 @@ import java.util.UUID;
  * Utilities for testing schema conversion and test parquet file creation.
  */
 public class TestUtil {
+       public static final Configuration OLD_BEHAVIOR_CONF = new Configuration();
+       public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
+       private static AvroSchemaConverter schemaConverter;
+       private static AvroSchemaConverter legacySchemaConverter;
+       protected boolean useLegacyMode;
+
        private static final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(
                new String[] {"type", "value"}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
 
@@ -83,10 +94,51 @@ public class TestUtil {
                nestedMap,
                nestedArray);
 
-       public static Path createTempParquetFile(File folder, Schema schema, List<IndexedRecord> records) throws IOException {
+       @BeforeClass
+       public static void setupNewBehaviorConfiguration() {
+               OLD_BEHAVIOR_CONF.setBoolean(
+                       "parquet.avro.write-old-list-structure", true);
+               NEW_BEHAVIOR_CONF.setBoolean(
+                       "parquet.avro.write-old-list-structure", false);
+               schemaConverter = new AvroSchemaConverter(NEW_BEHAVIOR_CONF);
+               legacySchemaConverter = new AvroSchemaConverter(OLD_BEHAVIOR_CONF);
+       }
+
+       public TestUtil(boolean useLegacyMode) {
+               this.useLegacyMode = useLegacyMode;
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(new Boolean[] {true, false});
+       }
+
+       protected AvroSchemaConverter getSchemaConverter() {
+               if (useLegacyMode) {
+                       return legacySchemaConverter;
+               }
+
+               return schemaConverter;
+       }
+
+       protected Configuration getConfiguration() {
+               if (useLegacyMode) {
+                       return OLD_BEHAVIOR_CONF;
+               }
+
+               return NEW_BEHAVIOR_CONF;
+       }
+
+       public static Path createTempParquetFile(
+               File folder,
+               Schema schema,
+               List<IndexedRecord> records,
+               Configuration configuration) throws IOException {
                Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
                ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
-                       new org.apache.hadoop.fs.Path(path.toUri())).withSchema(schema).withRowGroupSize(10).build();
+                       new org.apache.hadoop.fs.Path(path.toUri()))
+                       .withConf(configuration)
+                       .withSchema(schema).withRowGroupSize(10).build();
 
                for (IndexedRecord record : records) {
                        writer.write(record);
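
The tests switch parquet-avro between the two layouts through the
"parquet.avro.write-old-list-structure" flag wired up in TestUtil above. A
standalone sketch of the same mechanism; the output path and the schema
variable are placeholders:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    Configuration conf = new Configuration();
    // false -> standard 3-level lists; true -> legacy 2-level lists.
    conf.setBoolean("parquet.avro.write-old-list-structure", false);

    Schema schema = ...; // placeholder: an Avro schema containing an array field
    ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
                    new org.apache.hadoop.fs.Path("/tmp/lists.parquet")) // placeholder path
            .withConf(conf)
            .withSchema(schema)
            .build();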
