This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new e51f3eb  [ZEPPELIN-5020]. Unable create temporary view in flink
e51f3eb is described below

commit e51f3ebf7c491663247ed31a2bf84861e1174bbf
Author: Jeff Zhang <[email protected]>
AuthorDate: Thu Aug 27 19:28:26 2020 +0800

    [ZEPPELIN-5020]. Unable create temporary view in flink
    
    ### What is this PR for?
    
    This is due to an API change in Flink 1.11. This PR fixes the issue and also 
adds a new test for this case.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5020
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <[email protected]>
    
    Closes #3894 from zjffdu/ZEPPELIN-5020 and squashes the following commits:
    
    e9551bfee [Jeff Zhang] [ZEPPELIN-5020]. Unable create temporary view in 
flink
---
 .../java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java   | 10 +++++++---
 .../java/org/apache/zeppelin/flink/SqlInterpreterTest.java    | 11 +++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 81a0bce..2e1fe71 100644
--- 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -264,7 +264,7 @@ public abstract class FlinkSqlInterrpeter extends 
Interpreter {
         callDropTable(cmdCall.operands[0], context);
         break;
       case CREATE_VIEW:
-        callCreateView(cmdCall.operands[0], cmdCall.operands[1], context);
+        callCreateView(cmdCall, context);
         break;
       case DROP_VIEW:
         callDropView(cmdCall.operands[0], context);
@@ -337,10 +337,14 @@ public abstract class FlinkSqlInterrpeter extends 
Interpreter {
     context.out.write("View has been dropped.\n");
   }
 
-  private void callCreateView(String name, String query, InterpreterContext 
context) throws IOException {
+  private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, 
InterpreterContext context) throws IOException {
     try {
       lock.lock();
-      this.tbenv.createTemporaryView(name, tbenv.sqlQuery(query));
+      if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+        this.tbenv.createTemporaryView(sqlCommand.operands[0], 
tbenv.sqlQuery(sqlCommand.operands[1]));
+      } else {
+        flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
+      }
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 0b621ce..c978533 100644
--- 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -352,6 +352,17 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertEquals("View has been dropped.\n", resultMessages.get(0).getData());
+
+    // create temporary view
+    if (!flinkInterpreter.getFlinkVersion().isFlink110()) {
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret("create temporary view my_temp_view as 
select int_col from source_table", context);
+      assertEquals(result.toString(), Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertEquals(1, resultMessages.size());
+      assertEquals(Type.TEXT, resultMessages.get(0).getType());
+      assertEquals("View has been created.\n", 
resultMessages.get(0).getData());
+    }
   }
 
   @Test

Reply via email to