This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new c77e6f9  [FLINK-20054][formats] Fix ParquetInputFormat 3 level List handling
c77e6f9 is described below
commit c77e6f9e987f344f1bad2deac4793d78d472ac30
Author: Peter Huang <[email protected]>
AuthorDate: Sun Nov 8 23:32:15 2020 -0800
[FLINK-20054][formats] Fix ParquetInputFormat 3 level List handling
This closes #13994
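
For reference, the two Parquet list layouts at issue look roughly like this
(a sketch in the schema notation also used in the RowConverter javadoc below;
the field names are illustrative):

    // legacy 2-level layout, written by parquet-avro when
    // parquet.avro.write-old-list-structure=true
    optional group my_list (LIST) {
      repeated int32 element;
    }

    // standard 3-level layout, with the extra synthetic "list" group
    optional group my_list (LIST) {
      repeated group list {
        optional int32 element;
      }
    }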
---
.../parquet/utils/ParquetSchemaConverter.java | 66 +++++++++-------
.../flink/formats/parquet/utils/RowConverter.java | 88 ++++++++++++++++++++--
.../formats/parquet/ParquetMapInputFormatTest.java | 23 ++++--
.../parquet/ParquetPojoInputFormatTest.java | 23 ++++--
.../formats/parquet/ParquetRowInputFormatTest.java | 85 ++++++++++++---------
.../formats/parquet/ParquetTableSourceITCase.java | 3 +-
.../formats/parquet/ParquetTableSourceTest.java | 21 +++---
.../parquet/utils/ParquetRecordReaderTest.java | 43 +++++++----
.../parquet/utils/ParquetSchemaConverterTest.java | 21 +++---
.../flink/formats/parquet/utils/TestUtil.java | 56 +++++++++++++-
10 files changed, 307 insertions(+), 122 deletions(-)
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index fe79c6f..cf34d46 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -228,16 +228,7 @@ public class ParquetSchemaConverter {
                 // If the repeated field is a group with multiple fields, then its type is the element
                 // type and elements are required.
                 if (elementType.getFieldCount() > 1) {
-
-                    for (Type type : elementType.getFields()) {
-                        if (!type.isRepetition(Type.Repetition.REQUIRED)) {
-                            throw new UnsupportedOperationException(
-                                String.format("List field [%s] in List [%s] has to be required. ",
-                                    type.toString(), fieldType.getName()));
-                        }
-                    }
-                    typeInfo = ObjectArrayTypeInfo.getInfoFor(
-                        convertParquetTypeToTypeInfo(elementType));
+                    typeInfo = convertGroupElementToArrayTypeInfo(parquetGroupType, elementType);
                 } else {
                     Type internalType = elementType.getType(0);
                     if (internalType.isPrimitive()) {
@@ -250,9 +241,7 @@ public class ParquetSchemaConverter {
                         typeInfo = ObjectArrayTypeInfo.getInfoFor(
                             convertParquetTypeToTypeInfo(internalType));
                     } else {
-                        throw new UnsupportedOperationException(
-                            String.format("Unrecgonized List schema [%s] according to Parquet"
-                                + " standard", parquetGroupType.toString()));
+                        typeInfo = convertGroupElementToArrayTypeInfo(parquetGroupType, tupleGroup);
                     }
                 }
             }
@@ -306,6 +295,18 @@ public class ParquetSchemaConverter {
         return typeInfo;
     }

+    private static ObjectArrayTypeInfo convertGroupElementToArrayTypeInfo(GroupType arrayFieldType, GroupType elementType) {
+        for (Type type : elementType.getFields()) {
+            if (!type.isRepetition(Type.Repetition.REQUIRED)) {
+                throw new UnsupportedOperationException(
+                    String.format("List field [%s] in List [%s] has to be required. ",
+                        type.toString(), arrayFieldType.getName()));
+            }
+        }
+        return ObjectArrayTypeInfo.getInfoFor(
+            convertParquetTypeToTypeInfo(elementType));
+    }
+
     private static TypeInformation<?> convertParquetPrimitiveListToFlinkArray(Type type) {
         // Backward-compatibility element group doesn't exist also allowed
         TypeInformation<?> flinkType = convertParquetTypeToTypeInfo(type);
@@ -374,17 +375,33 @@ public class ParquetSchemaConverter {
             GroupType componentGroup = (GroupType) convertField(LIST_ELEMENT,
                 objectArrayTypeInfo.getComponentInfo(), Type.Repetition.REQUIRED, legacyMode);
-            GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
-            elementGroup = elementGroup.withNewFields(componentGroup.getFields());
-            fieldType = Types.buildGroup(repetition)
-                .addField(elementGroup)
-                .as(OriginalType.LIST)
-                .named(fieldName);
+            if (legacyMode) {
+                // LegacyMode is 2 Level List schema
+                fieldType = Types.buildGroup(repetition)
+                    .addField(componentGroup)
+                    .as(OriginalType.LIST)
+                    .named(fieldName);
+            } else {
+                // Add extra layer of Group according to Parquet's standard
+                Type listGroup = Types.repeatedGroup()
+                    .addField(componentGroup).named(LIST_GROUP_NAME);
+                fieldType = Types.buildGroup(repetition)
+                    .addField(listGroup)
+                    .as(OriginalType.LIST)
+                    .named(fieldName);
+            }
         } else if (typeInfo instanceof BasicArrayTypeInfo) {
             BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;
+            // LegacyMode is 2 Level List schema
             if (legacyMode) {
-
+                PrimitiveType primitiveTyp =
+                    convertField(fieldName, basicArrayType.getComponentInfo(),
+                        Type.Repetition.REQUIRED, legacyMode).asPrimitiveType();
+                fieldType = Types.buildGroup(repetition)
+                    .addField(primitiveTyp)
+                    .as(OriginalType.LIST).named(fieldName);
+            } else {
                 // Add extra layer of Group according to Parquet's standard
                 Type listGroup = Types.repeatedGroup().addField(
                     convertField(LIST_ELEMENT, basicArrayType.getComponentInfo(),
@@ -393,15 +410,6 @@ public class ParquetSchemaConverter {
                 fieldType = Types.buildGroup(repetition)
                     .addField(listGroup)
                     .as(OriginalType.LIST).named(fieldName);
-            } else {
-                PrimitiveType primitiveTyp =
-                    convertField(fieldName, basicArrayType.getComponentInfo(),
-                        Type.Repetition.REQUIRED, legacyMode).asPrimitiveType();
-                fieldType = Types.buildGroup(repetition)
-                    .repeated(primitiveTyp.getPrimitiveTypeName())
-                    .as(primitiveTyp.getOriginalType())
-                    .named(LIST_ARRAY_TYPE)
-                    .as(OriginalType.LIST).named(fieldName);
             }
         } else if (typeInfo instanceof SqlTimeTypeInfo) {
             if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
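
The net effect of the converter changes above, as a minimal sketch (not part
of the patch; ROW_TYPE stands in for any Flink RowTypeInfo containing an
array field):

    // legacyMode = true keeps the 2-level list layout; false now emits the
    // standard 3-level layout with the extra repeated "list" group
    MessageType legacySchema = ParquetSchemaConverter.toParquetType(ROW_TYPE, true);
    MessageType standardSchema = ParquetSchemaConverter.toParquetType(ROW_TYPE, false);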
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
index 3452278..1aa0118 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
@@ -48,6 +48,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+
/**
 * Extends from {@link GroupConverter} to convert an nested Parquet Record into Row.
*/
@@ -90,6 +92,7 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
} else if (typeInformation instanceof BasicArrayTypeInfo) {
             Type elementType = field.asGroupType().getFields().get(0);
             Class typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass();
+
             if (typeClass.equals(Character.class)) {
                 return new RowConverter.ArrayConverter<Character>(elementType, Character.class,
                     BasicTypeInfo.CHAR_TYPE_INFO, parentDataHolder, fieldPos);
@@ -131,7 +134,6 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
} else if (typeInformation instanceof ObjectArrayTypeInfo) {
GroupType parquetGroupType = field.asGroupType();
Type elementType = parquetGroupType.getType(0);
-
             return new RowConverter.ArrayConverter<Row>(elementType, Row.class,
                 ((ObjectArrayTypeInfo) typeInformation).getComponentInfo(), parentDataHolder, fieldPos);
} else if (typeInformation instanceof RowTypeInfo) {
@@ -302,11 +304,13 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
this.elementClass = elementClass;
this.parentDataHolder = parentDataHolder;
this.pos = pos;
-
-            if (elementClass.equals(Row.class)) {
-                this.elementConverter = createConverter(elementType, 0, elementTypeInfo, this);
+            if (isElementType(elementType, elementTypeInfo)) {
+                elementConverter = createArrayElementConverter(elementType, elementClass,
+                    elementTypeInfo, this);
             } else {
-                this.elementConverter = new RowConverter.RowPrimitiveConverter(elementType, this, 0);
+                GroupType groupType = elementType.asGroupType();
+                elementConverter = new ArrayElementConverter(groupType, elementClass,
+                    elementTypeInfo, this);
             }
         }
@@ -329,6 +333,46 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
public void add(int fieldIndex, Object object) {
list.add((T) object);
}
+
+ /**
+ * Converter for list elements.
+ *
+ * <pre>
+ * optional group the_list (LIST) {
+ * repeated group array { <-- this layer
+ * optional (type) element;
+ * }
+ * }
+ * </pre>
+ */
+ static class ArrayElementConverter extends GroupConverter {
+ private final Converter elementConverter;
+
+ public ArrayElementConverter(
+ GroupType repeatedTye,
+ Class elementClass,
+ TypeInformation elementTypeInfo,
+ ParentDataHolder holder) {
+ Type elementType = repeatedTye.getType(0);
+            this.elementConverter = createArrayElementConverter(elementType, elementClass,
+                elementTypeInfo, holder);
+ }
+
+ @Override
+ public Converter getConverter(int i) {
+ return elementConverter;
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void end() {
+
+ }
+ }
}
static class MapConverter extends GroupConverter {
@@ -395,4 +439,38 @@ public class RowConverter extends GroupConverter implements ParentDataHolder {
}
}
}
+
+ static Converter createArrayElementConverter(
+ Type elementType,
+ Class elementClass,
+ TypeInformation elementTypeInfo,
+ ParentDataHolder holder) {
+ if (elementClass.equals(Row.class)) {
+            return createConverter(elementType, 0, elementTypeInfo, holder);
+ } else {
+            return new RowConverter.RowPrimitiveConverter(elementType, holder, 0);
+ }
+ }
+
+ /**
+ * Returns whether the given type is the element type of a list or is a
+ * synthetic group with one field that is the element type. This is
+     * determined by checking whether the type can be a synthetic group and by
+     * checking whether a potential synthetic group matches the expected schema.
+ *
+ * @param repeatedType a type that may be the element type
+ * @param typeInformation the expected flink type for list elements
+ * @return {@code true} if the repeatedType is the element schema
+ */
+    static boolean isElementType(Type repeatedType, TypeInformation typeInformation) {
+ if (repeatedType.isPrimitive() ||
+ repeatedType.asGroupType().getFieldCount() > 1 ||
+            repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
+            // The repeated type must be the element type because it is an invalid
+            // synthetic wrapper. Must be a group with one optional or required field
+ return true;
+ }
+
+ return false;
+ }
}
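
The disambiguation rule in isElementType can be exercised directly; a sketch,
assuming the caller sits in the same package (the method is package-private)
and using BasicTypeInfo constants as stand-ins for the expected element types:

    // legacy 2-level list: the repeated type IS the element type
    Type legacyElement = Types.repeated(PrimitiveType.PrimitiveTypeName.INT32).named("element");
    RowConverter.isElementType(legacyElement, BasicTypeInfo.INT_TYPE_INFO);      // true

    // standard 3-level list: the repeated group is a synthetic wrapper with one
    // optional/required field, so it must be unwrapped via ArrayElementConverter
    GroupType syntheticWrapper = Types.repeatedGroup()
        .addField(Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
            .as(OriginalType.UTF8).named("element"))
        .named("list");
    RowConverter.isElementType(syntheticWrapper, BasicTypeInfo.STRING_TYPE_INFO); // false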
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
index f36b12c..66deac7 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
@@ -30,6 +30,8 @@ import org.apache.parquet.schema.MessageType;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Collections;
@@ -43,21 +45,27 @@ import static org.junit.Assert.assertNotNull;
/**
* Test cases for reading Map from Parquet files.
*/
-public class ParquetMapInputFormatTest {
+@RunWith(Parameterized.class)
+public class ParquetMapInputFormatTest extends TestUtil {
     private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
@ClassRule
public static TemporaryFolder tempRoot = new TemporaryFolder();
+ public ParquetMapInputFormatTest(boolean useLegacyMode) {
+ super(useLegacyMode);
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testReadMapFromNestedRecord() throws IOException {
         Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-        MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(nested.f1), getConfiguration());
+        MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetMapInputFormat inputFormat = new ParquetMapInputFormat(path, nestedType);
- inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+ inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
@@ -84,12 +92,13 @@ public class ParquetMapInputFormatTest {
@SuppressWarnings("unchecked")
public void testProjectedReadMapFromNestedRecord() throws IOException {
         Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-        MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(nested.f1), getConfiguration());
+        MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetMapInputFormat inputFormat = new ParquetMapInputFormat(path, nestedType);
         inputFormat.selectFields(Collections.singletonList("nestedMap").toArray(new String[0]));
- inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+ inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
index df5f25b..29ffcad 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
@@ -33,6 +33,8 @@ import org.apache.parquet.schema.MessageType;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Collections;
@@ -43,17 +45,23 @@ import static org.junit.Assert.assertEquals;
/**
* Test cases for reading Pojo from Parquet files.
*/
-public class ParquetPojoInputFormatTest {
+@RunWith(Parameterized.class)
+public class ParquetPojoInputFormatTest extends TestUtil {
     private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
@ClassRule
public static TemporaryFolder tempRoot = new TemporaryFolder();
+ public ParquetPojoInputFormatTest(boolean useLegacyMode) {
+ super(useLegacyMode);
+ }
+
@Test
public void testReadPojoFromSimpleRecord() throws IOException {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-        MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
-        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
+        MessageType messageType = getSchemaConverter().convert(SIMPLE_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
+            Collections.singletonList(simple.f1), getConfiguration());
         ParquetPojoInputFormat<PojoSimpleRecord> inputFormat = new ParquetPojoInputFormat<>(
             path, messageType, (PojoTypeInfo<PojoSimpleRecord>) Types.POJO(PojoSimpleRecord.class));
@@ -72,12 +80,13 @@ public class ParquetPojoInputFormatTest {
@Test
     public void testProjectedReadPojoFromSimpleRecord() throws IOException, NoSuchFieldError {
         Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-        MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
-        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+        MessageType messageType = getSchemaConverter().convert(SIMPLE_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
+            Collections.singletonList(simple.f1), getConfiguration());
         ParquetPojoInputFormat<PojoSimpleRecord> inputFormat = new ParquetPojoInputFormat<>(
             path, messageType, (PojoTypeInfo<PojoSimpleRecord>) Types.POJO(PojoSimpleRecord.class));
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
index c2543cd..08514b5 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
@@ -29,11 +29,12 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
@@ -51,21 +52,25 @@ import static org.junit.Assert.assertTrue;
/**
 * Simple test case for reading {@link org.apache.flink.types.Row} from Parquet files.
 */
-public class ParquetRowInputFormatTest {
-    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+@RunWith(Parameterized.class)
+public class ParquetRowInputFormatTest extends TestUtil {
@ClassRule
public static TemporaryFolder tempRoot = new TemporaryFolder();
+ public ParquetRowInputFormatTest(boolean useLegacyMode) {
+ super(useLegacyMode);
+ }
+
@Test
public void testReadRowFromSimpleRecord() throws IOException {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-        Path path = TestUtil.createTempParquetFile(
-            tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Arrays.asList(simple.f1, simple.f1));
-        MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
+        Path path = createTempParquetFile(
+            tempRoot.getRoot(), SIMPLE_SCHEMA, Arrays.asList(simple.f1, simple.f1), getConfiguration());
+        MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
@@ -95,13 +100,16 @@ public class ParquetRowInputFormatTest {
File tempFolder = tempRoot.newFolder();
         // created a parquet file with 10 row groups. Each row group has 100 records
-        TestUtil.createTempParquetFile(tempFolder, TestUtil.SIMPLE_SCHEMA, records);
-        TestUtil.createTempParquetFile(tempFolder, TestUtil.SIMPLE_SCHEMA, records);
-        TestUtil.createTempParquetFile(tempFolder, TestUtil.SIMPLE_SCHEMA, records);
-        MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+        createTempParquetFile(tempFolder, SIMPLE_SCHEMA,
+            records, getConfiguration());
+        createTempParquetFile(tempFolder, SIMPLE_SCHEMA,
+            records, getConfiguration());
+        createTempParquetFile(tempFolder, SIMPLE_SCHEMA,
+            records, getConfiguration());
+        MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(new Path(tempFolder.getPath()), simpleType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(3);
assertEquals(3, splits.length);
@@ -133,11 +141,12 @@ public class ParquetRowInputFormatTest {
}
         // created a parquet file with 10 row groups. Each row group has 100 records
-        Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.SIMPLE_SCHEMA, records);
-        MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.newFolder(), SIMPLE_SCHEMA,
+            records, getConfiguration());
+        MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
@@ -240,11 +249,12 @@ public class ParquetRowInputFormatTest {
}
         // created a parquet file with 10 row groups. Each row group has 100 records
-        Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.SIMPLE_SCHEMA, records);
-        MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.newFolder(), SIMPLE_SCHEMA,
+            records, getConfiguration());
+        MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
@@ -340,12 +350,13 @@ public class ParquetRowInputFormatTest {
@Test
public void testReadRowFromNestedRecord() throws IOException {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-        Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-        MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+        Path path = createTempParquetFile(tempRoot.newFolder(), NESTED_SCHEMA,
+            Collections.singletonList(nested.f1), getConfiguration());
+        MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, nestedType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertEquals(1, splits.length);
@@ -365,12 +376,13 @@ public class ParquetRowInputFormatTest {
@Test
public void testProjectedRowFromNestedRecord() throws Exception {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-        Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-        MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+        Path path = createTempParquetFile(tempRoot.newFolder(), NESTED_SCHEMA,
+            Collections.singletonList(nested.f1), getConfiguration());
+        MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, nestedType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
inputFormat.selectFields(new String[]{"bar", "nestedMap"});
@@ -387,27 +399,29 @@ public class ParquetRowInputFormatTest {
@Test(expected = IllegalArgumentException.class)
public void testInvalidProjectionOfNestedRecord() throws Exception {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
-        Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
-        MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+        Path path = createTempParquetFile(tempRoot.newFolder(), NESTED_SCHEMA,
+            Collections.singletonList(nested.f1), getConfiguration());
+        MessageType nestedType = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, nestedType);
-        inputFormat.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        inputFormat.setRuntimeContext(getMockRuntimeContext());
inputFormat.selectFields(new String[]{"bar", "celona"});
}
@Test
public void testSerialization() throws Exception {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
-        Path path = TestUtil.createTempParquetFile(tempRoot.newFolder(), TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
-        MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
+        Path path = createTempParquetFile(tempRoot.newFolder(), SIMPLE_SCHEMA,
+            Collections.singletonList(simple.f1), getConfiguration());
+        MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);

         byte[] bytes = InstantiationUtil.serializeObject(inputFormat);
         ParquetRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
-        copy.setRuntimeContext(TestUtil.getMockRuntimeContext());
+        copy.setRuntimeContext(getMockRuntimeContext());
FileInputSplit[] splits = copy.createInputSplits(1);
assertEquals(1, splits.length);
@@ -417,5 +431,4 @@ public class ParquetRowInputFormatTest {
assertNotNull(row);
assertEquals(simple.f2, row);
}
-
}
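
All of the test classes converted in this commit follow the same JUnit4
pattern; a condensed sketch (the class name is hypothetical, the helpers are
those added to TestUtil below):

    @RunWith(Parameterized.class)
    public class SomeParquetTest extends TestUtil {
        public SomeParquetTest(boolean useLegacyMode) {
            super(useLegacyMode);   // TestUtil parameterizes over {true, false}
        }

        @Test
        public void testRoundTrip() throws IOException {
            Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = getSimpleRecordTestData();
            // getConfiguration() returns the writer Configuration matching the current mode
            Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
                Collections.singletonList(simple.f1), getConfiguration());
            MessageType simpleType = getSchemaConverter().convert(SIMPLE_SCHEMA);
            ParquetRowInputFormat inputFormat = new ParquetRowInputFormat(path, simpleType);
            inputFormat.setRuntimeContext(getMockRuntimeContext());
        }
    }

so every read path is exercised against files written in both the 2-level and
the 3-level list layout.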
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
index 8e7c43b..463eba6 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
@@ -111,7 +112,7 @@ public class ParquetTableSourceITCase extends MultipleProgramsTestBase {
*/
     private static Path createTestParquetFile(int numberOfRows) throws Exception {
         List<IndexedRecord> records = TestUtil.createRecordList(numberOfRows);
-        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records);
+        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records, new Configuration());
return path;
}
}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
index 1e6ebf8..a0274fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
import org.apache.flink.types.Row;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -64,7 +65,7 @@ import static org.junit.Assert.assertTrue;
/**
* Test cases for {@link ParquetTableSource}.
*/
-public class ParquetTableSourceTest extends TestUtil {
+public class ParquetTableSourceTest {
     private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
private static Path testPath;
@@ -88,7 +89,7 @@ public class ParquetTableSourceTest extends TestUtil {
assertNotNull(returnType);
assertTrue(returnType instanceof RowTypeInfo);
RowTypeInfo rowType = (RowTypeInfo) returnType;
- assertEquals(NESTED_ROW_TYPE, rowType);
+ assertEquals(TestUtil.NESTED_ROW_TYPE, rowType);
}
@Test
@@ -102,7 +103,7 @@ public class ParquetTableSourceTest extends TestUtil {
TableSchema schema = parquetTableSource.getTableSchema();
assertNotNull(schema);
- RowTypeInfo expectedSchema = (RowTypeInfo) NESTED_ROW_TYPE;
+        RowTypeInfo expectedSchema = (RowTypeInfo) TestUtil.NESTED_ROW_TYPE;
         assertArrayEquals(expectedSchema.getFieldNames(), schema.getFieldNames());
         assertArrayEquals(expectedSchema.getFieldTypes(), schema.getFieldTypes());
}
@@ -121,8 +122,8 @@ public class ParquetTableSourceTest extends TestUtil {
// ensure that table source description differs
         assertNotEquals(parquetTableSource.explainSource(), projected.explainSource());
-        String[] fieldNames = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldNames();
-        TypeInformation[] fieldTypes = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldTypes();
+        String[] fieldNames = ((RowTypeInfo) TestUtil.NESTED_ROW_TYPE).getFieldNames();
+        TypeInformation[] fieldTypes = ((RowTypeInfo) TestUtil.NESTED_ROW_TYPE).getFieldTypes();
assertEquals(
Types.ROW_NAMED(
                 new String[] {fieldNames[2], fieldNames[4], fieldNames[6]},
@@ -186,7 +187,7 @@ public class ParquetTableSourceTest extends TestUtil {
         assertEquals(parquetTableSource.getTableSchema(), filtered.getTableSchema());
// ensure return type is identical
- assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
+        assertEquals(TestUtil.NESTED_ROW_TYPE, filtered.getReturnType());
// ensure source description is not the same
         assertNotEquals(parquetTableSource.explainSource(), filtered.explainSource());
@@ -217,14 +218,14 @@ public class ParquetTableSourceTest extends TestUtil {
}
private static Path createTestParquetFile() throws Exception {
-        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
-        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
-            Collections.singletonList(nested.f1));
+        Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
+        Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA,
+            Collections.singletonList(nested.f1), new Configuration());
return path;
}
     private ParquetTableSource createNestedTestParquetTableSource(Path path) throws Exception {
-        MessageType nestedSchema = SCHEMA_CONVERTER.convert(NESTED_SCHEMA);
+        MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
         ParquetTableSource parquetTableSource = ParquetTableSource.builder()
.path(path.getPath())
.forParquetSchema(nestedSchema)
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
index 6c79605..f957125 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -32,12 +32,13 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
@@ -54,10 +55,15 @@ import static org.junit.Assert.assertTrue;
/**
* Simple test case for reading parquet records.
*/
+@RunWith(Parameterized.class)
public class ParquetRecordReaderTest extends TestUtil {
private final Configuration testConfig = new Configuration();
+ public ParquetRecordReaderTest(boolean useLegacyMode) {
+ super(useLegacyMode);
+ }
+
@Test
public void testReadSimpleGroup() throws IOException {
Long[] array = {1L};
@@ -66,8 +72,9 @@ public class ParquetRecordReaderTest extends TestUtil {
.set("foo", 32L)
.set("arr", array).build();
-        Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA, Collections.singletonList(record));
-        MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA,
+            Collections.singletonList(record), getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
@@ -99,8 +106,9 @@ public class ParquetRecordReaderTest extends TestUtil {
records.add(record);
}
-        Path path = createTempParquetFile(tempRoot.getRoot(), SIMPLE_SCHEMA, records);
-        MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(),
+            SIMPLE_SCHEMA, records, getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(SIMPLE_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
@@ -134,8 +142,9 @@ public class ParquetRecordReaderTest extends TestUtil {
.set("bar", barRecord)
.build();
-        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-        MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(record), getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
@@ -165,8 +174,9 @@ public class ParquetRecordReaderTest extends TestUtil {
.set("spamMap", map.build())
.build();
-        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-        MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(record), getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
@@ -207,8 +217,9 @@ public class ParquetRecordReaderTest extends TestUtil {
.set("strArray", arrayString)
.build();
-        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-        MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(record), getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
@@ -248,8 +259,9 @@ public class ParquetRecordReaderTest extends TestUtil {
.set("nestedMap", map.build())
.set("foo", 34L).build();
-        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-        MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(record), getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
@@ -288,8 +300,9 @@ public class ParquetRecordReaderTest extends TestUtil {
.set("nestedArray", list.build())
.set("foo", 34L).build();
-        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(record));
-        MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+        Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+            Collections.singletonList(record), getConfiguration());
+        MessageType readSchema = getSchemaConverter().convert(NESTED_SCHEMA);
         ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
InputFile inputFile =
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
index 10db6d2..05dd209 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
@@ -33,7 +33,7 @@ import static org.junit.Assert.assertEquals;
/**
* Simple test case for conversion between Parquet schema and Flink date types.
*/
-public class ParquetSchemaConverterTest extends TestUtil {
+public class ParquetSchemaConverterTest {
private final Type[] simpleStandardTypes = {
         org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
@@ -73,11 +73,12 @@ public class ParquetSchemaConverterTest extends TestUtil {
.named("value"))
.named("nestedMap"),
org.apache.parquet.schema.Types.optionalGroup().addField(org.apache.parquet.schema.Types.repeatedGroup()
-
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
- .as(OriginalType.UTF8).named("type"))
-
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
Type.Repetition.REQUIRED)
- .as(OriginalType.INT_64).named("value"))
- .named("element")).as(OriginalType.LIST)
+
.addField(org.apache.parquet.schema.Types.requiredGroup()
+
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
+ .as(OriginalType.UTF8).named("type"))
+
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
Type.Repetition.REQUIRED)
+ .as(OriginalType.INT_64).named("value"))
+
.named("element")).named("list")).as(OriginalType.LIST)
.named("nestedArray")
};
@@ -85,25 +86,25 @@ public class ParquetSchemaConverterTest extends TestUtil {
public void testSimpleSchemaConversion() {
         MessageType simpleType = new MessageType("simple", simpleStandardTypes);
         RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType);
- assertEquals(SIMPLE_ROW_TYPE, rowTypeInfo);
+ assertEquals(TestUtil.SIMPLE_ROW_TYPE, rowTypeInfo);
}
@Test
public void testNestedSchemaConversion() {
         MessageType nestedTypes = new MessageType("nested", this.nestedTypes);
         RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes);
- assertEquals(NESTED_ROW_TYPE, rowTypeInfo);
+ assertEquals(TestUtil.NESTED_ROW_TYPE, rowTypeInfo);
}
@Test
public void testSimpleRowTypeConversion() {
-        MessageType simpleSchema = ParquetSchemaConverter.toParquetType(SIMPLE_ROW_TYPE, true);
+        MessageType simpleSchema = ParquetSchemaConverter.toParquetType(TestUtil.SIMPLE_ROW_TYPE, false);
         assertEquals(Arrays.asList(simpleStandardTypes), simpleSchema.getFields());
}
@Test
public void testNestedRowTypeConversion() {
-        MessageType nestedSchema = ParquetSchemaConverter.toParquetType(NESTED_ROW_TYPE, true);
+        MessageType nestedSchema = ParquetSchemaConverter.toParquetType(TestUtil.NESTED_ROW_TYPE, false);
         assertEquals(Arrays.asList(nestedTypes), nestedSchema.getFields());
}
}
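
The TestUtil changes below drive parquet-avro's list layout through its
writer configuration flag; a sketch of the mechanism (hadoopPath and
avroSchema are placeholders):

    Configuration conf = new Configuration();
    // true -> legacy 2-level lists, false -> standard 3-level lists
    conf.setBoolean("parquet.avro.write-old-list-structure", false);
    ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(hadoopPath)
        .withConf(conf)
        .withSchema(avroSchema)
        .build();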
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
index 9220aaa..f31a0dc 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -36,10 +36,14 @@ import org.apache.flink.types.Row;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.File;
@@ -47,6 +51,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,6 +61,12 @@ import java.util.UUID;
* Utilities for testing schema conversion and test parquet file creation.
*/
public class TestUtil {
+    public static final Configuration OLD_BEHAVIOR_CONF = new Configuration();
+    public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
+ private static AvroSchemaConverter schemaConverter;
+ private static AvroSchemaConverter legacySchemaConverter;
+ protected boolean useLegacyMode;
+
     private static final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(
         new String[] {"type", "value"}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
@@ -83,10 +94,51 @@ public class TestUtil {
nestedMap,
nestedArray);
-    public static Path createTempParquetFile(File folder, Schema schema, List<IndexedRecord> records) throws IOException {
+ @BeforeClass
+ public static void setupNewBehaviorConfiguration() {
+ OLD_BEHAVIOR_CONF.setBoolean(
+ "parquet.avro.write-old-list-structure", true);
+ NEW_BEHAVIOR_CONF.setBoolean(
+ "parquet.avro.write-old-list-structure", false);
+ schemaConverter = new AvroSchemaConverter(NEW_BEHAVIOR_CONF);
+        legacySchemaConverter = new AvroSchemaConverter(OLD_BEHAVIOR_CONF);
+ }
+
+ public TestUtil(boolean useLegacyMode) {
+ this.useLegacyMode = useLegacyMode;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> primeNumbers() {
+ return Arrays.asList(new Boolean[] {true, false});
+ }
+
+ protected AvroSchemaConverter getSchemaConverter() {
+ if (useLegacyMode) {
+ return legacySchemaConverter;
+ }
+
+ return schemaConverter;
+ }
+
+ protected Configuration getConfiguration() {
+ if (useLegacyMode) {
+ return OLD_BEHAVIOR_CONF;
+ }
+
+ return NEW_BEHAVIOR_CONF;
+ }
+
+ public static Path createTempParquetFile(
+ File folder,
+ Schema schema,
+ List<IndexedRecord> records,
+ Configuration configuration) throws IOException {
         Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
         ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
-            new org.apache.hadoop.fs.Path(path.toUri())).withSchema(schema).withRowGroupSize(10).build();
+ new org.apache.hadoop.fs.Path(path.toUri()))
+ .withConf(configuration)
+ .withSchema(schema).withRowGroupSize(10).build();
for (IndexedRecord record : records) {
writer.write(record);