This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 47f0ad48679 [FLINK-27760][python] Fix the issue that NPE is thrown
when executing PyFlink Table API jobs in batch mode
47f0ad48679 is described below
commit 47f0ad48679561b452b14e7d6fe36f7abd53e33b
Author: Dian Fu <[email protected]>
AuthorDate: Wed May 25 20:44:57 2022 +0800
[FLINK-27760][python] Fix the issue that NPE is thrown when executing
PyFlink Table API jobs in batch mode
This closes #19816.
---
.../streaming/api/operators/python/AbstractPythonFunctionOperator.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 98b85f6db9f..a316ed8e8db 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -258,7 +258,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
private void processElementsOfCurrentKeyIfNeeded(Object newKey) {
// process all the elements belonging to the current key when encountering a new key
// for batch operator
- if (inBatchExecutionMode(getKeyedStateBackend())
+ if (getKeyedStateStore() != null
+ && inBatchExecutionMode(getKeyedStateBackend())
&& !Objects.equals(newKey, getCurrentKey())) {
while (!isBundleFinished()) {
try {