This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 952cc2b  [ZEPPELIN-5171]. Support flink command in flink interpreter
952cc2b is described below

commit 952cc2bfc3f80f95d486c789194ad35ad1d3d9ef
Author: Jeff Zhang <[email protected]>
AuthorDate: Tue Dec 22 20:13:54 2020 +0800

    [ZEPPELIN-5171]. Support flink command in flink interpreter
    
    ### What is this PR for?
    
    This PR adds a new %flink.cmd sub-interpreter to the flink interpreter group,
    so that users can run Flink jar jobs from Zeppelin via the flink command.
    Internally it delegates to the shell interpreter to execute the flink command.
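    
    A minimal usage sketch (illustrative, not part of this change; it assumes
    FLINK_HOME is set for the interpreter and reuses the WordCount streaming
    example shipped with the Flink distribution): the paragraph body is the
    flink subcommand and its arguments, and the interpreter prepends
    $FLINK_HOME/bin/flink before passing the command to the shell interpreter.
    
        %flink.cmd
        run -c org.apache.flink.streaming.examples.wordcount.WordCount $FLINK_HOME/examples/streaming/WordCount.jar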
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5171
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
![image](https://user-images.githubusercontent.com/164491/103163889-dbef5700-483e-11eb-9626-8712a86491dc.png)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <[email protected]>
    
    Closes #4006 from zjffdu/ZEPPELIN-5171 and squashes the following commits:
    
    ea8d4c0a7 [Jeff Zhang] [ZEPPELIN-5171]. Support flink command in flink interpreter
---
 flink/interpreter/pom.xml                          |   6 ++
 .../apache/zeppelin/flink/FlinkCmdInterpreter.java | 119 +++++++++++++++++++++
 .../src/main/resources/interpreter-setting.json    |  12 +++
 .../apache/zeppelin/shell/ShellInterpreter.java    |   2 +-
 .../zeppelin/integration/FlinkIntegrationTest.java |   4 +
 5 files changed, 142 insertions(+), 1 deletion(-)

diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index 2807552..2770ab3 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -83,6 +83,12 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-shell</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java
new file mode 100644
index 0000000..4558b80
--- /dev/null
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.flink;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.shell.ShellInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class FlinkCmdInterpreter extends ShellInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkCmdInterpreter.class);
+
+  private String flinkHome;
+
+  public FlinkCmdInterpreter(Properties property) {
+    super(property);
+    // Set the timeout to Integer.MAX_VALUE so that the shell process won't time out.
+    setProperty("shell.command.timeout.millisecs", Integer.MAX_VALUE + "");
+    this.flinkHome = properties.getProperty("FLINK_HOME");
+    LOGGER.info("FLINK_HOME: " + flinkHome);
+  }
+
+  @Override
+  public InterpreterResult internalInterpret(String cmd, InterpreterContext context) {
+    String flinkCommand = flinkHome + "/bin/flink " + cmd.trim();
+    LOGGER.info("Flink command: " + flinkCommand);
+    context.out.addInterpreterOutListener(new FlinkCmdOutputListener(context));
+    return super.internalInterpret(flinkCommand, context);
+  }
+
+  /**
+   * InterpreterOutputListener that extracts the Flink UI link from the job submission logs.
+   */
+  private static class FlinkCmdOutputListener implements InterpreterOutputListener {
+
+    private InterpreterContext context;
+    private boolean isFlinkUrlSent = false;
+
+    public FlinkCmdOutputListener(InterpreterContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public void onUpdateAll(InterpreterOutput out) {
+
+    }
+
+    @Override
+    public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+      String text = new String(line);
+      if (isFlinkUrlSent) {
+        return;
+      }
+      if (text.contains("Submitted application")) {
+        // yarn mode, extract yarn proxy url as flink ui link
+        buildFlinkUIInfo(text, context);
+        isFlinkUrlSent = true;
+      }
+    }
+
+    private void buildFlinkUIInfo(String log, InterpreterContext context) {
+      int pos = log.lastIndexOf(" ");
+      if (pos != -1) {
+        String appId = log.substring(pos + 1);
+        try {
+          YarnClient yarnClient = YarnClient.createYarnClient();
+          yarnClient.init(new YarnConfiguration());
+          yarnClient.start();
+
+          ApplicationReport applicationReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
+          Map<String, String> infos = new java.util.HashMap<String, String>();
+          infos.put("jobUrl", applicationReport.getTrackingUrl());
+          infos.put("label", "Flink UI");
+          infos.put("tooltip", "View in Flink web UI");
+          infos.put("noteId", context.getNoteId());
+          infos.put("paraId", context.getParagraphId());
+          context.getIntpEventClient().onParaInfosReceived(infos);
+        } catch (Exception e) {
+          LOGGER.error("Fail to extract flink url", e);
+        }
+      } else {
+        LOGGER.error("Unable to extract flink url from this log: " + log);
+      }
+    }
+
+    @Override
+    public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
+    }
+  }
+}
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
index 0a87ddf..3ce1920 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -276,5 +276,17 @@
       "completionKey": "TAB",
       "completionSupport": true
     }
+  },
+  {
+    "group": "flink",
+    "name": "cmd",
+    "className": "org.apache.zeppelin.flink.FlinkCmdInterpreter",
+    "properties": {
+    },
+    "editor": {
+      "language": "sh",
+      "editOnDblClick": false,
+      "completionSupport": false
+    }
   }
 ]
diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index c6f6496..7cfcd19 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -202,7 +202,7 @@ public class ShellInterpreter extends KerberosInterpreter {
 
   @Override
   protected boolean isKerboseEnabled() {
-    if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty(
+    if (StringUtils.isNotBlank(getProperty("zeppelin.shell.auth.type")) && getProperty(
         "zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) {
       return true;
     }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 44f0f2c..68ee2fa 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -111,6 +111,10 @@ public abstract class FlinkIntegrationTest {
     InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getByName("flink");
     assertEquals(1, flinkInterpreterSetting.getAllInterpreterGroups().size());
     assertNotNull(flinkInterpreterSetting.getAllInterpreterGroups().get(0).getWebUrl());
+
+    Interpreter flinkShellInterpreter = interpreterFactory.getInterpreter("flink.cmd", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("flink").createExecutionContext());
+    interpreterResult = flinkShellInterpreter.interpret("info -c org.apache.flink.streaming.examples.wordcount.WordCount " + flinkHome + "/examples/streaming/WordCount.jar", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
   }
 
   @Test
