This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b8df4bb20 [HUDI-5206] RowColumnReader should not return null value for certain null child columns (#7194)
3b8df4bb20 is described below

commit 3b8df4bb204bf37b8b496ff95f30a10a9c3b2c0b
Author: Nicholas Jiang <[email protected]>
AuthorDate: Tue Nov 15 10:00:14 2022 +0800

    [HUDI-5206] RowColumnReader should not return null value for certain null child columns (#7194)
---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 26 ++++++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestSQL.java   |  5 +++++
 .../format/cow/vector/HeapRowColumnVector.java     |  8 +++++++
 .../format/cow/vector/reader/RowColumnReader.java  | 14 ++++++++----
 .../format/cow/vector/HeapRowColumnVector.java     |  8 +++++++
 .../format/cow/vector/reader/RowColumnReader.java  | 14 ++++++++----
 .../format/cow/vector/HeapRowColumnVector.java     |  8 +++++++
 .../format/cow/vector/reader/RowColumnReader.java  | 14 ++++++++----
 8 files changed, 85 insertions(+), 12 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 865a717e08..0b383870d6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1337,6 +1337,32 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, expected);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+  void testParquetNullChildColumnsRowTypes(String operation) {
+    TableEnvironment tableEnv = batchTableEnv;
+
+    String hoodieTableDDL = sql("t1")
+        .field("f_int int")
+        .field("f_row row(f_row_f0 int, f_row_f1 varchar(10))")
+        .pkField("f_int")
+        .noPartition()
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, operation)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "+I[1, +I[null, abc1]], "
+        + "+I[2, +I[2, null]], "
+        + "+I[3, null]]";
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
   void testBuiltinFunctionWithCatalog(String operation) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index b109fee0ff..531847f3c8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -62,6 +62,11 @@ public class TestSQL {
       + "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], 
row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
       + "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], 
row(array['abc3', 'def3'], row(3, 'abc3')))";
 
+  public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert 
into t1 values\n"
+      + "(1, row(cast(null as int), 'abc1')),\n"
+      + "(2, row(2, cast(null as varchar))),\n"
+      + "(3, row(cast(null as int), cast(null as varchar)))";
+
   public static final String INSERT_DATE_PARTITION_T1 = "insert into t1 
values\n"
       + "('id1','Danny',23,DATE '1970-01-01'),\n"
       + "('id2','Stephen',33,DATE '1970-01-01'),\n"
diff --git 
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index 132b48f139..03da9205d3 100644
--- 
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -44,4 +44,12 @@ public class HeapRowColumnVector extends AbstractHeapVector
     columnarRowData.setRowId(i);
     return columnarRowData;
   }
+
+  @Override
+  public void reset() {
+    super.reset();
+    for (WritableColumnVector vector : vectors) {
+      vector.reset();
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
index 39ebb90ee6..524c00f402 100644
--- 
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++ 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -41,14 +41,20 @@ public class RowColumnReader implements 
ColumnReader<WritableColumnVector> {
   public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
     HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
     WritableColumnVector[] vectors = rowColumnVector.vectors;
+    // row vector null array
+    boolean[] isNulls = new boolean[readNumber];
     for (int i = 0; i < vectors.length; i++) {
       fieldReaders.get(i).readToVector(readNumber, vectors[i]);
 
       for (int j = 0; j < readNumber; j++) {
-        boolean isNull = (i == 0)
-            ? vectors[i].isNullAt(j)
-            : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
-        if (isNull) {
+        if (i == 0) {
+          isNulls[j] = vectors[i].isNullAt(j);
+        } else {
+          isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+        }
+        if (i == vectors.length - 1 && isNulls[j]) {
+          // rowColumnVector[j] is null only when all fields[j] of 
rowColumnVector[j] is
+          // null
           rowColumnVector.setNullAt(j);
         }
       }
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index 0193e6cbb1..53a1eee68c 100644
--- 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -43,4 +43,12 @@ public class HeapRowColumnVector extends AbstractHeapVector
     columnarRowData.setRowId(i);
     return columnarRowData;
   }
+
+  @Override
+  public void reset() {
+    super.reset();
+    for (WritableColumnVector vector : vectors) {
+      vector.reset();
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
index 39ebb90ee6..524c00f402 100644
--- 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -41,14 +41,20 @@ public class RowColumnReader implements 
ColumnReader<WritableColumnVector> {
   public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
     HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
     WritableColumnVector[] vectors = rowColumnVector.vectors;
+    // row vector null array
+    boolean[] isNulls = new boolean[readNumber];
     for (int i = 0; i < vectors.length; i++) {
       fieldReaders.get(i).readToVector(readNumber, vectors[i]);
 
       for (int j = 0; j < readNumber; j++) {
-        boolean isNull = (i == 0)
-            ? vectors[i].isNullAt(j)
-            : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
-        if (isNull) {
+        if (i == 0) {
+          isNulls[j] = vectors[i].isNullAt(j);
+        } else {
+          isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+        }
+        if (i == vectors.length - 1 && isNulls[j]) {
+          // rowColumnVector[j] is null only when all fields[j] of 
rowColumnVector[j] is
+          // null
           rowColumnVector.setNullAt(j);
         }
       }
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index 8d4031251d..ae194e4e6a 100644
--- 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -43,4 +43,12 @@ public class HeapRowColumnVector extends AbstractHeapVector
     columnarRowData.setRowId(i);
     return columnarRowData;
   }
+
+  @Override
+  public void reset() {
+    super.reset();
+    for (WritableColumnVector vector : vectors) {
+      vector.reset();
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
index 8d6a8cc52d..79b50487f1 100644
--- 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -41,14 +41,20 @@ public class RowColumnReader implements 
ColumnReader<WritableColumnVector> {
   public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
     HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
     WritableColumnVector[] vectors = rowColumnVector.vectors;
+    // row vector null array
+    boolean[] isNulls = new boolean[readNumber];
     for (int i = 0; i < vectors.length; i++) {
       fieldReaders.get(i).readToVector(readNumber, vectors[i]);
 
       for (int j = 0; j < readNumber; j++) {
-        boolean isNull = (i == 0)
-            ? vectors[i].isNullAt(j)
-            : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
-        if (isNull) {
+        if (i == 0) {
+          isNulls[j] = vectors[i].isNullAt(j);
+        } else {
+          isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+        }
+        if (i == vectors.length - 1 && isNulls[j]) {
+          // rowColumnVector[j] is null only when all fields[j] of 
rowColumnVector[j] is
+          // null
           rowColumnVector.setNullAt(j);
         }
       }

Reply via email to