Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 0868dcbd9 -> c46d8a010


ZEPPELIN-145 Upgrade Flink version to 0.9.0

https://issues.apache.org/jira/browse/ZEPPELIN-145

Upgrade the Flink version to 0.9.0.

Author: Lee moon soo <[email protected]>

Closes #142 from Leemoonsoo/ZEPPELIN-145 and squashes the following commits:

c60673c [Lee moon soo] Fix test
9097e04 [Lee moon soo] Change code to use FlinkILoop that is introduced in 0.9.0
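
In short, the hand-rolled FlinkIMain, FlinkEnvironment and JarHelper classes are
replaced by the FlinkILoop that ships with flink-scala-shell in 0.9.0. The snippet
below is a minimal sketch of the new wiring, distilled from the diff that follows;
the "localhost"/6123 values and the wrapper class/main method are illustrative
scaffolding, not defaults taken from the code.

    import java.io.BufferedReader;
    import java.io.ByteArrayOutputStream;
    import java.io.PrintWriter;

    import org.apache.flink.api.scala.FlinkILoop;
    import scala.tools.nsc.Settings;
    import scala.tools.nsc.interpreter.IMain;
    import scala.tools.nsc.settings.MutableSettings.BooleanSetting;

    public class FlinkILoopSketch {
      public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // FlinkILoop (flink-scala-shell 0.9.0) owns the Scala REPL and the
        // ExecutionEnvironment pointing at the given JobManager.
        FlinkILoop flinkIloop =
            new FlinkILoop("localhost", 6123, (BufferedReader) null, new PrintWriter(out));

        // Reuse the JVM classpath for the REPL, as FlinkInterpreter.createSettings() does.
        Settings settings = new Settings();
        BooleanSetting useJavaCp = (BooleanSetting) settings.usejavacp();
        useJavaCp.v_$eq(true);
        flinkIloop.settings_$eq(settings);
        flinkIloop.createInterpreter();

        // The embedded IMain replaces the old custom FlinkIMain.
        IMain imain = flinkIloop.intp();
        imain.interpret("import org.apache.flink.api.scala._");

        // Expose the loop's ExecutionEnvironment as "env"; no custom
        // FlinkEnvironment or jar packaging (JarHelper) is needed anymore.
        imain.bindValue("env", flinkIloop.scalaEnv());
        imain.interpret("env.fromElements(1, 2, 3).print()");

        flinkIloop.closeInterpreter();
      }
    }

In 0.9.0 print() triggers execution of the program itself, which is why the test
change at the end of the diff drops the separate env.execute() call.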


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/c46d8a01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/c46d8a01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/c46d8a01

Branch: refs/heads/master
Commit: c46d8a01030e7d1ca41320cf4467a706f46167e5
Parents: 0868dcb
Author: Lee moon soo <[email protected]>
Authored: Wed Jul 8 13:38:55 2015 -0700
Committer: Lee moon soo <[email protected]>
Committed: Sat Jul 11 00:44:21 2015 -0700

----------------------------------------------------------------------
 flink/pom.xml                                   |  23 +-
 .../apache/zeppelin/flink/FlinkEnvironment.java |  83 -------
 .../org/apache/zeppelin/flink/FlinkIMain.java   |  92 --------
 .../apache/zeppelin/flink/FlinkInterpreter.java | 138 ++++++------
 .../org/apache/zeppelin/flink/JarHelper.java    | 219 -------------------
 .../zeppelin/flink/FlinkInterpreterTest.java    |   3 +-
 6 files changed, 82 insertions(+), 476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index bfdc29b..b6c790b 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -34,28 +34,13 @@
   <url>http://zeppelin.incubator.apache.org</url>
 
   <properties>
-    <flink.version>0.9.0-milestone-1</flink.version>
+    <flink.version>0.9.0</flink.version>
     <flink.akka.version>2.3.7</flink.akka.version>
     <flink.scala.binary.version>2.10</flink.scala.binary.version>
     <flink.scala.version>2.10.4</flink.scala.version>
     <scala.macros.version>2.0.1</scala.macros.version>
   </properties>
 
-  <repositories>
-    <!-- for flink 0.9-SNAPSHOT. After 0.9 released, it can be removed -->
-    <repository>
-      <id>apache.snapshots</id>
-      <name>Apache Development Snapshot Repository</name>
-      <url>https://repository.apache.org/content/repositories/snapshots/</url>
-      <releases>
-        <enabled>false</enabled>
-      </releases>
-      <snapshots>
-        <enabled>true</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>org.slf4j</groupId>
@@ -104,6 +89,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala-shell</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-actor_${flink.scala.binary.version}</artifactId>
       <version>${flink.akka.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
deleted file mode 100644
index 629932b..0000000
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.flink;
-
-import java.io.File;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.translation.JavaPlan;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The class override execute() method to create an PlanExecutor with
- * jar file that packages classes from scala compiler.
- */
-public class FlinkEnvironment extends ExecutionEnvironment {
-  Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
-
-  private String host;
-  private int port;
-
-  private FlinkIMain imain;
-
-  public FlinkEnvironment(String host, int port, FlinkIMain imain) {
-    this.host = host;
-    this.port = port;
-    this.imain = imain;
-
-    logger.info("jobManager host={}, port={}", host, port);
-  }
-
-  @Override
-  public JobExecutionResult execute(String jobName) throws Exception {
-    JavaPlan plan = createProgramPlan(jobName);
-
-    File jarFile = imain.jar();
-    PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
-        jarFile.getAbsolutePath());
-
-    JobExecutionResult result = executor.executePlan(plan);
-
-    if (jarFile.isFile()) {
-      jarFile.delete();
-    }
-
-    return result;
-  }
-
-  @Override
-  public String getExecutionPlan() throws Exception {
-    JavaPlan plan = createProgramPlan("unnamed", false);
-    plan.setDefaultParallelism(getParallelism());
-    registerCachedFilesWithPlan(plan);
-
-    File jarFile = imain.jar();
-    PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
-        jarFile.getAbsolutePath());
-    String jsonPlan = executor.getOptimizerPlanAsJSON(plan);
-
-    if (jarFile != null && jarFile.isFile()) {
-      jarFile.delete();
-    }
-
-    return jsonPlan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
deleted file mode 100644
index ee6516c..0000000
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.flink;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.collection.Iterator;
-import scala.reflect.io.AbstractFile;
-import scala.reflect.io.VirtualDirectory;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.IMain;
-
-/**
- * Scala compiler
- */
-public class FlinkIMain extends IMain {
-  Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
-
-  public FlinkIMain(Settings setting, PrintWriter out) {
-    super(setting, out);
-  }
-
-  public File jar() throws IOException {
-    VirtualDirectory classDir = virtualDirectory();
-    // create execution environment
-    File jarBuildDir = new File(System.getProperty("java.io.tmpdir")
-        + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis());
-    jarBuildDir.mkdirs();
-
-    File jarFile = new File(System.getProperty("java.io.tmpdir")
-        + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar");
-
-
-    Iterator<AbstractFile> vdIt = classDir.iterator();
-    while (vdIt.hasNext()) {
-      AbstractFile fi = vdIt.next();
-      if (fi.isDirectory()) {
-        Iterator<AbstractFile> fiIt = fi.iterator();
-        while (fiIt.hasNext()) {
-          AbstractFile f = fiIt.next();
-
-          // directory for compiled line
-          File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name());
-          lineDir.mkdirs();
-
-          // compiled classes for commands from shell
-          File writeFile = new File(lineDir.getAbsolutePath(), f.name());
-          FileOutputStream outputStream = new FileOutputStream(writeFile);
-          InputStream inputStream = f.input();
-
-          // copy file contents
-          org.apache.commons.io.IOUtils.copy(inputStream, outputStream);
-
-          inputStream.close();
-          outputStream.close();
-        }
-      }
-    }
-
-    // jar up
-    JarHelper jh = new JarHelper();
-    jh.jarDir(jarBuildDir, jarFile);
-
-    FileUtils.deleteDirectory(jarBuildDir);
-    return jarFile;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index b342f4e..c516b9d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.zeppelin.flink;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintWriter;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -41,8 +43,10 @@ import org.slf4j.LoggerFactory;
 
 import scala.Console;
 import scala.None;
+import scala.Option;
 import scala.Some;
 import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.IMain;
 import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
 import scala.tools.nsc.settings.MutableSettings.PathSetting;
 
@@ -51,13 +55,12 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
  */
 public class FlinkInterpreter extends Interpreter {
   Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
-  private Settings settings;
   private ByteArrayOutputStream out;
-  private FlinkIMain imain;
-  private Map<String, Object> binder;
-  private ExecutionEnvironment env;
   private Configuration flinkConf;
   private LocalFlinkMiniCluster localFlinkCluster;
+  private FlinkILoop flinkIloop;
+  private Map<String, Object> binder;
+  private IMain imain;
 
   public FlinkInterpreter(Properties property) {
     super(property);
@@ -69,17 +72,72 @@ public class FlinkInterpreter extends Interpreter {
         "flink",
         FlinkInterpreter.class.getName(),
         new InterpreterPropertyBuilder()
-          .add("local", "true", "Run flink locally")
-          .add("jobmanager.rpc.address", "localhost", "Flink cluster")
-          .add("jobmanager.rpc.port", "6123", "Flink cluster")
+                .add("host", "local",
+                    "host name of running JobManager. 'local' runs flink in 
local mode")
+          .add("port", "6123", "port of running JobManager")
           .build()
     );
   }
 
   @Override
   public void open() {
+    out = new ByteArrayOutputStream();
+    flinkConf = new org.apache.flink.configuration.Configuration();
+    Properties intpProperty = getProperty();
+    for (Object k : intpProperty.keySet()) {
+      String key = (String) k;
+      String val = toString(intpProperty.get(key));
+      flinkConf.setString(key, val);
+    }
+
+    if (localMode()) {
+      startFlinkMiniCluster();
+    }
+
+    flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out));
+    flinkIloop.settings_$eq(createSettings());
+    flinkIloop.createInterpreter();
+    
+    imain = flinkIloop.intp();
+
+    // prepare bindings
+    imain.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
+    binder = (Map<String, Object>) getValue("_binder");    
+
+    // import libraries
+    imain.interpret("import scala.tools.nsc.io._");
+    imain.interpret("import Properties.userHome");
+    imain.interpret("import scala.compat.Platform.EOL");
+    
+    imain.interpret("import org.apache.flink.api.scala._");
+    imain.interpret("import org.apache.flink.api.common.functions._");
+    imain.bindValue("env", flinkIloop.scalaEnv());
+  }
+
+  private boolean localMode() {
+    String host = getProperty("host");
+    return host == null || host.trim().length() == 0 || host.trim().equals("local");
+  }
+
+  private String getHost() {
+    if (localMode()) {
+      return "localhost";
+    } else {
+      return getProperty("host");
+    }
+  }
+
+  private int getPort() {
+    if (localMode()) {
+      return localFlinkCluster.getJobManagerRPCPort();
+    } else {
+      return Integer.parseInt(getProperty("port"));
+    }
+  }
+
+  private Settings createSettings() {
     URL[] urls = getClassloaderUrls();
-    this.settings = new Settings();
+    Settings settings = new Settings();
 
     // set classpath
     PathSetting pathSettings = settings.classpath();
@@ -108,61 +166,10 @@ public class FlinkInterpreter extends Interpreter {
     BooleanSetting b = (BooleanSetting) settings.usejavacp();
     b.v_$eq(true);
     settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    out = new ByteArrayOutputStream();
-    imain = new FlinkIMain(settings, new PrintWriter(out));
-
-    initializeFlinkEnv();
-  }
-
-  private boolean localMode() {
-    return Boolean.parseBoolean(getProperty("local"));
-  }
-
-  private String getRpcAddress() {
-    if (localMode()) {
-      return "localhost";
-    } else {
-      return getProperty("jobmanager.rpc.address");
-    }
-  }
-
-  private int getRpcPort() {
-    if (localMode()) {
-      return localFlinkCluster.getJobManagerRPCPort();
-    } else {
-      return Integer.parseInt(getProperty("jobmanager.rpc.port"));
-    }
-  }
-
-  private void initializeFlinkEnv() {
-    // prepare bindings
-    imain.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
-    binder = (Map<String, Object>) getValue("_binder");
-
-    flinkConf = new org.apache.flink.configuration.Configuration();
-    Properties intpProperty = getProperty();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      flinkConf.setString(key, val);
-    }
-
-    if (localMode()) {
-      startFlinkMiniCluster();
-    }
-
-    env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain);
-    binder.put("env", new 
org.apache.flink.api.scala.ExecutionEnvironment(env));
-
-    // do import and create val
-    imain.interpret("@transient val env = "
-        + "_binder.get(\"env\")"
-        + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]");
-
-    imain.interpret("import org.apache.flink.api.scala._");
+    
+    return settings;
   }
-
+  
 
   private List<File> currentClassPath() {
     List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
@@ -194,6 +201,7 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   public Object getValue(String name) {
+    IMain imain = flinkIloop.intp();
     Object ret = imain.valueOfTerm(name);
     if (ret instanceof None) {
       return null;
@@ -206,7 +214,7 @@ public class FlinkInterpreter extends Interpreter {
 
   @Override
   public void close() {
-    imain.close();
+    flinkIloop.closeInterpreter();
 
     if (localMode()) {
       stopFlinkMiniCluster();
@@ -224,6 +232,8 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+    IMain imain = flinkIloop.intp();
+    
     String[] linesToRun = new String[lines.length + 1];
     for (int i = 0; i < lines.length; i++) {
       linesToRun[i] = lines[i];
@@ -300,7 +310,7 @@ public class FlinkInterpreter extends Interpreter {
 
   private void stopFlinkMiniCluster() {
     if (localFlinkCluster != null) {
-      localFlinkCluster.shutdown();
+      localFlinkCluster.stop();
       localFlinkCluster = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
deleted file mode 100644
index efc4951..0000000
--- a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.flink;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-import java.util.jar.JarOutputStream;
-
-/**
- * This class copied from flink-scala-shell. Once the flink-0.9 is published in
- * the maven repository, this class can be removed
- *
- * Provides utility services for jarring and unjarring files and directories.
- * Note that a given instance of JarHelper is not threadsafe with respect to
- * multiple jar operations.
- *
- * Copied from
- * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans
- * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
- *
- * @author Patrick Calahan <[email protected]>
- */
-public class JarHelper {
-  // ========================================================================
-  // Constants
-
-  private static final int BUFFER_SIZE = 2156;
-
-  // ========================================================================
-  // Variables
-
-  private byte[] mBuffer = new byte[BUFFER_SIZE];
-  private int mByteCount = 0;
-  private boolean mVerbose = false;
-  private String mDestJarName = "";
-
-  // ========================================================================
-  // Constructor
-
-  /**
-   * Instantiates a new JarHelper.
-   */
-  public JarHelper() {
-  }
-
-  // ========================================================================
-  // Public methods
-
-  /**
-   * Jars a given directory or single file into a JarOutputStream.
-   */
-  public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
-
-    if (dirOrFile2Jar == null || destJar == null) {
-      throw new IllegalArgumentException();
-    }
-
-    mDestJarName = destJar.getCanonicalPath();
-    FileOutputStream fout = new FileOutputStream(destJar);
-    JarOutputStream jout = new JarOutputStream(fout);
-    // jout.setLevel(0);
-    try {
-      jarDir(dirOrFile2Jar, jout, null);
-    } catch (IOException ioe) {
-      throw ioe;
-    } finally {
-      jout.close();
-      fout.close();
-    }
-  }
-
-  /**
-   * Unjars a given jar file into a given directory.
-   */
-  public void unjarDir(File jarFile, File destDir) throws IOException {
-    BufferedOutputStream dest = null;
-    FileInputStream fis = new FileInputStream(jarFile);
-    unjar(fis, destDir);
-  }
-
-  /**
-   * Given an InputStream on a jar file, unjars the contents into the given
-   * directory.
-   */
-  public void unjar(InputStream in, File destDir) throws IOException {
-    BufferedOutputStream dest = null;
-    JarInputStream jis = new JarInputStream(in);
-    JarEntry entry;
-    while ((entry = jis.getNextJarEntry()) != null) {
-      if (entry.isDirectory()) {
-        File dir = new File(destDir, entry.getName());
-        dir.mkdir();
-        if (entry.getTime() != -1) {
-          dir.setLastModified(entry.getTime());
-        }
-        continue;
-      }
-      int count;
-      byte[] data = new byte[BUFFER_SIZE];
-      File destFile = new File(destDir, entry.getName());
-      if (mVerbose) {
-        System.out
-            .println("unjarring " + destFile + " from " + entry.getName());
-      }
-      FileOutputStream fos = new FileOutputStream(destFile);
-      dest = new BufferedOutputStream(fos, BUFFER_SIZE);
-      while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
-        dest.write(data, 0, count);
-      }
-      dest.flush();
-      dest.close();
-      if (entry.getTime() != -1) {
-        destFile.setLastModified(entry.getTime());
-      }
-    }
-    jis.close();
-  }
-
-  public void setVerbose(boolean b) {
-    mVerbose = b;
-  }
-
-  // ========================================================================
-  // Private methods
-
-  private static final char SEP = '/';
-
-  /**
-   * Recursively jars up the given path under the given directory.
-   */
-  private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
-      throws IOException {
-    if (mVerbose) {
-      System.out.println("checking " + dirOrFile2jar);
-    }
-    if (dirOrFile2jar.isDirectory()) {
-      String[] dirList = dirOrFile2jar.list();
-      String subPath = (path == null) ? ""
-          : (path + dirOrFile2jar.getName() + SEP);
-      if (path != null) {
-        JarEntry je = new JarEntry(subPath);
-        je.setTime(dirOrFile2jar.lastModified());
-        jos.putNextEntry(je);
-        jos.flush();
-        jos.closeEntry();
-      }
-      for (int i = 0; i < dirList.length; i++) {
-        File f = new File(dirOrFile2jar, dirList[i]);
-        jarDir(f, jos, subPath);
-      }
-    } else {
-      if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
-        if (mVerbose) {
-          System.out.println("skipping " + dirOrFile2jar.getPath());
-        }
-        return;
-      }
-
-      if (mVerbose) {
-        System.out.println("adding " + dirOrFile2jar.getPath());
-      }
-      FileInputStream fis = new FileInputStream(dirOrFile2jar);
-      try {
-        JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
-        entry.setTime(dirOrFile2jar.lastModified());
-        jos.putNextEntry(entry);
-        while ((mByteCount = fis.read(mBuffer)) != -1) {
-          jos.write(mBuffer, 0, mByteCount);
-          if (mVerbose) {
-            System.out.println("wrote " + mByteCount + " bytes");
-          }
-        }
-        jos.flush();
-        jos.closeEntry();
-      } catch (IOException ioe) {
-        throw ioe;
-      } finally {
-        fis.close();
-      }
-    }
-  }
-
-  // for debugging
-  public static void main(String[] args) throws IOException {
-    if (args.length < 2) {
-      System.err.println("Usage: JarHelper jarname.jar directory");
-      return;
-    }
-
-    JarHelper jarHelper = new JarHelper();
-    jarHelper.mVerbose = true;
-
-    File destJar = new File(args[0]);
-    File dirOrFile2Jar = new File(args[1]);
-
-    jarHelper.jarDir(dirOrFile2Jar, destJar);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 264008a..37f9c27 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -59,8 +59,7 @@ public class FlinkInterpreterTest {
   public void testWordCount() {
     flink.interpret("val text = env.fromElements(\"To be or not to be\")", 
context);
     flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") 
}.map { (_, 1) }.groupBy(0).sum(1)", context);
-    flink.interpret("counts.print()", context);
-    InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
+    InterpreterResult result = flink.interpret("counts.print()", context);
     assertEquals(Code.SUCCESS, result.code());
   }
 }
