This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new e51f3eb [ZEPPELIN-5020]. Unable create temporary view in flink
e51f3eb is described below
commit e51f3ebf7c491663247ed31a2bf84861e1174bbf
Author: Jeff Zhang <[email protected]>
AuthorDate: Thu Aug 27 19:28:26 2020 +0800
[ZEPPELIN-5020]. Unable create temporary view in flink
### What is this PR for?
This is due to an API change in Flink 1.11. This PR fixes the issue and also
adds a new test for this case.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5020
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <[email protected]>
Closes #3894 from zjffdu/ZEPPELIN-5020 and squashes the following commits:
e9551bfee [Jeff Zhang] [ZEPPELIN-5020]. Unable create temporary view in
flink
---
.../java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 10 +++++++---
.../java/org/apache/zeppelin/flink/SqlInterpreterTest.java | 11 +++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 81a0bce..2e1fe71 100644
---
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -264,7 +264,7 @@ public abstract class FlinkSqlInterrpeter extends
Interpreter {
callDropTable(cmdCall.operands[0], context);
break;
case CREATE_VIEW:
- callCreateView(cmdCall.operands[0], cmdCall.operands[1], context);
+ callCreateView(cmdCall, context);
break;
case DROP_VIEW:
callDropView(cmdCall.operands[0], context);
@@ -337,10 +337,14 @@ public abstract class FlinkSqlInterrpeter extends
Interpreter {
context.out.write("View has been dropped.\n");
}
- private void callCreateView(String name, String query, InterpreterContext
context) throws IOException {
+ private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand,
InterpreterContext context) throws IOException {
try {
lock.lock();
- this.tbenv.createTemporaryView(name, tbenv.sqlQuery(query));
+ if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+ this.tbenv.createTemporaryView(sqlCommand.operands[0],
tbenv.sqlQuery(sqlCommand.operands[1]));
+ } else {
+ flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
+ }
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
diff --git
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 0b621ce..c978533 100644
---
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -352,6 +352,17 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals("View has been dropped.\n", resultMessages.get(0).getData());
+
+ // create temporary view
+ if (!flinkInterpreter.getFlinkVersion().isFlink110()) {
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("create temporary view my_temp_view as
select int_col from source_table", context);
+ assertEquals(result.toString(), Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(1, resultMessages.size());
+ assertEquals(Type.TEXT, resultMessages.get(0).getType());
+ assertEquals("View has been created.\n",
resultMessages.get(0).getData());
+ }
}
@Test