This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 47f0ad48679 [FLINK-27760][python] Fix the issue that NPE is thrown when executing PyFlink Table API jobs in batch mode
47f0ad48679 is described below

commit 47f0ad48679561b452b14e7d6fe36f7abd53e33b
Author: Dian Fu <[email protected]>
AuthorDate: Wed May 25 20:44:57 2022 +0800

    [FLINK-27760][python] Fix the issue that NPE is thrown when executing PyFlink Table API jobs in batch mode
    
    This closes #19816.
---
 .../streaming/api/operators/python/AbstractPythonFunctionOperator.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 98b85f6db9f..a316ed8e8db 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -258,7 +258,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
     private void processElementsOfCurrentKeyIfNeeded(Object newKey) {
         // process all the elements belonging to the current key when encountering a new key
         // for batch operator
-        if (inBatchExecutionMode(getKeyedStateBackend())
+        if (getKeyedStateStore() != null
+                && inBatchExecutionMode(getKeyedStateBackend())
                 && !Objects.equals(newKey, getCurrentKey())) {
             while (!isBundleFinished()) {
                 try {
