This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 00f585234f8 [FLINK-29005][parquet] Parquet row type reader should not return null value when some child fields are null
00f585234f8 is described below
commit 00f585234f8db8fe1e2bfec5c6c323ca99d9b775
Author: Kai Chen <[email protected]>
AuthorDate: Wed Aug 31 12:09:19 2022 +0800
[FLINK-29005][parquet] Parquet row type reader should not return null value when some child fields are null
This closes #20616
---
.../connectors/hive/HiveTableSourceITCase.java | 119 +++++++++++++++++++++
.../parquet/vector/reader/RowColumnReader.java | 15 ++-
.../data/columnar/vector/heap/HeapRowVector.java | 8 ++
3 files changed, 137 insertions(+), 5 deletions(-)
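For context before the diff: this patch changes the null semantics of nested (row-typed) Parquet columns. Previously the reader could mark a whole row value null as soon as an early child field was null; after the fix, a parent row value is reported as null only when every one of its child fields is null at that position. The following is a hedged illustration of that rule only, not code from the patch; the class and method names are made up:

    final class NestedRowNullRule {
        // Sketch only: a nested row at one position is null exactly when all of
        // its child fields are null at that position.
        static boolean parentRowIsNull(boolean[] childIsNull) {
            for (boolean childNull : childIsNull) {
                if (!childNull) {
                    return false; // at least one child carries data, so the row itself is not null
                }
            }
            return true; // every child is null, so the whole nested row is null
        }
    }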
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 99540c947c6..f3cb5e1e874 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -18,12 +18,18 @@
package org.apache.flink.connectors.hive;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
@@ -73,6 +79,8 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -963,6 +971,117 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ final String catalogName = "hive";
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+ StreamTableEnvironment tEnv =
+ HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ List<Row> expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+ new RowTypeInfo(
+ new TypeInformation[] {
+ Types.INT,
+ Types.STRING,
+ new RowTypeInfo(
+ new TypeInformation[] {
+ Types.STRING, Types.INT, Types.INT
+ },
+ new String[] {"c1", "c2",
"c3"}),
+ new MapTypeInfo<>(Types.STRING, Types.STRING),
+ Types.OBJECT_ARRAY(Types.STRING),
+ Types.STRING
+ },
+ new String[] {"a", "b", "c", "d", "e",
"f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // run with parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ assertResults(executeAndGetResult(tEnv), expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows = new ArrayList<>();
+ sortedRows.addAll(rows);
+ sortedRows.addAll(rows);
+ sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+ List<Row> expectedRows = new ArrayList<>();
+ for (int i = 0; i < sortedRows.size(); i++) {
+ Row rowExpect = Row.copy(sortedRows.get(i));
+ Row nestedRow = (Row) rowExpect.getField(2);
+ if (nestedRow.getField(0) == null
+ && nestedRow.getField(1) == null
+ && nestedRow.getField(2) == null) {
+ rowExpect.setField(2, null);
+ }
+ expectedRows.add(rowExpect);
+ }
+ return expectedRows;
+ }
+
+ private static CloseableIterator<Row> executeAndGetResult(StreamTableEnvironment tEnv)
+ throws Exception {
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE sink_table (a int, b string,"
+ + "c struct<c1:string, c2:int, c3:int>,"
+ + "d map<string, string>, e array<string>, f string "
+ + ") "
+ + " stored as parquet"
+ + " TBLPROPERTIES ("
+ + "'sink.partition-commit.policy.kind'='metastore,success-file',"
+ + "'auto-compaction'='true',"
+ + "'compaction.file-size' = '128MB',"
+ + "'sink.rolling-policy.file-size' = '1b'"
+ + ")");
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ String sql =
+ "insert into sink_table /*+ OPTIONS('sink.parallelism' = '3')
*/"
+ + " select * from my_table";
+ tEnv.executeSql(sql).await();
+ return tEnv.executeSql("select * from sink_table").collect();
+ }
+
+ private static void assertResults(CloseableIterator<Row> iterator, List<Row> expectedRows)
+ throws Exception {
+ List<Row> result = CollectionUtil.iteratorToList(iterator);
+ iterator.close();
+ result.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+ assertThat(result).isEqualTo(expectedRows);
+ }
+
private static TableEnvironment createTableEnv() {
TableEnvironment tableEnv =
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
tableEnv.registerCatalog("hive", hiveCatalog);
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java
index 2abe8a85aec..7fec09e568e 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java
@@ -36,15 +36,20 @@ public class RowColumnReader implements ColumnReader<WritableColumnVector> {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowVector rowVector = (HeapRowVector) vector;
WritableColumnVector[] vectors = rowVector.getFields();
+ // per-row null flags for the row vector, aggregated across all child field vectors
+ boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);
for (int j = 0; j < readNumber; j++) {
- boolean isNull =
- (i == 0)
- ? vectors[i].isNullAt(j)
- : rowVector.isNullAt(j) && vectors[i].isNullAt(j);
- if (isNull) {
+ if (i == 0) {
+ isNulls[j] = vectors[i].isNullAt(j);
+ } else {
+ isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+ }
+ if (i == vectors.length - 1 && isNulls[j]) {
+ // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] are null
rowVector.setNullAt(j);
}
}
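The rewritten loop above keeps one boolean per row position: it is seeded from the first field's null flag, AND-ed with every later field, and the row is marked null only after the last field has been read. A rough standalone sketch of that aggregation (hedged; NullableVector and the method name are hypothetical stand-ins, not the actual Flink types):

    // Hypothetical minimal stand-in for a readable column vector.
    interface NullableVector {
        boolean isNullAt(int row);
    }

    final class RowNullAggregation {
        // Seed the flags from the first field, then AND with every later field;
        // an entry stays true only where all fields are null at that position.
        static boolean[] aggregateRowNulls(NullableVector[] fields, int rowCount) {
            boolean[] isNulls = new boolean[rowCount];
            for (int i = 0; i < fields.length; i++) {
                for (int j = 0; j < rowCount; j++) {
                    isNulls[j] = (i == 0) ? fields[i].isNullAt(j) : isNulls[j] && fields[i].isNullAt(j);
                }
            }
            return isNulls;
        }
    }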
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java
index f29f7194897..ab26fcf85fc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java
@@ -46,4 +46,12 @@ public class HeapRowVector extends AbstractHeapVector
columnarRowData.setRowId(i);
return columnarRowData;
}
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (WritableColumnVector field : fields) {
+ field.reset();
+ }
+ }
}
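The HeapRowVector.reset() override matters because column vectors are reused across batches: without also resetting the child field vectors, null flags and values from a previous batch could leak into the next read. A minimal sketch of the cascading-reset pattern (hedged; the class and field names below are illustrative, not Flink's actual API):

    // Illustrative composite vector: resetting the parent must also reset every child it wraps.
    abstract class CompositeVector {
        private final CompositeVector[] children;

        protected CompositeVector(CompositeVector[] children) {
            this.children = children;
        }

        public void reset() {
            // clear this vector's own state (null flags, values) here ...
            for (CompositeVector child : children) {
                child.reset(); // ... then cascade into every nested child vector
            }
        }
    }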