This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 28278765f8 [Fix][Flink] Fix NPE when get null row from upstream 
transform (#8469)
28278765f8 is described below

commit 28278765f8470c55d50b0078e946494ea6e79288
Author: litiliu <[email protected]>
AuthorDate: Wed Jan 8 22:15:23 2025 +0800

    [Fix][Flink] Fix NPE when get null row from upstream transform (#8469)
---
 .../flink/execution/TransformExecuteProcessor.java | 22 ++++++-----
 .../e2e/transform/TestFilterRowKindIT.java         |  4 ++
 .../resources/filter_row_to_next_transform.json    | 44 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 7582543e2f..615876c417 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -148,15 +149,18 @@ public class TransformExecuteProcessor
         }
 
         return stream.transform(
-                String.format("%s-Transform", transform.getPluginName()),
-                TypeInformation.of(SeaTunnelRow.class),
-                new StreamMap<>(
-                        flinkRuntimeEnvironment
-                                .getStreamExecutionEnvironment()
-                                .clean(
-                                        row ->
-                                                
((SeaTunnelMapTransform<SeaTunnelRow>) transform)
-                                                        .map(row))));
+                        String.format("%s-Transform", 
transform.getPluginName()),
+                        TypeInformation.of(SeaTunnelRow.class),
+                        new StreamMap<>(
+                                flinkRuntimeEnvironment
+                                        .getStreamExecutionEnvironment()
+                                        .clean(
+                                                row ->
+                                                        
((SeaTunnelMapTransform<SeaTunnelRow>)
+                                                                        
transform)
+                                                                .map(row))))
+                // null value shouldn't be passed to downstream
+                .filter(Objects::nonNull);
     }
 
     public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
index c4104f734c..d923d7137b 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java
@@ -39,6 +39,10 @@ public class TestFilterRowKindIT extends TestSuiteBase {
         Container.ExecResult execResult3 =
                 container.executeJob("/filter_row_kind_include_insert.conf");
         Assertions.assertEquals(0, execResult3.getExitCode());
+
+        Container.ExecResult execResult4 =
+                container.executeJob("/filter_row_to_next_transform.json");
+        Assertions.assertEquals(0, execResult4.getExitCode());
     }
 
     @TestTemplate
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json
new file mode 100644
index 0000000000..72819644a9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json
@@ -0,0 +1,44 @@
+{
+  "env": {
+    "jobMode": "batch",
+    "parallelism": 1
+  },
+  "source": [
+    {
+      "plugin_name": "FakeSource",
+      "plugin_output": "fake",
+      "row.num": 5,
+      "schema": {
+        "fields": {
+          "name": "string",
+          "age": "int",
+          "card": "int"
+        }
+      }
+    }
+  ],
+  "transform": [
+    {
+      "plugin_name": "FilterRowKind",
+      "plugin_input": "fake",
+      "plugin_output": "fake1",
+      "exclude_kinds": ["INSERT"]
+    },
+    {
+      "plugin_name": "Copy",
+      "plugin_input": "fake1",
+      "plugin_output": "fake2",
+      "fields": {
+        "name1": "name",
+        "age1": "age",
+        "card1": "card"
+      }
+    }
+  ],
+  "sink": [
+    {
+      "plugin_name": "Console",
+      "plugin_input": "fake2"
+    }
+  ]
+}

Reply via email to