This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 28278765f8 [Fix][Flink] Fix NPE when get null row from upstream
transform (#8469)
28278765f8 is described below
commit 28278765f8470c55d50b0078e946494ea6e79288
Author: litiliu <[email protected]>
AuthorDate: Wed Jan 8 22:15:23 2025 +0800
[Fix][Flink] Fix NPE when get null row from upstream transform (#8469)
---
.../flink/execution/TransformExecuteProcessor.java | 22 ++++++-----
.../e2e/transform/TestFilterRowKindIT.java | 4 ++
.../resources/filter_row_to_next_transform.json | 44 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 7582543e2f..615876c417 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -148,15 +149,18 @@ public class TransformExecuteProcessor
}
return stream.transform(
- String.format("%s-Transform", transform.getPluginName()),
- TypeInformation.of(SeaTunnelRow.class),
- new StreamMap<>(
- flinkRuntimeEnvironment
- .getStreamExecutionEnvironment()
- .clean(
- row ->
-
((SeaTunnelMapTransform<SeaTunnelRow>) transform)
- .map(row))));
+ String.format("%s-Transform",
transform.getPluginName()),
+ TypeInformation.of(SeaTunnelRow.class),
+ new StreamMap<>(
+ flinkRuntimeEnvironment
+ .getStreamExecutionEnvironment()
+ .clean(
+ row ->
+
((SeaTunnelMapTransform<SeaTunnelRow>)
+
transform)
+ .map(row))))
+ // null value shouldn't be passed to downstream
+ .filter(Objects::nonNull);
}
public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow,
SeaTunnelRow> {
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
index c4104f734c..d923d7137b 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
@@ -39,6 +39,10 @@ public class TestFilterRowKindIT extends TestSuiteBase {
Container.ExecResult execResult3 =
container.executeJob("/filter_row_kind_include_insert.conf");
Assertions.assertEquals(0, execResult3.getExitCode());
+
+ Container.ExecResult execResult4 =
+ container.executeJob("/filter_row_to_next_transform.json");
+ Assertions.assertEquals(0, execResult4.getExitCode());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json
new file mode 100644
index 0000000000..72819644a9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json
@@ -0,0 +1,44 @@
+{
+ "env": {
+ "jobMode": "batch",
+ "parallelism": 1
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 5,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int",
+ "card": "int"
+ }
+ }
+ }
+ ],
+ "transform": [
+ {
+ "plugin_name": "FilterRowKind",
+ "plugin_input": "fake",
+ "plugin_output": "fake1",
+ "exclude_kinds": ["INSERT"]
+ },
+ {
+ "plugin_name": "Copy",
+ "plugin_input": "fake1",
+ "plugin_output": "fake2",
+ "fields": {
+ "name1": "name",
+ "age1": "age",
+ "card1": "card"
+ }
+ }
+ ],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": "fake2"
+ }
+ ]
+}