This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8128c04  [FLINK-23765][python] Fix the NPE in Python UDTF
8128c04 is described below

commit 8128c04fc9c44a47d2e383597bf9ddd6cd86eb7b
Author: huangxingbo <[email protected]>
AuthorDate: Mon Aug 16 17:34:18 2021 +0800

    [FLINK-23765][python] Fix the NPE in Python UDTF
    
    This closes #16843.
---
 .../python/table/PythonTableFunctionOperator.java   | 21 +++++++++++++++++----
 .../table/PythonTableFunctionOperatorTest.java      |  2 --
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index 38fbcb8..9ae059b 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -83,6 +83,15 @@ public class PythonTableFunctionOperator
     /** The type serializer for the forwarded fields. */
     private transient RowDataSerializer forwardedInputSerializer;
 
+    /** The current input element which has not been received all python udtf results. */
+    private transient RowData input;
+
+    /** Whether the current input element has joined parts of python udtf results. */
+    private transient boolean hasJoined;
+
+    /** Whether the current received data is the finished result of the current input element. */
+    private transient boolean isFinishResult;
+
     public PythonTableFunctionOperator(
             Configuration config,
             PythonFunctionInfo tableFunction,
@@ -111,6 +120,9 @@ public class PythonTableFunctionOperator
                 PythonTypeUtils.toInternalSerializer(userDefinedFunctionInputType);
         udtfOutputTypeSerializer =
                 PythonTypeUtils.toInternalSerializer(userDefinedFunctionOutputType);
+        input = null;
+        hasJoined = false;
+        isFinishResult = true;
     }
 
     @Override
@@ -184,11 +196,12 @@ public class PythonTableFunctionOperator
     @Override
     @SuppressWarnings("ConstantConditions")
     public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        RowData input = forwardedInputQueue.poll();
         byte[] rawUdtfResult;
         int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
+        if (isFinishResult) {
+            input = forwardedInputQueue.poll();
+            hasJoined = false;
+        }
         do {
             rawUdtfResult = resultTuple.f0;
             length = resultTuple.f1;
@@ -208,7 +221,7 @@ public class PythonTableFunctionOperator
                 }
                 rowDataWrapper.collect(reuseJoinedRow.replace(input, udtfResult));
             }
-        } while (!isFinishResult);
+        } while (!isFinishResult && resultTuple != null);
     }
 
     /** The received udtf execution result is a finish message when it is a byte with value 0x00. */
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index 96bb3e3..78d1e28 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.calcite.rel.core.JoinRelType;
-import org.junit.Ignore;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,7 +39,6 @@ import java.util.HashMap;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
 
 /** Tests for {@link PythonTableFunctionOperator}. */
-@Ignore
 public class PythonTableFunctionOperatorTest
         extends PythonTableFunctionOperatorTestBase<RowData, RowData> {
 

Reply via email to