This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 6490cd52825 [FLINK-27760][python] Fix the issue that NPE is thrown
when executing PyFlink Table API jobs in batch mode
6490cd52825 is described below
commit 6490cd52825eeeeaf4325e5bfb902d9dbd604f56
Author: Dian Fu <[email protected]>
AuthorDate: Wed May 25 20:44:57 2022 +0800
[FLINK-27760][python] Fix the issue that NPE is thrown when executing
PyFlink Table API jobs in batch mode
This closes #19816.
---
.../streaming/api/operators/python/AbstractPythonFunctionOperator.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 5324df04f8a..1b53b80998c 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -220,7 +220,8 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
private void processElementsOfCurrentKeyIfNeeded(Object newKey) {
// process all the elements belonging to the current key when
encountering a new key
// for batch operator
- if (inBatchExecutionMode(getKeyedStateBackend())
+ if (getKeyedStateStore() != null
+ && inBatchExecutionMode(getKeyedStateBackend())
&& !Objects.equals(newKey, getCurrentKey())) {
while (!isBundleFinished()) {
try {