Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 215447693 -> 8cde5c9bd


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
new file mode 100644
index 0000000..8a85c84
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
+
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable routine that sends PING to all connected Zeppelin ws connections.
+ *
+ * <p>Each {@link #run()} delegates to {@code ZeppelinClient.pingAllNotes()}.
+ */
+public class ZeppelinHeartbeat implements Runnable {
+  // The logger must be bound to this class (not ZeppelinHubHeartbeat), otherwise log records
+  // and per-logger level settings are attributed to the wrong routine.
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHeartbeat.class);
+  private final ZeppelinClient client;
+
+  /**
+   * Creates a heartbeat routine bound to the given client.
+   *
+   * @param client client that is asked to ping all notes on every run
+   * @return a new heartbeat instance
+   */
+  public static ZeppelinHeartbeat newInstance(ZeppelinClient client) {
+    return new ZeppelinHeartbeat(client);
+  }
+
+  private ZeppelinHeartbeat(ZeppelinClient client) {
+    this.client = client;
+  }
+
+  /** Logs at debug level, then asks the client to ping all notes. */
+  @Override
+  public void run() {
+    LOG.debug("Sending PING to all connected Zeppelin notes");
+    client.pingAllNotes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
new file mode 100644
index 0000000..f982365
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
+
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable routine that sends a PING event to zeppelinhub.
+ */
+public class ZeppelinHubHeartbeat implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubHeartbeat.class);
+
+  private final ZeppelinhubClient client;
+
+  private ZeppelinHubHeartbeat(ZeppelinhubClient client) {
+    this.client = client;
+  }
+
+  /**
+   * Factory method; the constructor itself is private.
+   *
+   * @param client client used both to read the token and to send the ping
+   * @return a new heartbeat instance
+   */
+  public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) {
+    return new ZeppelinHubHeartbeat(client);
+  }
+
+  /** Builds a ping message from the client's token and sends it through the same client. */
+  @Override
+  public void run() {
+    LOG.debug("Sending PING to zeppelinhub");
+    final String ping = ZeppelinhubUtils.pingMessage(client.getToken());
+    client.send(ping);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java
new file mode 100644
index 0000000..86cd4ad
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.websocket.api.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeppelinhub session: wraps a Jetty websocket {@link Session} together with a token.
+ */
+public class ZeppelinhubSession {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubSession.class);
+  private final Session session;
+  private final String token;
+
+  /** Placeholder built with a {@code null} websocket session and an empty token; never open. */
+  public static final ZeppelinhubSession EMPTY = new ZeppelinhubSession(null, StringUtils.EMPTY);
+
+  /**
+   * Wraps the given websocket session.
+   *
+   * @param session underlying websocket session, may be {@code null}
+   * @param token token stored alongside the session
+   * @return a new wrapper instance
+   */
+  public static ZeppelinhubSession createInstance(Session session, String token) {
+    return new ZeppelinhubSession(session, token);
+  }
+
+  private ZeppelinhubSession(Session session, String token) {
+    this.session = session;
+    this.token = token;
+  }
+
+  /**
+   * @return {@code true} only when an underlying session exists and reports itself open
+   */
+  public boolean isSessionOpen() {
+    return ((session != null) && (session.isOpen()));
+  }
+
+  /** Closes the underlying session if it is open; otherwise does nothing. */
+  public void close() {
+    if (isSessionOpen()) {
+      session.close();
+    }
+  }
+
+  /**
+   * Sends a text message through the session's remote endpoint without waiting for the
+   * returned future. Blank messages and closed sessions are logged as errors and nothing
+   * is sent.
+   *
+   * @param msg text to send; {@code null}, empty or whitespace-only input is rejected
+   */
+  public void sendByFuture(String msg) {
+    if (StringUtils.isBlank(msg)) {
+      LOG.error("Cannot send event to Zeppelinhub, msg is empty");
+      // Stop here: falling through would still push the blank (possibly null) payload.
+      return;
+    }
+    if (!isSessionOpen()) {
+      LOG.error("Cannot send event to Zeppelinhub, session is close");
+      return;
+    }
+    session.getRemote().sendStringByFuture(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
new file mode 100644
index 0000000..c13abe5
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils;
+
+import java.util.HashMap;
+
+import org.apache.commons.lang.StringUtils;
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers that build serialized Zeppelinhub token messages (LIVE, DEAD, PING) and map
+ * operation names to {@link ZeppelinHubOp} and {@link Message.OP} constants.
+ */
+public class ZeppelinhubUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubUtils.class);
+
+  /**
+   * Builds the serialized LIVE message for the given token.
+   *
+   * @param token instance token; when blank an error is logged and the serialized empty
+   *     message is returned instead
+   * @return serialized message
+   */
+  public static String liveMessage(String token) {
+    return tokenMessage(ZeppelinHubOp.LIVE, "Live", token);
+  }
+
+  /**
+   * Builds the serialized DEAD message for the given token.
+   *
+   * @param token instance token; when blank an error is logged and the serialized empty
+   *     message is returned instead
+   * @return serialized message
+   */
+  public static String deadMessage(String token) {
+    return tokenMessage(ZeppelinHubOp.DEAD, "Dead", token);
+  }
+
+  /**
+   * Builds the serialized PING message for the given token.
+   *
+   * @param token instance token; when blank an error is logged and the serialized empty
+   *     message is returned instead
+   * @return serialized message
+   */
+  public static String pingMessage(String token) {
+    return tokenMessage(ZeppelinHubOp.PING, "Ping", token);
+  }
+
+  /** Shared builder: a message of the given op whose data holds only the token. */
+  private static String tokenMessage(ZeppelinHubOp op, String label, String token) {
+    if (StringUtils.isBlank(token)) {
+      LOG.error("Cannot create {} message: token is null or empty", label);
+      return ZeppelinhubMessage.EMPTY.serialize();
+    }
+    HashMap<String, Object> data = new HashMap<String, Object>();
+    data.put("token", token);
+    return ZeppelinhubMessage
+             .newMessage(op, data, new HashMap<String, String>())
+             .serialize();
+  }
+
+  /**
+   * Resolves an operation name to a {@link ZeppelinHubOp}.
+   *
+   * @param text operation name, may be {@code null}
+   * @return the matching constant, or {@code null} when text is {@code null} or matches none
+   */
+  public static ZeppelinHubOp toZeppelinHubOp(String text) {
+    if (text == null) {
+      // An enum's valueOf(null) throws NullPointerException, which the catch below would not
+      // intercept (ZeppelinHubOp is presumably an enum - confirm against its declaration).
+      return null;
+    }
+    try {
+      return ZeppelinHubOp.valueOf(text);
+    } catch (IllegalArgumentException ignored) {
+      // text does not name a Hub op
+      return null;
+    }
+  }
+
+  /**
+   * @param text operation name, may be {@code null}
+   * @return {@code true} when {@link #toZeppelinHubOp(String)} resolves text to a constant
+   */
+  public static boolean isZeppelinHubOp(String text) {
+    return (toZeppelinHubOp(text) != null);
+  }
+
+  /**
+   * Resolves an operation name to a Zeppelin {@link Message.OP}.
+   *
+   * @param text operation name, may be {@code null}
+   * @return the matching constant, or {@code null} when text is {@code null} or matches none
+   */
+  public static Message.OP toZeppelinOp(String text) {
+    if (text == null) {
+      // Enum valueOf(null) throws NullPointerException rather than IllegalArgumentException.
+      return null;
+    }
+    try {
+      return Message.OP.valueOf(text);
+    } catch (IllegalArgumentException ignored) {
+      // text does not name a Zeppelin op
+      return null;
+    }
+  }
+
+  /**
+   * @param text operation name, may be {@code null}
+   * @return {@code true} when {@link #toZeppelinOp(String)} resolves text to a constant
+   */
+  public static boolean isZeppelinOp(String text) {
+    return (toZeppelinOp(text) != null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
new file mode 100644
index 0000000..a3fc048
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.socket;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Zeppelin websocket message template class.
+ *
+ * <p>A message carries an operation type ({@link OP}), a key/value payload and the
+ * ticket, principal and roles strings.
+ */
+public class Message {
+  /**
+   * Representation of event type.
+   *
+   * <p>In the per-constant comments, [c-s] and [s-c] presumably denote client-to-server and
+   * server-to-client messages respectively.
+   */
+  public enum OP {
+    GET_HOME_NOTE, // [c-s] load note for home screen
+
+    GET_NOTE, // [c-s] client load note
+              // @param id note id
+
+    NOTE, // [s-c] note info
+          // @param note serialized Note object
+
+    PARAGRAPH, // [s-c] paragraph info
+               // @param paragraph serialized paragraph object
+
+    PROGRESS, // [s-c] progress update
+              // @param id paragraph id
+              // @param progress percentage progress
+
+    NEW_NOTE, // [c-s] create new notebook
+    DEL_NOTE, // [c-s] delete notebook
+              // @param id note id
+    CLONE_NOTE, // [c-s] clone new notebook
+                // @param id id of note to clone
+                // @param name name for the cloned note
+    IMPORT_NOTE,  // [c-s] import notebook
+                  // @param object notebook
+    NOTE_UPDATE,
+
+    RUN_PARAGRAPH, // [c-s] run paragraph
+                   // @param id paragraph id
+                   // @param paragraph paragraph content, i.e. script
+                   // @param config paragraph config
+                   // @param params paragraph params
+
+    COMMIT_PARAGRAPH, // [c-s] commit paragraph
+                      // @param id paragraph id
+                      // @param title paragraph title
+                      // @param paragraph paragraph content, i.e. script
+                      // @param config paragraph config
+                      // @param params paragraph params
+
+    CANCEL_PARAGRAPH, // [c-s] cancel paragraph run
+                      // @param id paragraph id
+
+    MOVE_PARAGRAPH, // [c-s] move paragraph order
+                    // @param id paragraph id
+                    // @param index index the paragraph want to go
+
+    INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph
+                      // @param target index
+
+    COMPLETION, // [c-s] ask completion candidates
+                // @param id
+                // @param buf current code
+                // @param cursor cursor position in code
+
+    COMPLETION_LIST, // [s-c] send back completion candidates list
+                     // @param id
+                     // @param completions list of string
+
+    LIST_NOTES, // [c-s] ask list of note
+    RELOAD_NOTES_FROM_REPO, // [c-s] reload notes from repo
+
+    NOTES_INFO, // [s-c] list of note infos
+                // @param notes serialized List<NoteInfo> object
+
+    PARAGRAPH_REMOVE,
+    PARAGRAPH_CLEAR_OUTPUT,
+    PARAGRAPH_APPEND_OUTPUT,  // [s-c] append output
+    PARAGRAPH_UPDATE_OUTPUT,  // [s-c] update (replace) output
+    PING,
+    AUTH_INFO,
+
+    ANGULAR_OBJECT_UPDATE,  // [s-c] add/update angular object
+    ANGULAR_OBJECT_REMOVE,  // [s-c] add angular object del
+
+    ANGULAR_OBJECT_UPDATED,  // [c-s] angular object value updated,
+
+    ANGULAR_OBJECT_CLIENT_BIND,  // [c-s] angular object updated from AngularJS z object
+
+    ANGULAR_OBJECT_CLIENT_UNBIND,  // [c-s] angular object unbind from AngularJS z object
+
+    LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
+    CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
+                         // @param settings serialized Map<String, String> object
+
+    CHECKPOINT_NOTEBOOK     // [c-s] checkpoint notebook to storage repository
+                            // @param noteId
+                            // @param checkpointName
+
+  }
+
+  // Public mutable fields are kept as-is: they are part of this class's existing interface.
+  public OP op;
+  public Map<String, Object> data = new HashMap<String, Object>();
+  public String ticket = "anonymous";
+  public String principal = "anonymous";
+  public String roles = "";
+
+  /**
+   * Creates a message of the given type with an empty payload.
+   *
+   * @param op operation type of this message
+   */
+  public Message(OP op) {
+    this.op = op;
+  }
+
+  /**
+   * Stores a payload entry.
+   *
+   * @param k payload key
+   * @param v payload value
+   * @return this message, so calls can be chained
+   */
+  public Message put(String k, Object v) {
+    data.put(k, v);
+    return this;
+  }
+
+  /**
+   * @param k payload key
+   * @return the payload value stored under k, or {@code null} when the map has no such entry
+   */
+  public Object get(String k) {
+    return data.get(k);
+  }
+
+  /**
+   * Returns a payload value cast to the type expected by the caller.
+   *
+   * @param key payload key
+   * @param <T> type the caller expects the value to have
+   * @return the stored value, or {@code null} when absent
+   */
+  // The cast is unchecked because of erasure: a wrong T surfaces as a ClassCastException at
+  // the call site, not here. The suppression is limited to this one method.
+  @SuppressWarnings("unchecked")
+  public <T> T getType(String key) {
+    return (T) data.get(key);
+  }
+
+  /** Renders the payload and the operation type; ticket, principal and roles are omitted. */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Message{");
+    sb.append("data=").append(data);
+    sb.append(", op=").append(op);
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
new file mode 100644
index 0000000..1e83354
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
@@ -0,0 +1,150 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.httpclient.HttpException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+
+/**
+ * Tests for {@link ZeppelinHubRepo}: resolution of the Zeppelinhub URL and websocket URI from
+ * the server system property, and note operations backed by a mocked
+ * {@link ZeppelinhubRestApiHandler}.
+ */
+public class ZeppelinHubRepoTest {
+  final String TOKEN = "AAA-BBB-CCC-00";
+  final String testAddr = "http://zeppelinhub.ltd";
+
+  private ZeppelinHubRepo repo;
+  private File pathOfNotebooks = new File(System.getProperty("user.dir") + "/src/test/resources/list_of_notes");
+  private File pathOfNotebook = new File(System.getProperty("user.dir") + "/src/test/resources/note");
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
+    System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, TOKEN);
+
+    ZeppelinConfiguration conf = new ZeppelinConfiguration();
+    repo = new ZeppelinHubRepo(conf);
+    repo.setZeppelinhubRestApiHandler(getMockedZeppelinHandler());
+  }
+
+  /**
+   * Builds a handler mock whose {@code asyncGet("")} returns the content of the note-list
+   * fixture and whose {@code asyncGet("AAAAA")} returns the content of the single-note fixture.
+   */
+  private ZeppelinhubRestApiHandler getMockedZeppelinHandler() throws HttpException, IOException {
+    ZeppelinhubRestApiHandler mockedZeppelinhubHandler = mock(ZeppelinhubRestApiHandler.class);
+
+    byte[] response = Files.toByteArray(pathOfNotebooks);
+    when(mockedZeppelinhubHandler.asyncGet("")).thenReturn(new String(response));
+
+    response =  Files.toByteArray(pathOfNotebook);
+    when(mockedZeppelinhubHandler.asyncGet("AAAAA")).thenReturn(new String(response));
+
+    return mockedZeppelinhubHandler;
+  }
+
+  /** Sets the server property, builds a fresh repo and checks the resolved Zeppelinhub URL. */
+  private void assertHubUrl(String serverAddress, String expectedUrl) {
+    System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, serverAddress);
+
+    ZeppelinConfiguration config = new ZeppelinConfiguration();
+    ZeppelinHubRepo repository = new ZeppelinHubRepo(config);
+    assertThat(repository.getZeppelinHubUrl(config)).isEqualTo(expectedUrl);
+  }
+
+  /** Sets the server property, builds a fresh repo and checks the resolved websocket URI. */
+  private void assertWebsocketUri(String serverAddress, String expectedUri) {
+    System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, serverAddress);
+
+    ZeppelinConfiguration config = new ZeppelinConfiguration();
+    ZeppelinHubRepo repository = new ZeppelinHubRepo(config);
+    assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo(expectedUri);
+  }
+
+  @Test
+  public void testGetZeppelinhubUrl() {
+    assertHubUrl(testAddr, "http://zeppelinhub.ltd");
+    // a value that is not a URL is expected to resolve to the default address
+    assertHubUrl("yolow", "https://www.zeppelinhub.com");
+    assertHubUrl("http://zeppelinhub.ltd:4242", "http://zeppelinhub.ltd:4242");
+    // port 0 is expected to be dropped from the resolved URL
+    assertHubUrl("http://zeppelinhub.ltd:0", "http://zeppelinhub.ltd");
+  }
+
+  @Test
+  public void testGetZeppelinHubWsEndpoint() {
+    assertWebsocketUri(testAddr, "ws://zeppelinhub.ltd:80/async");
+    assertWebsocketUri("https://zeppelinhub.ltd", "wss://zeppelinhub.ltd:443/async");
+    assertWebsocketUri("yolow", "wss://www.zeppelinhub.com:443/async");
+    assertWebsocketUri("http://zeppelinhub.ltd:4242", "ws://zeppelinhub.ltd:4242/async");
+    assertWebsocketUri("https://www.zeppelinhub.com", "wss://www.zeppelinhub.com:443/async");
+    assertWebsocketUri("http://www.zeppelinhub.com", "ws://www.zeppelinhub.com:80/async");
+    assertWebsocketUri("https://www.zeppelinhub.com:4242", "wss://www.zeppelinhub.com:4242/async");
+  }
+
+  @Test
+  public void testGetAllNotes() throws IOException {
+    List<NoteInfo> notebooks = repo.list();
+    assertThat(notebooks).isNotEmpty();
+    assertThat(notebooks.size()).isEqualTo(3);
+  }
+
+  @Test
+  public void testGetNote() throws IOException {
+    Note notebook = repo.get("AAAAA");
+    assertThat(notebook).isNotNull();
+    assertThat(notebook.id()).isEqualTo("2A94M5J1Z");
+  }
+
+  @Test
+  public void testRemoveNote() throws IOException {
+    // not supposed to throw
+    repo.remove("AAAAA");
+  }
+
+  @Test
+  public void testRemoveNoteError() throws IOException {
+    // not supposed to throw
+    repo.remove("BBBBB");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
new file mode 100644
index 0000000..b2cc81c
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
@@ -0,0 +1,123 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock.MockEchoWebsocketServer;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.eclipse.jetty.websocket.api.Session;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@link ZeppelinClient} run against a {@link MockEchoWebsocketServer} that is
+ * started before and stopped after every test.
+ */
+public class ZeppelinClientTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClientTest.class);
+  private final int zeppelinPort = 8080;
+  private final String validWebsocketUrl = "ws://localhost:" + zeppelinPort + "/ws";
+  private ExecutorService executor;
+  private MockEchoWebsocketServer echoServer;
+
+  @Before
+  public void setUp() throws Exception {
+    startWebsocketServer();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    //tear down routine
+    echoServer.stop();
+    executor.shutdown();
+  }
+
+  /** Submits the mock server to a single-thread executor; does not wait for it to be up. */
+  private void startWebsocketServer() throws InterruptedException {
+    // mock zeppelin websocket server setup
+    executor = Executors.newFixedThreadPool(1);
+    echoServer = new MockEchoWebsocketServer(zeppelinPort);
+    executor.submit(echoServer);
+  }
+
+  @Test
+  public void zeppelinConnectionTest() {
+    try {
+      // Wait for websocket server to start
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag and fail: returning here would report a test that
+      // verified nothing as passed.
+      Thread.currentThread().interrupt();
+      fail("Interrupted while waiting for websocket server to start");
+    }
+    // Initialize and start Zeppelin client
+    ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token", null);
+    client.start();
+    LOG.info("Zeppelin websocket client started");
+
+    try {
+      // Connection to note AAAA
+      Session connectionA = client.getZeppelinConnection("AAAA");
+      assertNotNull(connectionA);
+      assertTrue(connectionA.isOpen());
+
+      assertEquals(1, client.countConnectedNotes());
+      assertEquals(connectionA, client.getZeppelinConnection("AAAA"));
+
+      // Connection to note BBBB
+      Session connectionB = client.getZeppelinConnection("BBBB");
+      assertNotNull(connectionB);
+      assertTrue(connectionB.isOpen());
+
+      assertEquals(2, client.countConnectedNotes());
+      assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
+
+      // Remove connection to note AAAA
+      client.removeZeppelinConnection("AAAA");
+      assertEquals(1, client.countConnectedNotes());
+      // Asking for AAAA again is expected to yield a different session and raise the count
+      assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
+      assertEquals(2, client.countConnectedNotes());
+    } finally {
+      // stop the client even when an assertion above fails
+      client.stop();
+    }
+  }
+
+  @Test
+  public void zeppelinClientSingletonTest() {
+    ZeppelinClient client1 = ZeppelinClient.getInstance();
+    if (client1 == null) {
+      client1 = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
+    }
+    assertNotNull(client1);
+    ZeppelinClient client2 = ZeppelinClient.getInstance();
+    assertNotNull(client2);
+    assertEquals(client1, client2);
+  }
+
+  @Test
+  public void zeppelinMessageSerializationTest() {
+    Message msg = new Message(OP.LIST_NOTES);
+    msg.data = Maps.newHashMap();
+    msg.data.put("key", "value");
+    ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
+    String serializedMsg = client.serialize(msg);
+    Message deserializedMsg = client.deserialize(serializedMsg);
+    assertEquals(msg.op, deserializedMsg.op);
+    assertEquals(msg.data.get("key"), deserializedMsg.data.get("key"));
+
+    String invalidMsg = "random text";
+    deserializedMsg = client.deserialize(invalidMsg);
+    assertNull(deserializedMsg);
+  }
+
+  @Test
+  public void sendToZeppelinTest() {
+    ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
+    client.start();
+    try {
+      Message msg = new Message(OP.LIST_NOTES);
+      msg.data = Maps.newHashMap();
+      msg.data.put("key", "value");
+      client.send(msg, "DDDD");
+      client.removeZeppelinConnection("DDDD");
+    } finally {
+      client.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java
new file mode 100644
index 0000000..384cfe4
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java
@@ -0,0 +1,72 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
+import 
org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock.MockEchoWebsocketServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link ZeppelinhubClient} run against a {@link MockEchoWebsocketServer} that is
+ * started before and stopped after every test.
+ */
+public class ZeppelinhubClientTest {
+  // Bound to this test class (not ZeppelinClientTest) so log output is attributed correctly.
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClientTest.class);
+  private final int zeppelinPort = 8090;
+  private final String validWebsocketUrl = "ws://localhost:" + zeppelinPort + "/ws";
+  private ExecutorService executor;
+  private MockEchoWebsocketServer echoServer;
+  private final String runNotebookMsg =
+      "{\"op\":\"RUN_NOTEBOOK\"," +
+      "\"data\":[{\"id\":\"20150112-172845_1301897143\",\"title\":null,\"config\":{},\"params\":{},\"data\":null}," +
+      "{\"id\":\"20150112-172845_1301897143\",\"title\":null,\"config\":{},\"params\":{},\"data\":null}]," +
+      "\"meta\":{\"owner\":\"author\",\"instance\":\"my-zepp\",\"noteId\":\"2AB7SY361\"}}";
+  private final String invalidRunNotebookMsg = "some random string";
+
+  @Before
+  public void setUp() throws Exception {
+    startWebsocketServer();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    //tear down routine
+    echoServer.stop();
+    executor.shutdown();
+  }
+
+  /** Submits the mock server to a single-thread executor; does not wait for it to be up. */
+  private void startWebsocketServer() throws InterruptedException {
+    // mock zeppelin websocket server setup
+    executor = Executors.newFixedThreadPool(1);
+    echoServer = new MockEchoWebsocketServer(zeppelinPort);
+    executor.submit(echoServer);
+  }
+
+  @Test
+  public void zeppelinhubClientSingletonTest() {
+    ZeppelinhubClient client1 = ZeppelinhubClient.getInstance();
+    if (client1 == null) {
+      client1 = ZeppelinhubClient.initialize(validWebsocketUrl, "TOKEN");
+    }
+    assertNotNull(client1);
+    ZeppelinhubClient client2 = ZeppelinhubClient.getInstance();
+    assertNotNull(client2);
+    assertEquals(client1, client2);
+  }
+
+  @Test
+  public void runAllParagraphTest() throws Exception {
+    Client.initialize(validWebsocketUrl, validWebsocketUrl, "TOKEN", null);
+    Client.getInstance().start();
+    try {
+      ZeppelinhubClient zeppelinhubClient = ZeppelinhubClient.getInstance();
+      boolean runStatus = zeppelinhubClient.runAllParagraph("2AB7SY361", runNotebookMsg);
+      assertTrue(runStatus);
+      runStatus = zeppelinhubClient.runAllParagraph("2AB7SY361", invalidRunNotebookMsg);
+      assertFalse(runStatus);
+    } finally {
+      // stop the client even when an assertion above fails
+      Client.getInstance().stop();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java
new file mode 100644
index 0000000..e9959e9
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java
@@ -0,0 +1,46 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Embedded Jetty server used by tests as a stand-in websocket endpoint.
+ *
+ * <p>It has a single connector on the port passed to the constructor and maps
+ * {@link MockEventServlet} to {@code /ws/*}. Because {@link #start()} blocks in
+ * {@code server.join()}, the class is a {@link Runnable} so it can be handed to an
+ * executor instead of being started on the calling thread.
+ */
+public class MockEchoWebsocketServer implements Runnable {
+  private static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(MockEchoWebsocketServer.class);
+  private final Server server;
+
+  /**
+   * Configures, but does not start, the server.
+   *
+   * @param port port set on the server's only connector
+   */
+  public MockEchoWebsocketServer(int port) {
+    server = new Server();
+    ServerConnector connector = new ServerConnector(server);
+    connector.setPort(port);
+    server.addConnector(connector);
+
+    ServletContextHandler context =
+        new ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    server.setHandler(context);
+
+    context.addServlet(MockEventServlet.class, "/ws/*");
+  }
+
+  /**
+   * Starts the server and then blocks in {@code server.join()}.
+   *
+   * @throws Exception if the server cannot be started or the join is interrupted
+   */
+  public void start() throws Exception {
+    LOG.info("Starting mock echo websocket server");
+    server.start();
+    server.join();
+  }
+
+  /**
+   * Stops the server.
+   *
+   * @throws Exception if the server fails to stop
+   */
+  public void stop() throws Exception {
+    LOG.info("Stopping mock echo websocket server");
+    server.stop();
+  }
+
+  /** Runs {@link #start()}; failures are logged rather than propagated. */
+  @Override
+  public void run() {
+    try {
+      this.start();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the thread running this task can still see it.
+      Thread.currentThread().interrupt();
+      LOG.error("Couldn't start mock echo websocket server", e);
+    } catch (Exception e) {
+      LOG.error("Couldn't start mock echo websocket server", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java
new file mode 100644
index 0000000..c84f2c3
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java
@@ -0,0 +1,14 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
+
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Websocket servlet of the mock server; it registers {@link MockEventSocket} with the
+ * servlet's websocket factory.
+ */
+@SuppressWarnings("serial")
+public class MockEventServlet extends WebSocketServlet {
+  @Override
+  public void configure(WebSocketServletFactory factory) {
+    factory.register(MockEventSocket.class);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java
new file mode 100644
index 0000000..0f39b01
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java
@@ -0,0 +1,38 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock;
+
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Websocket endpoint of the mock server: every text message it receives is sent back
+ * on the same session, and connect, close and error events are logged.
+ */
+public class MockEventSocket extends WebSocketAdapter {
+  private static final Logger LOG = LoggerFactory.getLogger(MockEventSocket.class);
+  private Session session;
+
+  @Override
+  public void onWebSocketConnect(Session session) {
+    super.onWebSocketConnect(session);
+    // Kept so that onWebSocketText can reply on this connection.
+    this.session = session;
+    LOG.info("Socket Connected: {}", session);
+  }
+
+  @Override
+  public void onWebSocketText(String message) {
+    super.onWebSocketText(message);
+    // Echo the payload back without waiting for the send to complete.
+    session.getRemote().sendStringByFuture(message);
+    LOG.info("Received TEXT message: {}", message);
+  }
+
+  @Override
+  public void onWebSocketClose(int statusCode, String reason) {
+    super.onWebSocketClose(statusCode, reason);
+    LOG.info("Socket Closed: [{}] {}", statusCode, reason);
+  }
+
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    super.onWebSocketError(cause);
+    // Pass the throwable as the exception argument; a "{}" placeholder would be left
+    // unfilled because SLF4J does not use a trailing Throwable as a format argument.
+    LOG.error("Websocket error", cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java
new file mode 100644
index 0000000..4cce203
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java
@@ -0,0 +1,43 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests serialization and deserialization of {@link ZeppelinhubMessage}.
+ */
+public class ZeppelinhubMessageTest {
+
+  /** JSON form of a LIST_NOTES message with data "my data" and one meta entry. */
+  private static final String MSG_JSON =
+      "{\"op\":\"LIST_NOTES\",\"data\":\"my data\",\"meta\":{\"key1\":\"val1\"}}";
+
+  @Test
+  public void testThatCanSerializeZeppelinHubMessage() {
+    String serialized =
+        ZeppelinhubMessage.newMessage(OP.LIST_NOTES, "my data", singleEntryMeta()).serialize();
+
+    assertEquals(MSG_JSON, serialized);
+  }
+
+  @Test
+  public void testThastCanDeserialiseZeppelinhubMessage() {
+    ZeppelinhubMessage expected =
+        ZeppelinhubMessage.newMessage(OP.LIST_NOTES.toString(), "my data", singleEntryMeta());
+    ZeppelinhubMessage actual = ZeppelinhubMessage.deserialize(MSG_JSON);
+
+    assertEquals(expected.op, actual.op);
+    assertEquals(expected.data, actual.data);
+    assertEquals(expected.meta, actual.meta);
+  }
+
+  @Test
+  public void testThatInvalidStringReturnEmptyZeppelinhubMessage() {
+    // Neither an empty string nor arbitrary text should yield anything but EMPTY.
+    assertEquals(ZeppelinhubMessage.EMPTY, ZeppelinhubMessage.deserialize(""));
+    assertEquals(ZeppelinhubMessage.EMPTY, ZeppelinhubMessage.deserialize("dwfewewrewr"));
+  }
+
+  /** Builds the meta map shared by the tests: a single entry key1 -> val1. */
+  private static Map<String, String> singleEntryMeta() {
+    Map<String, String> meta = Maps.newHashMap();
+    meta.put("key1", "val1");
+    return meta;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/resources/list_of_notes
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/list_of_notes 
b/zeppelin-zengine/src/test/resources/list_of_notes
new file mode 100644
index 0000000..a33ee90
--- /dev/null
+++ b/zeppelin-zengine/src/test/resources/list_of_notes
@@ -0,0 +1,41 @@
+[
+  {
+    "id": "2ABSFSR35",
+    "name": "ES RDD",
+    "config": {
+      "looknfeel": "default",
+      "codeHighlightStyle": "GitHub",
+      "codeHighlightStyleOrig": "GitHub"
+    },
+    "info": {},
+    "paragraphs": [],
+    "angularObjects": {},
+    "lastUpdate": 1445663884000
+  },
+  {
+    "id": "2A94M5J1Z",
+    "name": "Zeppelin Tutorial",
+    "config": {
+      "looknfeel": "default"},
+      "info": {},
+      "paragraphs": [],
+      "angularObjects": {
+      "2AMFHG1GQ": [],
+      "2APT3NC5T": [],
+      "2ANPJTJRQ": [],
+      "2AMJZ9R4C": [],
+      "2ANJFBYJS": [],
+      "2APKU6T1J": []},
+    "lastUpdate": 1446688883000
+  },
+  {
+    "id": "2AVSEYW1R",
+    "name": "NFLabs - Tracking beta",
+    "config": {
+    "looknfeel": "default"},
+    "info": {},
+    "paragraphs": [],
+    "angularObjects": {},
+    "lastUpdate": 1445663888000
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/resources/note
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/note 
b/zeppelin-zengine/src/test/resources/note
new file mode 100644
index 0000000..256f6ef
--- /dev/null
+++ b/zeppelin-zengine/src/test/resources/note
@@ -0,0 +1,16 @@
+  {
+    "id": "2A94M5J1Z",
+    "name": "Zeppelin Tutorial",
+    "config": {
+      "looknfeel": "default"},
+      "info": {},
+      "paragraphs": [],
+      "angularObjects": {
+      "2AMFHG1GQ": [],
+      "2APT3NC5T": [],
+      "2ANPJTJRQ": [],
+      "2AMJZ9R4C": [],
+      "2ANJFBYJS": [],
+      "2APKU6T1J": []},
+    "lastUpdate": 1446688883000
+  }


Reply via email to