This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 6490cd52825 [FLINK-27760][python] Fix the issue that NPE is thrown 
when executing PyFlink Table API jobs in batch mode
6490cd52825 is described below

commit 6490cd52825eeeeaf4325e5bfb902d9dbd604f56
Author: Dian Fu <[email protected]>
AuthorDate: Wed May 25 20:44:57 2022 +0800

    [FLINK-27760][python] Fix the issue that NPE is thrown when executing 
PyFlink Table API jobs in batch mode
    
    This closes #19816.
---
 .../streaming/api/operators/python/AbstractPythonFunctionOperator.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 5324df04f8a..1b53b80998c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -220,7 +220,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
     private void processElementsOfCurrentKeyIfNeeded(Object newKey) {
         // process all the elements belonging to the current key when 
encountering a new key
         // for batch operator
-        if (inBatchExecutionMode(getKeyedStateBackend())
+        if (getKeyedStateStore() != null
+                && inBatchExecutionMode(getKeyedStateBackend())
                 && !Objects.equals(newKey, getCurrentKey())) {
             while (!isBundleFinished()) {
                 try {

Reply via email to