This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8128c04 [FLINK-23765][python] Fix the NPE in Python UDTF
8128c04 is described below
commit 8128c04fc9c44a47d2e383597bf9ddd6cd86eb7b
Author: huangxingbo <[email protected]>
AuthorDate: Mon Aug 16 17:34:18 2021 +0800
[FLINK-23765][python] Fix the NPE in Python UDTF
This closes #16843.
---
.../python/table/PythonTableFunctionOperator.java | 21 +++++++++++++++++----
.../table/PythonTableFunctionOperatorTest.java | 2 --
2 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index 38fbcb8..9ae059b 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -83,6 +83,15 @@ public class PythonTableFunctionOperator
/** The type serializer for the forwarded fields. */
private transient RowDataSerializer forwardedInputSerializer;
+ /** The current input element for which not all Python UDTF results have been received yet. */
+ private transient RowData input;
+
+ /** Whether the current input element has already been joined with some of its Python UDTF results. */
+ private transient boolean hasJoined;
+
+ /** Whether the currently received data is the finish message for the current input element. */
+ private transient boolean isFinishResult;
+
public PythonTableFunctionOperator(
Configuration config,
PythonFunctionInfo tableFunction,
@@ -111,6 +120,9 @@ public class PythonTableFunctionOperator
PythonTypeUtils.toInternalSerializer(userDefinedFunctionInputType);
udtfOutputTypeSerializer =
PythonTypeUtils.toInternalSerializer(userDefinedFunctionOutputType);
+ input = null;
+ hasJoined = false;
+ isFinishResult = true;
}
@Override
@@ -184,11 +196,12 @@ public class PythonTableFunctionOperator
@Override
@SuppressWarnings("ConstantConditions")
public void emitResult(Tuple2<byte[], Integer> resultTuple) throws
Exception {
- RowData input = forwardedInputQueue.poll();
byte[] rawUdtfResult;
int length;
- boolean isFinishResult;
- boolean hasJoined = false;
+ if (isFinishResult) {
+ input = forwardedInputQueue.poll();
+ hasJoined = false;
+ }
do {
rawUdtfResult = resultTuple.f0;
length = resultTuple.f1;
@@ -208,7 +221,7 @@ public class PythonTableFunctionOperator
}
rowDataWrapper.collect(reuseJoinedRow.replace(input,
udtfResult));
}
- } while (!isFinishResult);
+ } while (!isFinishResult && resultTuple != null);
}
/** The received udtf execution result is a finish message when it is a byte with value 0x00. */
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index 96bb3e3..78d1e28 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.calcite.rel.core.JoinRelType;
-import org.junit.Ignore;
import java.util.Collection;
import java.util.HashMap;
@@ -40,7 +39,6 @@ import java.util.HashMap;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
/** Tests for {@link PythonTableFunctionOperator}. */
-@Ignore
public class PythonTableFunctionOperatorTest
extends PythonTableFunctionOperatorTestBase<RowData, RowData> {