This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 086b762318a [HUDI-7169][TEST] Add test case with the partition value
is null (#10279)
086b762318a is described below
commit 086b762318a01e39feb4e73a82b2444d325da0d0
Author: hehuiyuan <[email protected]>
AuthorDate: Sat Dec 9 11:32:27 2023 +0800
[HUDI-7169][TEST] Add test case with the partition value is null (#10279)
---
.../hudi/source/TestIncrementalInputSplits.java | 19 +++++++++++++++++--
.../src/test/java/org/apache/hudi/utils/TestData.java | 5 +++++
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 92766186065..29a0326ea41 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -38,6 +38,7 @@ import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.hadoop.fs.FileStatus;
@@ -182,7 +183,10 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
List<String> expectedPartitions) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.set(FlinkOptions.READ_AS_STREAMING, true);
- TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ List<RowData> testData = new ArrayList<>();
+
testData.addAll(TestData.DATA_SET_INSERT.stream().collect(Collectors.toList()));
+
testData.addAll(TestData.DATA_SET_INSERT_PARTITION_IS_NULL.stream().collect(Collectors.toList()));
+ TestData.writeData(testData, conf);
PartitionPruners.PartitionPruner partitionPruner =
PartitionPruners.getInstance(
Collections.singletonList(partitionEvaluator),
Collections.singletonList("partition"),
@@ -300,11 +304,22 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
ExpressionEvaluators.In in = ExpressionEvaluators.In.getInstance();
in.bindFieldReference(partitionFieldRef);
in.bindVals("par1", "par4");
+
+ // `partition` is not null
+ ExpressionEvaluators.IsNotNull isNotNull =
ExpressionEvaluators.IsNotNull.getInstance();
+ isNotNull.bindFieldReference(partitionFieldRef);
+
+ // `partition` is null
+ ExpressionEvaluators.IsNull isNull =
ExpressionEvaluators.IsNull.getInstance();
+ isNull.bindFieldReference(partitionFieldRef);
+
Object[][] data = new Object[][] {
{notEqualTo, Arrays.asList("par1", "par2", "par4")},
{greaterThan, Arrays.asList("par2", "par3", "par4")},
{and, Arrays.asList("par2", "par4")},
- {in, Arrays.asList("par1", "par4")}};
+ {in, Arrays.asList("par1", "par4")},
+ {isNotNull, Arrays.asList("par1", "par2", "par3", "par4")},
+ {isNull,
Arrays.asList(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)}};
return Stream.of(data).map(Arguments::of);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 732065c0a3c..634e980bcf5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -119,6 +119,11 @@ public class TestData {
TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
);
+ public static List<RowData> DATA_SET_INSERT_PARTITION_IS_NULL =
Arrays.asList(
+ insertRow(StringData.fromString("idNull"), StringData.fromString("He"),
30,
+ TimestampData.fromEpochMillis(9), null)
+ );
+
public static List<RowData> DATA_SET_UPDATE_INSERT = Arrays.asList(
// advance the age by 1
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,