This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 4380185  [FLINK-23765][python] Fix the NPE in Python UDTF
4380185 is described below

commit 4380185253863dbcd4fef7ffde712e64d7316647
Author: huangxingbo <[email protected]>
AuthorDate: Thu Aug 19 11:31:52 2021 +0800

    [FLINK-23765][python] Fix the NPE in Python UDTF
    
    This closes #16890.
---
 .../python/table/PythonTableFunctionOperator.java   | 21 +++++++++++++++++----
 .../table/RowDataPythonTableFunctionOperator.java   | 21 +++++++++++++++++----
 2 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index 44f643c..454b029 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -54,6 +54,15 @@ public class PythonTableFunctionOperator
     /** The TypeSerializer for udtf input elements. */
     private transient TypeSerializer<Row> udtfInputTypeSerializer;
 
+    /** The current input element which has not been received all python udtf 
results. */
+    private transient CRow input;
+
+    /** Whether the current input element has joined parts of python udtf 
results. */
+    private transient boolean hasJoined;
+
+    /** Whether the current received data is the finished result of the 
current input element. */
+    private transient boolean isFinishResult;
+
     public PythonTableFunctionOperator(
             Configuration config,
             PythonFunctionInfo tableFunction,
@@ -79,17 +88,21 @@ public class PythonTableFunctionOperator
                 
PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
         udtfInputTypeSerializer =
                 
PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
+        input = null;
+        hasJoined = false;
+        isFinishResult = true;
     }
 
     @Override
     @SuppressWarnings("ConstantConditions")
     public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
-        CRow input = forwardedInputQueue.poll();
         byte[] rawUdtfResult;
         int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
         Row udtfResult;
+        if (isFinishResult) {
+            input = forwardedInputQueue.poll();
+            hasJoined = false;
+        }
         do {
             rawUdtfResult = resultTuple.f0;
             length = resultTuple.f1;
@@ -109,7 +122,7 @@ public class PythonTableFunctionOperator
                 cRowWrapper.setChange(input.change());
                 cRowWrapper.collect(Row.join(input.row(), udtfResult));
             }
-        } while (!isFinishResult);
+        } while (!isFinishResult && resultTuple != null);
     }
 
     @Override
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
index b61e6ad..a007f99 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
@@ -65,6 +65,15 @@ public class RowDataPythonTableFunctionOperator
     /** The type serializer for the forwarded fields. */
     private transient RowDataSerializer forwardedInputSerializer;
 
+    /** The current input element which has not been received all python udtf 
results. */
+    private transient RowData input;
+
+    /** Whether the current input element has joined parts of python udtf 
results. */
+    private transient boolean hasJoined;
+
+    /** Whether the current received data is the finished result of the 
current input element. */
+    private transient boolean isFinishResult;
+
     public RowDataPythonTableFunctionOperator(
             Configuration config,
             PythonFunctionInfo tableFunction,
@@ -88,6 +97,9 @@ public class RowDataPythonTableFunctionOperator
                 
PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionInputType);
         udtfOutputTypeSerializer =
                 
PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+        input = null;
+        hasJoined = false;
+        isFinishResult = true;
     }
 
     @Override
@@ -125,11 +137,12 @@ public class RowDataPythonTableFunctionOperator
     @Override
     @SuppressWarnings("ConstantConditions")
     public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
-        RowData input = forwardedInputQueue.poll();
         byte[] rawUdtfResult;
         int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
+        if (isFinishResult) {
+            input = forwardedInputQueue.poll();
+            hasJoined = false;
+        }
         do {
             rawUdtfResult = resultTuple.f0;
             length = resultTuple.f1;
@@ -149,6 +162,6 @@ public class RowDataPythonTableFunctionOperator
                 }
                 rowDataWrapper.collect(reuseJoinedRow.replace(input, 
udtfResult));
             }
-        } while (!isFinishResult);
+        } while (!isFinishResult && resultTuple != null);
     }
 }

Reply via email to