This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3b8df4bb20 [HUDI-5206] RowColumnReader should not return null value
for certain null child columns (#7194)
3b8df4bb20 is described below
commit 3b8df4bb204bf37b8b496ff95f30a10a9c3b2c0b
Author: Nicholas Jiang <[email protected]>
AuthorDate: Tue Nov 15 10:00:14 2022 +0800
[HUDI-5206] RowColumnReader should not return null value for certain null
child columns (#7194)
---
.../apache/hudi/table/ITTestHoodieDataSource.java | 26 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestSQL.java | 5 +++++
.../format/cow/vector/HeapRowColumnVector.java | 8 +++++++
.../format/cow/vector/reader/RowColumnReader.java | 14 ++++++++----
.../format/cow/vector/HeapRowColumnVector.java | 8 +++++++
.../format/cow/vector/reader/RowColumnReader.java | 14 ++++++++----
.../format/cow/vector/HeapRowColumnVector.java | 8 +++++++
.../format/cow/vector/reader/RowColumnReader.java | 14 ++++++++----
8 files changed, 85 insertions(+), 12 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 865a717e08..0b383870d6 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1337,6 +1337,32 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, expected);
}
+ @ParameterizedTest
+ @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
+ void testParquetNullChildColumnsRowTypes(String operation) {
+ TableEnvironment tableEnv = batchTableEnv;
+
+ String hoodieTableDDL = sql("t1")
+ .field("f_int int")
+ .field("f_row row(f_row_f0 int, f_row_f1 varchar(10))")
+ .pkField("f_int")
+ .noPartition()
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, operation)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "+I[1, +I[null, abc1]], "
+ + "+I[2, +I[2, null]], "
+ + "+I[3, null]]";
+ assertRowsEquals(result, expected);
+ }
+
@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testBuiltinFunctionWithCatalog(String operation) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index b109fee0ff..531847f3c8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -62,6 +62,11 @@ public class TestSQL {
+ "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3],
row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+ "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3],
row(array['abc3', 'def3'], row(3, 'abc3')))";
+ public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+ + "(1, row(cast(null as int), 'abc1')),\n"
+ + "(2, row(2, cast(null as varchar))),\n"
+ + "(3, row(cast(null as int), cast(null as varchar)))";
+
public static final String INSERT_DATE_PARTITION_T1 = "insert into t1
values\n"
+ "('id1','Danny',23,DATE '1970-01-01'),\n"
+ "('id2','Stephen',33,DATE '1970-01-01'),\n"
diff --git
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index 132b48f139..03da9205d3 100644
---
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -44,4 +44,12 @@ public class HeapRowColumnVector extends AbstractHeapVector
columnarRowData.setRowId(i);
return columnarRowData;
}
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (WritableColumnVector vector : vectors) {
+ vector.reset();
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
index 39ebb90ee6..524c00f402 100644
---
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -41,14 +41,20 @@ public class RowColumnReader implements
ColumnReader<WritableColumnVector> {
public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
+ // row vector null array
+ boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);
for (int j = 0; j < readNumber; j++) {
- boolean isNull = (i == 0)
- ? vectors[i].isNullAt(j)
- : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
- if (isNull) {
+ if (i == 0) {
+ isNulls[j] = vectors[i].isNullAt(j);
+ } else {
+ isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+ }
+ if (i == vectors.length - 1 && isNulls[j]) {
+ // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
+ // null
rowColumnVector.setNullAt(j);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index 0193e6cbb1..53a1eee68c 100644
---
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -43,4 +43,12 @@ public class HeapRowColumnVector extends AbstractHeapVector
columnarRowData.setRowId(i);
return columnarRowData;
}
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (WritableColumnVector vector : vectors) {
+ vector.reset();
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
index 39ebb90ee6..524c00f402 100644
---
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -41,14 +41,20 @@ public class RowColumnReader implements
ColumnReader<WritableColumnVector> {
public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
+ // row vector null array
+ boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);
for (int j = 0; j < readNumber; j++) {
- boolean isNull = (i == 0)
- ? vectors[i].isNullAt(j)
- : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
- if (isNull) {
+ if (i == 0) {
+ isNulls[j] = vectors[i].isNullAt(j);
+ } else {
+ isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+ }
+ if (i == vectors.length - 1 && isNulls[j]) {
+ // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
+ // null
rowColumnVector.setNullAt(j);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index 8d4031251d..ae194e4e6a 100644
---
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -43,4 +43,12 @@ public class HeapRowColumnVector extends AbstractHeapVector
columnarRowData.setRowId(i);
return columnarRowData;
}
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (WritableColumnVector vector : vectors) {
+ vector.reset();
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
index 8d6a8cc52d..79b50487f1 100644
---
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
@@ -41,14 +41,20 @@ public class RowColumnReader implements
ColumnReader<WritableColumnVector> {
public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
+ // row vector null array
+ boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);
for (int j = 0; j < readNumber; j++) {
- boolean isNull = (i == 0)
- ? vectors[i].isNullAt(j)
- : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
- if (isNull) {
+ if (i == 0) {
+ isNulls[j] = vectors[i].isNullAt(j);
+ } else {
+ isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+ }
+ if (i == vectors.length - 1 && isNulls[j]) {
+ // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
+ // null
rowColumnVector.setNullAt(j);
}
}