This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new c063eba [ZEPPELIN-5204]. NPE when runAsOne is true in flink
interpreter
c063eba is described below
commit c063ebadeab0ec4897b57bc88bc9232a19a84905
Author: Jeff Zhang <[email protected]>
AuthorDate: Mon Jan 18 12:56:23 2021 +0800
[ZEPPELIN-5204]. NPE when runAsOne is true in flink interpreter
### What is this PR for?
This PR fixes the NPE that occurs when `runAsOne` is true in the Flink interpreter. The
root cause is that the `runAsOne` handling ran even without an insert statement; it should
only take effect when there is at least one insert statement.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5204
### How should this be tested?
* Test is added
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <[email protected]>
Closes #4039 from zjffdu/ZEPPELIN-5204 and squashes the following commits:
3138cf5e6 [Jeff Zhang] [ZEPPELIN-5204]. NPE when runAsOne is true in flink
interpreter
---
.../main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 4 +++-
.../org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java | 9 +++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index e7f6a3b..16f2187 100644
---
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -129,6 +129,7 @@ public abstract class FlinkSqlInterrpeter extends
AbstractInterpreter {
boolean runAsOne =
Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
List<String> sqls =
sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
boolean isFirstInsert = true;
+ boolean hasInsert = false;
for (String sql : sqls) {
Optional<SqlCommandParser.SqlCommandCall> sqlCommand =
sqlCommandParser.parse(sql);
if (!sqlCommand.isPresent()) {
@@ -143,6 +144,7 @@ public abstract class FlinkSqlInterrpeter extends
AbstractInterpreter {
try {
if (sqlCommand.get().command == SqlCommand.INSERT_INTO ||
sqlCommand.get().command == SqlCommand.INSERT_OVERWRITE) {
+ hasInsert = true;
if (isFirstInsert && runAsOne) {
flinkInterpreter.getFlinkShims().startMultipleInsert(tbenv,
context);
isFirstInsert = false;
@@ -164,7 +166,7 @@ public abstract class FlinkSqlInterrpeter extends
AbstractInterpreter {
}
}
- if (runAsOne) {
+ if (runAsOne && hasInsert) {
try {
lock.lock();
String jobName = context.getStringLocalProperty("jobName", st);
diff --git
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index d27f422..fc7c206 100644
---
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -443,6 +443,15 @@ public class FlinkStreamSqlInterpreterTest extends
SqlInterpreterTest {
context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // runAsOne won't affect the select statement.
+ context = getInterpreterContext();
+ context.getLocalProperties().put("runAsOne", "true");
+ context.getLocalProperties().put("type", "update");
+ result = sqlInterpreter.interpret(
+ "select 1",
+ context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
@Test