This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 4380185 [FLINK-23765][python] Fix the NPE in Python UDTF
4380185 is described below
commit 4380185253863dbcd4fef7ffde712e64d7316647
Author: huangxingbo <[email protected]>
AuthorDate: Thu Aug 19 11:31:52 2021 +0800
[FLINK-23765][python] Fix the NPE in Python UDTF
This closes #16890.
---
.../python/table/PythonTableFunctionOperator.java | 21 +++++++++++++++++----
.../table/RowDataPythonTableFunctionOperator.java | 21 +++++++++++++++++----
2 files changed, 34 insertions(+), 8 deletions(-)
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index 44f643c..454b029 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -54,6 +54,15 @@ public class PythonTableFunctionOperator
/** The TypeSerializer for udtf input elements. */
private transient TypeSerializer<Row> udtfInputTypeSerializer;
+ /** The current input element for which not all Python UDTF results have
been received yet. */
+ private transient CRow input;
+
+ /** Whether the current input element has already been joined with some of
its Python UDTF results. */
+ private transient boolean hasJoined;
+
+ /** Whether the most recently received data is the final result for the
current input element. */
+ private transient boolean isFinishResult;
+
public PythonTableFunctionOperator(
Configuration config,
PythonFunctionInfo tableFunction,
@@ -79,17 +88,21 @@ public class PythonTableFunctionOperator
PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
udtfInputTypeSerializer =
PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
+ input = null;
+ hasJoined = false;
+ isFinishResult = true;
}
@Override
@SuppressWarnings("ConstantConditions")
public void emitResult(Tuple2<byte[], Integer> resultTuple) throws
Exception {
- CRow input = forwardedInputQueue.poll();
byte[] rawUdtfResult;
int length;
- boolean isFinishResult;
- boolean hasJoined = false;
Row udtfResult;
+ if (isFinishResult) {
+ input = forwardedInputQueue.poll();
+ hasJoined = false;
+ }
do {
rawUdtfResult = resultTuple.f0;
length = resultTuple.f1;
@@ -109,7 +122,7 @@ public class PythonTableFunctionOperator
cRowWrapper.setChange(input.change());
cRowWrapper.collect(Row.join(input.row(), udtfResult));
}
- } while (!isFinishResult);
+ } while (!isFinishResult && resultTuple != null);
}
@Override
diff --git
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
index b61e6ad..a007f99 100644
---
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/RowDataPythonTableFunctionOperator.java
@@ -65,6 +65,15 @@ public class RowDataPythonTableFunctionOperator
/** The type serializer for the forwarded fields. */
private transient RowDataSerializer forwardedInputSerializer;
+ /** The current input element for which not all Python UDTF results have
been received yet. */
+ private transient RowData input;
+
+ /** Whether the current input element has already been joined with some of
its Python UDTF results. */
+ private transient boolean hasJoined;
+
+ /** Whether the most recently received data is the final result for the
current input element. */
+ private transient boolean isFinishResult;
+
public RowDataPythonTableFunctionOperator(
Configuration config,
PythonFunctionInfo tableFunction,
@@ -88,6 +97,9 @@ public class RowDataPythonTableFunctionOperator
PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionInputType);
udtfOutputTypeSerializer =
PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+ input = null;
+ hasJoined = false;
+ isFinishResult = true;
}
@Override
@@ -125,11 +137,12 @@ public class RowDataPythonTableFunctionOperator
@Override
@SuppressWarnings("ConstantConditions")
public void emitResult(Tuple2<byte[], Integer> resultTuple) throws
Exception {
- RowData input = forwardedInputQueue.poll();
byte[] rawUdtfResult;
int length;
- boolean isFinishResult;
- boolean hasJoined = false;
+ if (isFinishResult) {
+ input = forwardedInputQueue.poll();
+ hasJoined = false;
+ }
do {
rawUdtfResult = resultTuple.f0;
length = resultTuple.f1;
@@ -149,6 +162,6 @@ public class RowDataPythonTableFunctionOperator
}
rowDataWrapper.collect(reuseJoinedRow.replace(input,
udtfResult));
}
- } while (!isFinishResult);
+ } while (!isFinishResult && resultTuple != null);
}
}