This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 24eaac0  [ZEPPELIN-5204]. NPE when runAsOne is true in flink 
interpreter
24eaac0 is described below

commit 24eaac0ea55696890c70d21befcab86ab6413bc1
Author: Jeff Zhang <[email protected]>
AuthorDate: Mon Jan 18 12:56:23 2021 +0800

    [ZEPPELIN-5204]. NPE when runAsOne is true in flink interpreter
    
    ### What is this PR for?
    
    This PR fixes the NPE thrown when `runAsOne` is true in the Flink 
interpreter but the paragraph contains no insert statement. The root cause is 
that `runAsOne` should only take effect when there is at least one insert 
statement.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5204
    
    ### How should this be tested?
    * A test case is added to `FlinkStreamSqlInterpreterTest`.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <[email protected]>
    
    Closes #4039 from zjffdu/ZEPPELIN-5204 and squashes the following commits:
    
    3138cf5e6 [Jeff Zhang] [ZEPPELIN-5204]. NPE when runAsOne is true in flink 
interpreter
    
    (cherry picked from commit c063ebadeab0ec4897b57bc88bc9232a19a84905)
    Signed-off-by: Jeff Zhang <[email protected]>
---
 .../main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 4 +++-
 .../org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java | 9 +++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index e7f6a3b..16f2187 100644
--- 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -129,6 +129,7 @@ public abstract class FlinkSqlInterrpeter extends 
AbstractInterpreter {
       boolean runAsOne = 
Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
       List<String> sqls = 
sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
       boolean isFirstInsert = true;
+      boolean hasInsert = false;
       for (String sql : sqls) {
         Optional<SqlCommandParser.SqlCommandCall> sqlCommand = 
sqlCommandParser.parse(sql);
         if (!sqlCommand.isPresent()) {
@@ -143,6 +144,7 @@ public abstract class FlinkSqlInterrpeter extends 
AbstractInterpreter {
         try {
           if (sqlCommand.get().command == SqlCommand.INSERT_INTO ||
                   sqlCommand.get().command == SqlCommand.INSERT_OVERWRITE) {
+            hasInsert = true;
             if (isFirstInsert && runAsOne) {
               flinkInterpreter.getFlinkShims().startMultipleInsert(tbenv, 
context);
               isFirstInsert = false;
@@ -164,7 +166,7 @@ public abstract class FlinkSqlInterrpeter extends 
AbstractInterpreter {
         }
       }
 
-      if (runAsOne) {
+      if (runAsOne && hasInsert) {
         try {
           lock.lock();
           String jobName = context.getStringLocalProperty("jobName", st);
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index d27f422..fc7c206 100644
--- 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -443,6 +443,15 @@ public class FlinkStreamSqlInterpreterTest extends 
SqlInterpreterTest {
             context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // runAsOne won't affect the select statement.
+    context = getInterpreterContext();
+    context.getLocalProperties().put("runAsOne", "true");
+    context.getLocalProperties().put("type", "update");
+    result = sqlInterpreter.interpret(
+            "select 1",
+            context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
 
   @Test

Reply via email to