This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f585234f8 [FLINK-29005][parquet] Parquet row type reader should not return a null value when some child fields are null
00f585234f8 is described below

commit 00f585234f8db8fe1e2bfec5c6c323ca99d9b775
Author: Kai Chen <[email protected]>
AuthorDate: Wed Aug 31 12:09:19 2022 +0800

    [FLINK-29005][parquet] Parquet row type reader should not return a null value when some child fields are null
    
    This closes #20616
---
 .../connectors/hive/HiveTableSourceITCase.java     | 119 +++++++++++++++++++++
 .../parquet/vector/reader/RowColumnReader.java     |  15 ++-
 .../data/columnar/vector/heap/HeapRowVector.java   |   8 ++
 3 files changed, 137 insertions(+), 5 deletions(-)
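
For context, a minimal standalone sketch of the null semantics this change enforces for a Parquet row (struct) column: a row position is marked null only when every child field is null at that position, not as soon as the first null child is seen. The class below is illustrative only (it is not part of the commit); it mirrors the corrected logic with plain arrays.

    // RowNullSemantics.java -- illustrative sketch, not part of the commit
    import java.util.Arrays;

    public class RowNullSemantics {
        /** Returns, for each of the readNumber rows, whether the row value itself is null. */
        static boolean[] computeRowNulls(boolean[][] childIsNull, int readNumber) {
            boolean[] isNulls = new boolean[readNumber];
            for (int i = 0; i < childIsNull.length; i++) {
                for (int j = 0; j < readNumber; j++) {
                    // a row is null only if all of its child fields are null at that position
                    isNulls[j] = (i == 0) ? childIsNull[i][j] : isNulls[j] && childIsNull[i][j];
                }
            }
            return isNulls;
        }

        public static void main(String[] args) {
            // row 0: both children null -> row is null; row 1: only one child null -> row is not null
            boolean[][] children = {{true, true}, {true, false}};
            System.out.println(Arrays.toString(computeRowNulls(children, 2))); // prints [true, false]
        }
    }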

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 99540c947c6..f3cb5e1e874 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -18,12 +18,18 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
@@ -73,6 +79,8 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -963,6 +971,117 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         result.getJobClient().get().cancel();
     }
 
+    @Test(timeout = 120000)
+    public void testReadParquetWithNullableComplexType() throws Exception {
+        final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(3);
+        env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+        tEnv.registerCatalog(catalogName, hiveCatalog);
+        tEnv.useCatalog(catalogName);
+
+        List<Row> rows = generateRows();
+        List<Row> expectedRows = generateExpectedRows(rows);
+        DataStream<Row> stream =
+                env.addSource(
+                                new FiniteTestSource<>(rows),
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            Types.INT,
+                                            Types.STRING,
+                                            new RowTypeInfo(
+                                                    new TypeInformation[] {
+                                                        Types.STRING, Types.INT, Types.INT
+                                                    },
+                                                    new String[] {"c1", "c2", "c3"}),
+                                            new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                            Types.OBJECT_ARRAY(Types.STRING),
+                                            Types.STRING
+                                        },
+                                        new String[] {"a", "b", "c", "d", "e", "f"}))
+                        .filter((FilterFunction<Row>) value -> true)
+                        .setParallelism(3); // force multiple parallel tasks
+
+        tEnv.createTemporaryView("my_table", stream);
+        assertResults(executeAndGetResult(tEnv), expectedRows);
+    }
+
+    private static List<Row> generateRows() {
+        List<Row> rows = new ArrayList<>();
+        for (int i = 0; i < 10000; i++) {
+            Map<String, String> e = new HashMap<>();
+            e.put(i + "", i % 2 == 0 ? null : i + "");
+            String[] f = new String[2];
+            f[0] = i % 3 == 0 ? null : i + "";
+            f[1] = i % 3 == 2 ? null : i + "";
+            rows.add(
+                    Row.of(
+                            i,
+                            String.valueOf(i % 10),
+                            Row.of(
+                                    i % 2 == 0 ? null : String.valueOf(i % 10),
+                                    i % 3 == 0 ? null : i % 10,
+                                    i % 5 == 0 ? null : i % 10),
+                            e,
+                            f,
+                            String.valueOf(i % 10)));
+        }
+        return rows;
+    }
+
+    private static List<Row> generateExpectedRows(List<Row> rows) {
+        List<Row> sortedRows = new ArrayList<>();
+        sortedRows.addAll(rows);
+        sortedRows.addAll(rows);
+        sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+        List<Row> expectedRows = new ArrayList<>();
+        for (int i = 0; i < sortedRows.size(); i++) {
+            Row rowExpect = Row.copy(sortedRows.get(i));
+            Row nestedRow = (Row) rowExpect.getField(2);
+            if (nestedRow.getField(0) == null
+                    && nestedRow.getField(1) == null
+                    && nestedRow.getField(2) == null) {
+                rowExpect.setField(2, null);
+            }
+            expectedRows.add(rowExpect);
+        }
+        return expectedRows;
+    }
+
+    private static CloseableIterator<Row> executeAndGetResult(StreamTableEnvironment tEnv)
+            throws Exception {
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql(
+                "CREATE TABLE sink_table (a int, b string,"
+                        + "c struct<c1:string, c2:int, c3:int>,"
+                        + "d map<string, string>, e array<string>, f string "
+                        + ") "
+                        + " stored as parquet"
+                        + " TBLPROPERTIES ("
+                        + "'sink.partition-commit.policy.kind'='metastore,success-file',"
+                        + "'auto-compaction'='true',"
+                        + "'compaction.file-size' = '128MB',"
+                        + "'sink.rolling-policy.file-size' = '1b'"
+                        + ")");
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        String sql =
+                "insert into sink_table /*+ OPTIONS('sink.parallelism' = '3') */"
+                        + " select * from my_table";
+        tEnv.executeSql(sql).await();
+        return tEnv.executeSql("select * from sink_table").collect();
+    }
+
+    private static void assertResults(CloseableIterator<Row> iterator, List<Row> expectedRows)
+            throws Exception {
+        List<Row> result = CollectionUtil.iteratorToList(iterator);
+        iterator.close();
+        result.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+        assertThat(result).isEqualTo(expectedRows);
+    }
+
     private static TableEnvironment createTableEnv() {
         TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
         tableEnv.registerCatalog("hive", hiveCatalog);
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java
index 2abe8a85aec..7fec09e568e 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RowColumnReader.java
@@ -36,15 +36,20 @@ public class RowColumnReader implements ColumnReader<WritableColumnVector> {
     public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
         HeapRowVector rowVector = (HeapRowVector) vector;
         WritableColumnVector[] vectors = rowVector.getFields();
+        // per-row null flags: true while every child field read so far is null at that row
+        boolean[] isNulls = new boolean[readNumber];
         for (int i = 0; i < vectors.length; i++) {
             fieldReaders.get(i).readToVector(readNumber, vectors[i]);
 
             for (int j = 0; j < readNumber; j++) {
-                boolean isNull =
-                        (i == 0)
-                                ? vectors[i].isNullAt(j)
-                                : rowVector.isNullAt(j) && vectors[i].isNullAt(j);
-                if (isNull) {
+                if (i == 0) {
+                    isNulls[j] = vectors[i].isNullAt(j);
+                } else {
+                    isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+                }
+                if (i == vectors.length - 1 && isNulls[j]) {
+                    // rowColumnVector[j] is null only when all child fields of
+                    // rowColumnVector[j] are null at position j
                     rowVector.setNullAt(j);
                 }
             }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java
index f29f7194897..ab26fcf85fc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java
@@ -46,4 +46,12 @@ public class HeapRowVector extends AbstractHeapVector
         columnarRowData.setRowId(i);
         return columnarRowData;
     }
+
+    @Override
+    public void reset() {
+        super.reset();
+        for (WritableColumnVector field : fields) {
+            field.reset();
+        }
+    }
 }
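
A brief reading of the HeapRowVector change above (my note, not part of the commit): without the override, reset() cleared only the row vector's own state, so null flags and values written into the child field vectors while reading one batch could leak into the next batch when the vector is reused. A hedged usage sketch follows; the constructor signatures HeapIntVector(int len) and HeapRowVector(int len, WritableColumnVector[] fields) and the import paths are my assumptions, not taken from this commit.

    // ResetSketch.java -- illustrative sketch only, assumed constructors and imports
    import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
    import org.apache.flink.table.data.columnar.vector.heap.HeapRowVector;
    import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;

    public class ResetSketch {
        public static void main(String[] args) {
            HeapIntVector child = new HeapIntVector(4);
            HeapRowVector row = new HeapRowVector(4, new WritableColumnVector[] {child});
            child.setNullAt(0);
            row.setNullAt(0);
            row.reset();
            // with the overridden reset(), the child's null flag is cleared as well,
            // so stale nulls from a previous batch cannot surface in the next read
            System.out.println(child.isNullAt(0)); // expected: false
        }
    }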
